He creado una clase cuyo propósito es abstraer el control del acceso concurrente a una cola.Cómo se limpia correctamente después de que se cancela la tarea de larga ejecución
La clase está diseñada para ser instanciada en un único subproceso, escrita por varios subprocesos y luego leída desde un subproceso individual posterior.
Tengo una sola tarea de larga ejecución generada dentro de la clase que realizará un ciclo de bloqueo y activará un evento si un elemento se quita con éxito.
Mi pregunta es esta: ¿Mi implementación de la cancelación de la tarea de larga ejecución Y el posterior uso correcto de limpieza/restablecimiento del objeto CancellationTokenSource
?
Idealmente, me gustaría que un objeto activo pueda detenerse y reiniciarse mientras se mantiene la disponibilidad para agregar a la cola.
He utilizado el artículo de Peter Bromberg como base: Producer/Consumer Queue and BlockingCollection in C# 4.0
siguiente código:
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
public delegate void DeliverNextQueuedItemHandler<T>(T item);
public sealed class SOQueueManagerT<T>
{
ConcurrentQueue<T> _multiQueue;
BlockingCollection<T> _queue;
CancellationTokenSource _canceller;
Task _listener = null;
public event DeliverNextQueuedItemHandler<T> OnNextItem;
public bool IsRunning { get; private set; }
public int QueueSize
{
get
{
if (_queue != null)
return _queue.Count;
return -1;
}
}
public CancellationTokenSource CancellationTokenSource
{
get
{
if (_canceller == null)
_canceller = new CancellationTokenSource();
return _canceller;
}
}
public SOQueueManagerT()
{
_multiQueue = new ConcurrentQueue<T>();
_queue = new BlockingCollection<T>(_multiQueue);
IsRunning = false;
}
public void Start()
{
if (_listener == null)
{
IsRunning = true;
_listener = Task.Factory.StartNew(() =>
{
while (!CancellationTokenSource.Token.IsCancellationRequested)
{
T item;
if (_queue.TryTake(out item, 100))
{
if (OnNextItem != null)
{
OnNextItem(item);
}
}
}
},
CancellationTokenSource.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
public void Stop()
{
if (_listener != null)
{
CancellationTokenSource.Cancel();
CleanUp();
}
}
public void Add(T item)
{
_queue.Add(item);
}
private void CleanUp()
{
_listener.Wait(2000);
if (_listener.IsCompleted)
{
IsRunning = false;
_listener = null;
_canceller = null;
}
}
}
}
ACTUALIZACIÓN Esto es lo que he ido con al final. No es perfecto, pero hasta ahora está haciendo el trabajo.
public sealed class TaskQueueManager<T>
{
ConcurrentQueue<T> _multiQueue;
BlockingCollection<T> _queue;
CancellationTokenSource _canceller;
Task _listener = null;
public event DeliverNextQueuedItemHandler<T> OnNextItem;
public bool IsRunning
{
get
{
if (_listener == null)
return false;
else if (_listener.Status == TaskStatus.Running ||
_listener.Status == TaskStatus.Created ||
_listener.Status == TaskStatus.WaitingForActivation ||
_listener.Status == TaskStatus.WaitingToRun ||
_listener.IsCanceled)
return true;
else
return false;
}
}
public int QueueSize
{
get
{
if (_queue != null)
return _queue.Count;
return -1;
}
}
public TaskQueueManager()
{
_multiQueue = new ConcurrentQueue<T>();
_queue = new BlockingCollection<T>(_multiQueue);
}
public void Start()
{
if (_listener == null)
{
_canceller = new CancellationTokenSource();
_listener = Task.Factory.StartNew(() =>
{
while (!_canceller.Token.IsCancellationRequested)
{
T item;
if (_queue.TryTake(out item, 100))
{
if (OnNextItem != null)
{
try
{
OnNextItem(item);
}
catch (Exception e)
{
//log or call an event
}
}
}
}
},
_canceller.Token,
TaskCreationOptions.LongRunning,
TaskScheduler.Default);
}
}
public void Stop()
{
if (_listener != null)
{
_canceller.Cancel();
if (_listener.IsCanceled && !_listener.IsCompleted)
_listener.Wait();
_listener = null;
_canceller = null;
}
}
public void Add(T item)
{
if (item != null)
{
_queue.Add(item);
}
else
{
throw new ArgumentNullException("TaskQueueManager<" + typeof(T).Name + ">.Add item is null");
}
}
}
John: Sí, también descubrí que llamar a Stop() varias veces probablemente causará problemas. He alterado el método Stop() para que espere a que la tarea se complete antes de volver. Sí, esto lo convierte en una llamada de bloqueo, en esta etapa está bien. Podría proporcionar una anulación en el método Stop para proporcionar un tiempo de espera en el que reenviar a la llamada de espera (tiempo de espera). Sin embargo, el punto de IsRunning que realiza es válido. – MattC
@MattC ¿Has mirado la muestra del código? –
Sí, lo hice, es interesante y veo lo que has hecho. el problema específico al que me refería es el manejo correcto del inicio, la detención y el reinicio de la tarea del consumidor interno. Feliz por agregar la cola mientras el consumidor está detenido. He eliminado completamente el método CleanUp y he basado la propiedad IsRunning más específicamente en el estado de la tarea. Voy a dejar esto abierto solo que alguien agrega una respuesta específicamente sobre mi uso de TPL. Si no, tomaré el tuyo. – MattC