2009-04-29 17 views
17

Estoy buscando una implementación de cola de bloqueo de subprocesos seguros para .NET. Por "cola de bloqueo de subprocesos de seguridad" quiero decir: - acceso seguro de subprocesos a una cola donde la llamada de método Dequeue bloquea un subproceso hasta que otro subproceso pone (Enqueue) algún valor. Por el momento encontré este: http://www.eggheadcafe.com/articles/20060414.asp (Pero es para .NET 1.1).Implementación de cola de bloqueo de subprocesos en .NET

Podría alguien comentar/criticar la corrección de esta implementación. O sugiera uno más. Gracias de antemano.

Respuesta

8

¿Qué tal este Creating a blocking Queue in .NET?

Si lo necesita para .NET 1.1 (no estaba seguro de la pregunta), solo deje caer los genéricos y reemplace T con object.

+0

Gracias por un enlace. Es extraño que no haya encontrado ese tema buscando "cola". Ok, ese tema y el mío son más o menos lo mismo. por cierto: no necesito portar a .net 1.1. La solución en el tema relacionado es muy similar en http://www.eggheadcafe.com/articles/20060414.asp – Shrike

+1

@Shrike, nada extraño. Solo un ejemplo más de lo mala que es la búsqueda de StackOverflow. Es tan malo que todos te digan que simplemente uses Google (y el comando 'site:') en su lugar. – Ash

+0

@Ash: eso es probablemente cierto ... así es como busco stackoverflow ;-p –

0

Queue.Synchronized http://msdn.microsoft.com/en-us/library/system.collections.queue.synchronized(VS.71).aspx

es un punto de partida de todos modos, nunca he utilizado una cola de bloqueo. Perdón por la publicación no tan relevante.

+1

Creo que esto es solo una cola segura para subprocesos, que es ligeramente diferente de lo que está pidiendo. – tylerl

+1

sí, lo vi después de publicar. Editado para reflejar –

+0

¿El método Dequeue de un contenedor devuelto por Queue.Synchronized() bloquea el hilo actual? No. Imagine que tiene dos hilos, uno de ellos está poniendo valores en una cola y otro está obteniendo de esa cola. El segundo subproceso tiene que agrupar la cola en una CPU que consume bucles. Es malo. – Shrike

0

Microsoft tiene un muy buen ejemplo de esto:

//Copyright (C) Microsoft Corporation. All rights reserved. 

using System; 
using System.Threading; 
using System.Collections; 
using System.Collections.Generic; 

// The thread synchronization events are encapsulated in this 
// class to allow them to easily be passed to the Consumer and 
// Producer classes. 
public class SyncEvents 
{ 
    public SyncEvents() 
    { 
     // AutoResetEvent is used for the "new item" event because 
     // we want this event to reset automatically each time the 
     // consumer thread responds to this event. 
     _newItemEvent = new AutoResetEvent(false); 

     // ManualResetEvent is used for the "exit" event because 
     // we want multiple threads to respond when this event is 
     // signaled. If we used AutoResetEvent instead, the event 
     // object would revert to a non-signaled state with after 
     // a single thread responded, and the other thread would 
     // fail to terminate. 
     _exitThreadEvent = new ManualResetEvent(false); 

     // The two events are placed in a WaitHandle array as well so 
     // that the consumer thread can block on both events using 
     // the WaitAny method. 
     _eventArray = new WaitHandle[2]; 
     _eventArray[0] = _newItemEvent; 
     _eventArray[1] = _exitThreadEvent; 
    } 

    // Public properties allow safe access to the events. 
    public EventWaitHandle ExitThreadEvent 
    { 
     get { return _exitThreadEvent; } 
    } 
    public EventWaitHandle NewItemEvent 
    { 
     get { return _newItemEvent; } 
    } 
    public WaitHandle[] EventArray 
    { 
     get { return _eventArray; } 
    } 

    private EventWaitHandle _newItemEvent; 
    private EventWaitHandle _exitThreadEvent; 
    private WaitHandle[] _eventArray; 
} 

// The Producer class asynchronously (using a worker thread) 
// adds items to the queue until there are 20 items. 
public class Producer 
{ 
    public Producer(Queue<int> q, SyncEvents e) 
    { 
     _queue = q; 
     _syncEvents = e; 
    } 
    public void ThreadRun() 
    { 
     int count = 0; 
     Random r = new Random(); 
     while (!_syncEvents.ExitThreadEvent.WaitOne(0, false)) 
     { 
      lock (((ICollection)_queue).SyncRoot) 
      { 
       while (_queue.Count < 20) 
       { 
        _queue.Enqueue(r.Next(0, 100)); 
        _syncEvents.NewItemEvent.Set(); 
        count++; 
       } 
      } 
     } 
     Console.WriteLine("Producer thread: produced {0} items", count); 
    } 
    private Queue<int> _queue; 
    private SyncEvents _syncEvents; 
} 

// The Consumer class uses its own worker thread to consume items 
// in the queue. The Producer class notifies the Consumer class 
// of new items with the NewItemEvent. 
public class Consumer 
{ 
    public Consumer(Queue<int> q, SyncEvents e) 
    { 
     _queue = q; 
     _syncEvents = e; 
    } 
    public void ThreadRun() 
    { 
     int count = 0; 
     while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1) 
     { 
      lock (((ICollection)_queue).SyncRoot) 
      { 
       int item = _queue.Dequeue(); 
      } 
      count++; 
     } 
     Console.WriteLine("Consumer Thread: consumed {0} items", count); 
    } 
    private Queue<int> _queue; 
    private SyncEvents _syncEvents; 
} 

public class ThreadSyncSample 
{ 
    private static void ShowQueueContents(Queue<int> q) 
    { 
     // Enumerating a collection is inherently not thread-safe, 
     // so it is imperative that the collection be locked throughout 
     // the enumeration to prevent the consumer and producer threads 
     // from modifying the contents. (This method is called by the 
     // primary thread only.) 
     lock (((ICollection)q).SyncRoot) 
     { 
      foreach (int i in q) 
      { 
       Console.Write("{0} ", i); 
      } 
     } 
     Console.WriteLine(); 
    } 

    static void Main() 
    { 
     // Configure struct containing event information required 
     // for thread synchronization. 
     SyncEvents syncEvents = new SyncEvents(); 

     // Generic Queue collection is used to store items to be 
     // produced and consumed. In this case 'int' is used. 
     Queue<int> queue = new Queue<int>(); 

     // Create objects, one to produce items, and one to 
     // consume. The queue and the thread synchronization 
     // events are passed to both objects. 
     Console.WriteLine("Configuring worker threads..."); 
     Producer producer = new Producer(queue, syncEvents); 
     Consumer consumer = new Consumer(queue, syncEvents); 

     // Create the thread objects for producer and consumer 
     // objects. This step does not create or launch the 
     // actual threads. 
     Thread producerThread = new Thread(producer.ThreadRun); 
     Thread consumerThread = new Thread(consumer.ThreadRun); 

     // Create and launch both threads.  
     Console.WriteLine("Launching producer and consumer threads...");   
     producerThread.Start(); 
     consumerThread.Start(); 

     // Let producer and consumer threads run for 10 seconds. 
     // Use the primary thread (the thread executing this method) 
     // to display the queue contents every 2.5 seconds. 
     for (int i = 0; i < 4; i++) 
     { 
      Thread.Sleep(2500); 
      ShowQueueContents(queue); 
     } 

     // Signal both consumer and producer thread to terminate. 
     // Both threads will respond because ExitThreadEvent is a 
     // manual-reset event--so it stays 'set' unless explicitly reset. 
     Console.WriteLine("Signaling threads to terminate..."); 
     syncEvents.ExitThreadEvent.Set(); 

     // Use Join to block primary thread, first until the producer thread 
     // terminates, then until the consumer thread terminates. 
     Console.WriteLine("main thread waiting for threads to finish..."); 
     producerThread.Join(); 
     consumerThread.Join(); 
    } 
} 
0

Por favor, tenga en cuenta que el bloqueo en código de llamada puede ser una mejor opción si usted tiene el control total sobre él. Considere acceder a su cola en un bucle: adquirirá innecesariamente bloqueos múltiples veces, lo que podría generar una penalización de rendimiento.

1

El ejemplo de Microsoft es bueno, pero no está encapsulado en una clase. Además, requiere que el hilo consumidor se esté ejecutando en el MTA (debido a la llamada WaitAny). Hay algunos casos en los que es posible que deba ejecutar en una STA (por ejemplo, si está haciendo interoperabilidad COM). En estos casos, WaitAny no puede ser utilizado.

Tengo una clase simple cola de bloqueo que supera este problema aquí: http://element533.blogspot.com/2010/01/stoppable-blocking-queue-for-net.html

20

Para la referencia, .NET 4 introduce el tipo System.Collections.Concurrent.BlockingCollection<T> para hacer frente a esto. Para la cola sin bloqueo, puede usar System.Collections.Concurrent.ConcurrentQueue<T>. Tenga en cuenta que ConcurrentQueue<T> probablemente se utilizará como el almacén de datos subyacente para el BlockingCollection<T> para el uso del OP.

+0

¿Podría proporcionar un ejemplo de código que agregue un par de métodos para que podamos ver esto en acción? – theJerm

Cuestiones relacionadas