2009-11-01 60 views
21

Recientemente me encontré con una implementación de patrón de productor/consumidor C#. es muy simple y (al menos para mí) muy elegante.C# productor/consumidor

parece haber sido ideado alrededor de 2006, así que me preguntaba si esta aplicación es
- seguro
- siendo aplicable Código

está por debajo (código original se ha hecho referencia en http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375)

using System; 
using System.Collections; 
using System.Threading; 

public class Test 
{ 
    static ProducerConsumer queue; 

    static void Main() 
    { 
     queue = new ProducerConsumer(); 
     new Thread(new ThreadStart(ConsumerJob)).Start(); 

     Random rng = new Random(0); 
     for (int i=0; i < 10; i++) 
     { 
      Console.WriteLine ("Producing {0}", i); 
      queue.Produce(i); 
      Thread.Sleep(rng.Next(1000)); 
     } 
    } 

    static void ConsumerJob() 
    { 
     // Make sure we get a different random seed from the 
     // first thread 
     Random rng = new Random(1); 
     // We happen to know we've only got 10 
     // items to receive 
     for (int i=0; i < 10; i++) 
     { 
      object o = queue.Consume(); 
      Console.WriteLine ("\t\t\t\tConsuming {0}", o); 
      Thread.Sleep(rng.Next(1000)); 
     } 
    } 
} 

public class ProducerConsumer 
{ 
    readonly object listLock = new object(); 
    Queue queue = new Queue(); 

    public void Produce(object o) 
    { 
     lock (listLock) 
     { 
      queue.Enqueue(o); 

      // We always need to pulse, even if the queue wasn't 
      // empty before. Otherwise, if we add several items 
      // in quick succession, we may only pulse once, waking 
      // a single thread up, even if there are multiple threads 
      // waiting for items.    
      Monitor.Pulse(listLock); 
     } 
    } 

    public object Consume() 
    { 
     lock (listLock) 
     { 
      // If the queue is empty, wait for an item to be added 
      // Note that this is a while loop, as we may be pulsed 
      // but not wake up before another thread has come in and 
      // consumed the newly added object. In that case, we'll 
      // have to wait for another pulse. 
      while (queue.Count==0) 
      { 
       // This releases listLock, only reacquiring it 
       // after being woken up by a call to Pulse 
       Monitor.Wait(listLock); 
      } 
      return queue.Dequeue(); 
     } 
    } 
} 
+0

¿Qué versión de .NET estás buscando? –

+0

Hola, tengo un ejemplo de trabajo que usa BlockingCollection para implementar un patrón simple Productor-Consumidor en [este enlace] (https://stackoverflow.com/questions/733793/implementing-the-producer-consumer-pattern-in- c-sharp/47179576 # 47179576). –

Respuesta

28

El código es anterior a eso; lo escribí antes de que apareciera .NET 2.0. El concepto de una cola productor/consumidor es forma más antigua que aunque :)

Sí, ese código es seguro por lo que yo soy consciente - pero tiene algunas deficiencias:

  • No es genérico. Una versión moderna sin duda sería genérica.
  • No tiene forma de detener la cola. Una forma simple de detener la cola (para que todos los subprocesos de consumidor se retiren) es tener un token de "detener el trabajo" que se puede poner en la cola. A continuación, agrega tantos tokens como hilos tenga. Alternativamente, tiene una bandera separada para indicar que quiere detenerse. (Esto permite que los otros hilos se detengan antes de terminar todo el trabajo actual en la cola.)
  • Si los trabajos son muy pequeños, consumir un solo trabajo a la vez puede no ser lo más eficiente.

Las ideas detrás del código son más importantes que el código en sí, para ser sincero.

+0

pulsante no necesariamente despierta a un consumidor, sé que no es probable, pero en teoría, solo los productores pueden ejecutar infinitamente. también ive espera/impulso del monitor medido no tiene ventaja de rendimiento sobre los manejadores de espera del evento – TakeMeAsAGuest

+0

@TakeMeAsAGuest: No estoy seguro de lo que quiere decir con la primera parte. ¿Está diciendo que ha visto casos donde los hilos han estado esperando en un monitor, pero un pulso no hizo nada? ? En cuanto al rendimiento, hay muchos escenarios diferentes a tener en cuenta (hardware, software, cantidad de hilos en espera, etc.). Veré si puedo encontrar algunas referencias en el libro de Joe Duffy ... –

23

Podría hacer algo como el siguiente fragmento de código. Es genérico y tiene un método para poner nulos en cola (o cualquier indicador que quiera usar) para indicarle a los hilos de trabajo que salgan.

El código se toma aquí: http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 

namespace ConsoleApplication1 
{ 

    public class TaskQueue<T> : IDisposable where T : class 
    { 
     object locker = new object(); 
     Thread[] workers; 
     Queue<T> taskQ = new Queue<T>(); 

     public TaskQueue(int workerCount) 
     { 
      workers = new Thread[workerCount]; 

      // Create and start a separate thread for each worker 
      for (int i = 0; i < workerCount; i++) 
       (workers[i] = new Thread(Consume)).Start(); 
     } 

     public void Dispose() 
     { 
      // Enqueue one null task per worker to make each exit. 
      foreach (Thread worker in workers) EnqueueTask(null); 
      foreach (Thread worker in workers) worker.Join(); 
     } 

     public void EnqueueTask(T task) 
     { 
      lock (locker) 
      { 
       taskQ.Enqueue(task); 
       Monitor.PulseAll(locker); 
      } 
     } 

     void Consume() 
     { 
      while (true) 
      { 
       T task; 
       lock (locker) 
       { 
        while (taskQ.Count == 0) Monitor.Wait(locker); 
        task = taskQ.Dequeue(); 
       } 
       if (task == null) return;   // This signals our exit 
       Console.Write(task); 
       Thread.Sleep(1000);    // Simulate time-consuming task 
      } 
     } 
    } 
} 
+3

Esta es hasta ahora la mejor implementación del patrón del consumidor productor. Lo he usado recientemente en mi aplicación de subprocesos múltiples y funciona sin problemas incluso en 1000-1500 subprocesos. –

+0

Estoy seguro de que me falta algo aquí (estando _muy_ oxidado en mis habilidades C#) pero este ejemplo no invoca un método en la 'tarea' consumida (a diferencia del código [no genérico] referenciado (http: // www.albahari.com/threading/part4.aspx#%5FWait%5Fand%5FPulse) que almacena e invoca un delegado 'Action' en el consumidor. Entonces, ¿cuál es el punto de hacer esto genérico si no demuestra la invocación de método en la tarea consumida? (Tratando de entender este ejemplo vs. la implementación a la que se hace referencia.) – Inactivist

+2

@Inactivist Puede agregar una Acción al controlador, guardar como campo _acción, e invocar eso justo después de hacer Dequeue(), como sigue: _acción (tarea); – Marcus

1

Advertencia: Si usted lee los comentarios, vas a entender mi respuesta es incorrecta :)

Hay un posible estancamiento en tu codigo.

imaginar el siguiente caso, para mayor claridad, que utiliza un enfoque de un solo hilo, pero debería ser fácil de convertir a multi-hilo con el sueño:

// We create some actions... 
object locker = new object(); 

Action action1 =() => { 
    lock (locker) 
    { 
     System.Threading.Monitor.Wait(locker); 
     Console.WriteLine("This is action1"); 
    } 
}; 

Action action2 =() => { 
    lock (locker) 
    { 
     System.Threading.Monitor.Wait(locker); 
     Console.WriteLine("This is action2"); 
    } 
}; 

// ... (stuff happens, etc.) 

// Imagine both actions were running 
// and there's 0 items in the queue 

// And now the producer kicks in... 
lock (locker) 
{ 
    // This would add a job to the queue 

    Console.WriteLine("Pulse now!"); 
    System.Threading.Monitor.Pulse(locker); 
} 

// ... (more stuff) 
// and the actions finish now! 

Console.WriteLine("Consume action!"); 
action1(); // Oops... they're locked... 
action2(); 

Por favor, hágamelo saber si esto no lo hace cualquier sentido.

Si esto se confirma, la respuesta a su pregunta es "no, no es seguro";) Espero que esto ayude.

+0

No veo ningún punto muerto con el código del póster original, porque los artículos pueden agregarse a la cola en momentos en que cada consumidor esté fuera del candado (en cuyo caso el consumidor, la próxima vez que adquiera el candado, observará que la cola no está vacía y, por lo tanto, no tiene que esperar) o esperar un pulso (en cuyo caso, se garantiza que el pulso despierte al consumidor). – supercat

+0

Debo admitir que casi olvidé los detalles de lo que estaba pensando. Leyendo de nuevo, creo que el problema es el ciclo while. Mantendrá el hilo del consumidor bloqueado hasta que haya algo en la cola y eso evite que el productor se bloquee y enqueque. ¿Eso tiene sentido? – DiogoNeves

+1

El método 'Monitor.Wait()' libera el bloqueo que está esperando, hasta que otro hilo lo impulse. El hilo del consumidor se bloqueará dentro del método "Consumir", pero eso no evitará que el productor lo alimente. El mayor peligro sería que si el productor se da por vencido antes de generar todos los datos que el consumidor espera, el consumidor esperará para siempre por cosas que nunca llegarán. Eso puede ser tratado con, por ejemplo, teniendo una bandera 'AllDone'. – supercat

13

De vuelta en el día aprendí cómo funciona Monitor.Wait/Pulse (y mucho sobre los subprocesos en general) de la pieza de código anterior y el article series de. Entonces, como dice Jon, tiene mucho valor y, de hecho, es seguro y aplicable.

Sin embargo, a partir de.NET 4, hay un producer-consumer queue implementation in the framework. Solo lo encontré yo mismo, pero hasta este punto hace todo lo que necesito.