2011-10-23 14 views
23

Me pregunto si existe una implementación/envoltura para ConcurrentQueue, similar a BlockingCollection donde tomar de la colección no bloquea, sino que es asincrónico y provocará una asincronía hasta que se coloque un elemento en el cola.espera Cola basada en tareas

He creado mi propia implementación, pero parece que no está funcionando como se esperaba. Me pregunto si estoy reinventando algo que ya existe.

Aquí está mi aplicación:

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 

    object queueSyncLock = new object(); 

    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> Dequeue() 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs=null; 
     T firstItem=default(T); 
     while (true) 
     { 
      bool ok; 
      lock (queueSyncLock) 
      { 
       ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); 
       if (ok) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        queue.TryDequeue(out firstItem); 
       } 
      } 
      if (!ok) break; 
      tcs.SetResult(firstItem); 
     } 
    } 
} 
+0

oh asco .... ...... –

+21

@ AdamSack: de hecho, pero su comentario no me ayuda. – spender

Respuesta

36

No sé de una solución sin bloqueo, pero puede echar un vistazo a la nueva Dataflow library, parte de la Async CTP. Un simple BufferBlock<T> debería ser suficiente, por ejemplo .:

BufferBlock<int> buffer = new BufferBlock<int>(); 

producción y el consumo se realizan más fácilmente a través de métodos de extensión sobre los tipos de bloques de flujo de datos.

La producción es tan simple como:

buffer.Post(13); 

y el consumo es asincrónico listo:

int item = await buffer.ReceiveAsync(); 

Yo recomiendo que use flujo de datos, si es posible; hacer que ese buffer sea eficiente y correcto es más difícil de lo que parece.

+0

Esto parece muy prometedor ... lo verificará mañana. Gracias. Se parece mucho a un puerto CCR. – spender

+2

¡Echó un vistazo antes de irse a la cama! Parece que Dataflow se adapta muy bien a mis necesidades. Parece cerrar la brecha entre lo que ofrece TPL y lo que se ofrece en CCR (que he utilizado con gran éxito). Me hace sentir positivo de que el excelente trabajo en CCR no se ha desperdiciado. Esta es la respuesta correcta (¡y algo brillante y nuevo para hundir mis dientes!) Gracias @StephenCleary. – spender

1

Puede ser excesiva para su caso de uso (dada la curva de aprendizaje), pero Reactive Extentions proporciona todo el pegamento que usted podría desear para la composición asíncrona.

Se suscribe esencialmente a los cambios y se le envían a medida que están disponibles, y puede hacer que el sistema inserte los cambios en una secuencia separada.

+0

Estoy al menos parcialmente versado en Reactive, pero es un poco esotérico utilizarlo en producción, ya que otros pueden tener que mantener el código.Realmente estoy explorando la simplicidad que async/await está aportando a un producto de servidor previamente muy complicado, y estoy tratando de mantener toda la tecnología asincrónica bajo una sola tecnología. – spender

-1

Se podía usar un BlockingCollection (usando el valor por defecto ConcurrentQueue) y envolver la llamada a Take en un Task para que pueda await que:

var bc = new BlockingCollection<T>(); 

T element = await Task.Run(() => bc.Take()); 
+4

Buena idea, pero no estoy contento con el bloqueo. Voy a tener unos miles de clientes cada uno con su propia cola de mensajes. Cualquier bloqueo hundirá la nave porque atará hilos sin hacer nada. El motivo por el que quiero una Tarea no bloqueante es que pueda mantener todas las operaciones en el grupo de subprocesos sin causar la inanición de subprocesos. – spender

0

Aquí está la aplicación Actualmente estoy usando.

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 
    object queueSyncLock = new object(); 
    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> DequeueAsync(CancellationToken ct) 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     ct.Register(() => 
     { 
      lock (queueSyncLock) 
      { 
       tcs.TrySetCanceled(); 
      } 
     }); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs = null; 
     T firstItem = default(T); 
     lock (queueSyncLock) 
     { 
      while (true) 
      { 
       if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        if (tcs.Task.IsCanceled) 
        { 
         continue; 
        } 
        queue.TryDequeue(out firstItem); 
       } 
       else 
       { 
        break; 
       } 
       tcs.SetResult(firstItem); 
      } 
     } 
    } 
} 

funciona bastante bien, pero hay un buen montón de contención en queueSyncLock, como yo estoy haciendo un buen montón de uso de la CancellationToken para cancelar algunas de las tareas en espera. Por supuesto, esto conduce a mucho menos bloqueo que iba a ver con un BlockingCollection pero ...

Me pregunto si hay una más suave, bloquee los medios libres de lograr el mismo fin

2

Mi atempt (que tiene un evento genera cuando se crea una "promesa", y que puede ser utilizado por un productor externo para saber cuándo hay que producir más artículos):

public class AsyncQueue<T> 
{ 
    private ConcurrentQueue<T> _bufferQueue; 
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue; 
    private object _syncRoot = new object(); 

    public AsyncQueue() 
    { 
     _bufferQueue = new ConcurrentQueue<T>(); 
     _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); 
    } 

    /// <summary> 
    /// Enqueues the specified item. 
    /// </summary> 
    /// <param name="item">The item.</param> 
    public void Enqueue(T item) 
    { 
     TaskCompletionSource<T> promise; 
     do 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return;          
      } 
     } 
     while (promise != null); 

     lock (_syncRoot) 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return; 
      } 

      _bufferQueue.Enqueue(item); 
     }    
    } 

    /// <summary> 
    /// Dequeues the asynchronous. 
    /// </summary> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    /// <returns></returns> 
    public Task<T> DequeueAsync(CancellationToken cancellationToken) 
    { 
     T item; 

     if (!_bufferQueue.TryDequeue(out item)) 
     { 
      lock (_syncRoot) 
      { 
       if (!_bufferQueue.TryDequeue(out item)) 
       { 
        var promise = new TaskCompletionSource<T>(); 
        cancellationToken.Register(() => promise.TrySetCanceled()); 

        _promisesQueue.Enqueue(promise); 
        this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); 

        return promise.Task; 
       } 
      } 
     } 

     return Task.FromResult(item); 
    } 

    /// <summary> 
    /// Gets a value indicating whether this instance has promises. 
    /// </summary> 
    /// <value> 
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>. 
    /// </value> 
    public bool HasPromises 
    { 
     get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } 
    } 

    /// <summary> 
    /// Occurs when a new promise 
    /// is generated by the queue 
    /// </summary> 
    public event EventHandler PromiseAdded; 
} 
+0

Creo que esta es la mejor solución. Lo he implementado y lo he probado extensivamente. Algunas notas: la llamada a! Promesa.Task.IsCanceled es innecesario. Agregué un ManualResetEventSlim para seguir cuando el bufferQueue está vacío para que un llamador pueda bloquear y esperar a que la cola se vacíe. –

+0

Usted [debería deshacerse] (http://stackoverflow.com/a/21653382/298609) 'CancellationTokenRegistration' que obtuvo de la llamada' cancellationToken.Register'. – Paya

Cuestiones relacionadas