2011-03-09 15 views
5

He creado una clase cuyo propósito es abstraer el control del acceso concurrente a una cola.Cómo se limpia correctamente después de que se cancela la tarea de larga ejecución

La clase está diseñada para ser instanciada en un único subproceso, escrita por varios subprocesos y luego leída desde un subproceso individual posterior.

Tengo una sola tarea de larga ejecución generada dentro de la clase que realizará un ciclo de bloqueo y activará un evento si un elemento se quita con éxito.

Mi pregunta es esta: ¿Mi implementación de la cancelación de la tarea de larga ejecución Y el posterior uso correcto de limpieza/restablecimiento del objeto CancellationTokenSource?

Idealmente, me gustaría que un objeto activo pueda detenerse y reiniciarse mientras se mantiene la disponibilidad para agregar a la cola.

He utilizado el artículo de Peter Bromberg como base: Producer/Consumer Queue and BlockingCollection in C# 4.0

siguiente código:

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Test 
{ 
    public delegate void DeliverNextQueuedItemHandler<T>(T item); 

public sealed class SOQueueManagerT<T> 
{ 

    ConcurrentQueue<T> _multiQueue; 
    BlockingCollection<T> _queue; 
    CancellationTokenSource _canceller; 
    Task _listener = null; 

    public event DeliverNextQueuedItemHandler<T> OnNextItem; 

    public bool IsRunning { get; private set; } 
    public int QueueSize 
    { 
     get 
     { 
      if (_queue != null) 
       return _queue.Count; 
      return -1; 
     } 
    } 

    public CancellationTokenSource CancellationTokenSource 
    { 
     get 
     { 
      if (_canceller == null) 
       _canceller = new CancellationTokenSource(); 

      return _canceller; 
     } 
    } 

    public SOQueueManagerT() 
    { 
     _multiQueue = new ConcurrentQueue<T>(); 
     _queue = new BlockingCollection<T>(_multiQueue); 

     IsRunning = false; 
    } 

    public void Start() 
    { 
     if (_listener == null) 
     { 


      IsRunning = true; 

      _listener = Task.Factory.StartNew(() => 
      { 

       while (!CancellationTokenSource.Token.IsCancellationRequested) 
       { 
        T item; 
        if (_queue.TryTake(out item, 100)) 
        { 
         if (OnNextItem != null) 
         { 

          OnNextItem(item); 
         } 
        } 

       } 
      }, 
      CancellationTokenSource.Token, 
      TaskCreationOptions.LongRunning, 
      TaskScheduler.Default); 
     } 
    } 

    public void Stop() 
    { 
     if (_listener != null) 
     { 
      CancellationTokenSource.Cancel(); 
      CleanUp(); 
     } 
    } 

    public void Add(T item) 
    { 
     _queue.Add(item); 
    } 

    private void CleanUp() 
    { 
     _listener.Wait(2000); 
     if (_listener.IsCompleted) 
     { 
      IsRunning = false; 
      _listener = null; 
      _canceller = null; 
     } 
    } 


} 
} 

ACTUALIZACIÓN Esto es lo que he ido con al final. No es perfecto, pero hasta ahora está haciendo el trabajo.

public sealed class TaskQueueManager<T> 
{ 
    ConcurrentQueue<T> _multiQueue; 
    BlockingCollection<T> _queue; 
    CancellationTokenSource _canceller; 
    Task _listener = null; 

    public event DeliverNextQueuedItemHandler<T> OnNextItem; 

    public bool IsRunning 
    { 
     get 
     { 
      if (_listener == null) 
       return false; 
      else if (_listener.Status == TaskStatus.Running || 
       _listener.Status == TaskStatus.Created || 
       _listener.Status == TaskStatus.WaitingForActivation || 
       _listener.Status == TaskStatus.WaitingToRun || 
       _listener.IsCanceled) 
       return true; 
      else 
       return false; 
     } 
    } 
    public int QueueSize 
    { 
     get 
     { 
      if (_queue != null) 
       return _queue.Count; 
      return -1; 
     } 
    } 

    public TaskQueueManager() 
    { 
     _multiQueue = new ConcurrentQueue<T>(); 
     _queue = new BlockingCollection<T>(_multiQueue); 
    } 

    public void Start() 
    { 
     if (_listener == null) 
     { 
      _canceller = new CancellationTokenSource(); 

      _listener = Task.Factory.StartNew(() => 
      { 
       while (!_canceller.Token.IsCancellationRequested) 
       { 
        T item; 
        if (_queue.TryTake(out item, 100)) 
        { 
         if (OnNextItem != null) 
         { 
          try 
          { 
           OnNextItem(item); 
          } 
          catch (Exception e) 
          { 
           //log or call an event 
          } 
         } 
        } 
       } 
      }, 
      _canceller.Token, 
      TaskCreationOptions.LongRunning, 
      TaskScheduler.Default); 
     } 
    } 

    public void Stop() 
    { 
     if (_listener != null) 
     { 
      _canceller.Cancel(); 

      if (_listener.IsCanceled && !_listener.IsCompleted) 
       _listener.Wait(); 

      _listener = null; 
      _canceller = null; 
     } 
    } 

    public void Add(T item) 
    { 
     if (item != null) 
     { 
      _queue.Add(item); 
     } 
     else 
     { 
      throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null"); 
     } 
    } 
} 

Respuesta

1

La programación cuidadosa es lo único que va a cortar. Incluso si cancela la operación, es posible que tenga una operación pendiente que no se completa en un período de moda. Podría ser una operación de bloqueo en punto muerto. En este caso, su programa no terminará realmente.

Por ejemplo, si llamo a su método CleanUp varias veces o si llamo a Start primero, tengo la sensación de que se va a bloquear.

Un tiempo de espera de 2 segundos durante la limpieza, se siente más arbitrario de lo planeado, e incluso llegaría a cerciorarme de que las cosas se apaguen correctamente o cuelguen/cuelguen (nunca desea dejar cosas concurrentes en un estado desconocido).

Además, el IsRunning se establece explícitamente, no se infiere del estado del objeto.

Para obtener inspiración, me gustaría que miraras una clase similar que escribí recientemente, es un patrón productor/consumidor que hace su trabajo en un hilo de fondo. Puede encontrar ese código fuente en CodePlex. Sin embargo, esto fue diseñado para resolver un problema muy específico.

Aquí, la cancelación se resuelve ingresando un tipo específico que solo el hilo de trabajo reconoce y, por lo tanto, comienza a cerrarse. Esto también garantiza que nunca cancele el trabajo pendiente, solo se consideran unidades de trabajo completas.

Para mejorar un poco esta situación, puede tener un temporizador separado para el trabajo actual y cancelar o cancelar el trabajo incompleto si se cancela. Ahora, implementar una transacción como tiene un comportamiento similar al de prueba y error porque necesita ver cada posible caso de esquina y preguntarse, ¿qué sucede si el programa falla aquí? Idealmente, todas estas rutas de código conducen a un estado recuperable o conocido desde el que puede reanudar su trabajo. Pero como creo que ya has adivinado, eso requerirá una programación cuidadosa y muchas pruebas.

+0

John: Sí, también descubrí que llamar a Stop() varias veces probablemente causará problemas. He alterado el método Stop() para que espere a que la tarea se complete antes de volver. Sí, esto lo convierte en una llamada de bloqueo, en esta etapa está bien. Podría proporcionar una anulación en el método Stop para proporcionar un tiempo de espera en el que reenviar a la llamada de espera (tiempo de espera). Sin embargo, el punto de IsRunning que realiza es válido. – MattC

+0

@MattC ¿Has mirado la muestra del código? –

+0

Sí, lo hice, es interesante y veo lo que has hecho. el problema específico al que me refería es el manejo correcto del inicio, la detención y el reinicio de la tarea del consumidor interno. Feliz por agregar la cola mientras el consumidor está detenido. He eliminado completamente el método CleanUp y he basado la propiedad IsRunning más específicamente en el estado de la tarea. Voy a dejar esto abierto solo que alguien agrega una respuesta específicamente sobre mi uso de TPL. Si no, tomaré el tuyo. – MattC

Cuestiones relacionadas