2010-06-13 12 views
5

Me di cuenta de que cuando estoy tratando de procesar elementos en una cola concurrente usando múltiples hilos mientras que varios hilos pueden poner elementos en él, la solución ideal sería usar las Extensiones reactivas con las estructuras de datos concurrentes.Cómo usar IObservable/IObserver con ConcurrentQueue o ConcurrentStack

Mi pregunta original se encuentra en:

While using ConcurrentQueue, trying to dequeue while looping through in parallel

así que estoy ansioso por ver si hay alguna manera de tener una consulta LINQ (o PLINQ) que continuamente se desencolado como artículos se ponen en ella.

Estoy tratando de hacer que esto funcione de una manera en la que pueda tener n cantidad de productores ingresando a la cola y un número limitado de hilos para procesar, por lo que no sobrecargo la base de datos.

Si pudiera usar Rx framework, entonces podría iniciarlo y si se colocan 100 elementos dentro de 100ms, los 20 hilos que forman parte de la consulta PLINQ simplemente procesarían a través de la cola.

Hay tres tecnologías que estoy tratando de trabajar juntos:

  1. Rx Marco (LINQ reactiva)
  2. PLING
  3. System.Collections.Concurrent estructuras
+0

¿Puede explicar cómo esperaba que Rx lo ayudara aquí? –

+0

@Richard Szalay - Como mencioné cerca del final, mi pensamiento es que no tengo que sondear para ver si hay algo en la cola, simplemente podría reaccionar cuando algo se coloque allí, así que si hay una gran cantidad de elementos de repente se introducen, podría tener varios hilos haciendo el procesamiento. Estoy tratando de evitar las encuestas, que es lo que estoy haciendo en este momento. –

Respuesta

3

I don' No sé cuál es la mejor manera de lograr esto con Rx, pero recomendaría simplemente usar BlockingCollection<T> y producer-consumer pattern. El hilo principal agrega elementos a la colección, que usa ConcurrentQueue<T> debajo de forma predeterminada. Luego, tiene un Task separado que gira delante del que usa Parallel::ForEach sobre el BlockingCollection<T> para procesar tantos elementos de la colección como tenga sentido para el sistema al mismo tiempo. Ahora, probablemente también desee analizar el uso del método GetConsumingPartitioner de la biblioteca ParallelExtensions para ser más eficiente ya que el particionador predeterminado creará más sobrecarga de lo que desea en este caso. Puede leer más sobre esto en this blog post.

Cuando se termina el hilo principal que llaman CompleteAdding en el BlockingCollection<T> y Task::Wait en el Task que hace girar hasta que esperar a que todos los consumidores para terminar de procesar todos los elementos de la colección.

+0

El truco principal para usar 'BlockingCollection' es que el hilo consumidor bloquea. Un patrón Observable solo tomaría el hilo cuando hubiera algo que procesar. –

6

Drew tiene razón, creo que el ConcurrentQueue aunque suene perfecto para el trabajo es en realidad la estructura de datos subyacente que utiliza BlockingCollection. Me parece muy de atrás para adelante también. Consulte el capítulo 7 de este libro * http://www.amazon.co.uk/Parallel-Programming-Microsoft-NET-Decomposition/dp/0735651590/ref=sr_1_1?ie=UTF8&qid=1294319704&sr=8-1 y le explicará cómo usar BlockingCollection y tendrá varios productores y múltiples consumidores sacando cada uno la "cola". Deberá consultar el método "GetConsumingEnumerable()" y posiblemente simplemente invocar .ToObservable() sobre eso.

* el resto del libro es bastante normal.

edición:

Aquí es un programa de ejemplo que creo que hace lo que quiere?

class Program 
{ 
    private static ManualResetEvent _mre = new ManualResetEvent(false); 
    static void Main(string[] args) 
    { 
     var theQueue = new BlockingCollection<string>(); 
     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 1", 10000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 2", 50000000)); 

     theQueue.GetConsumingEnumerable() 
      .ToObservable(Scheduler.TaskPool) 
      .Subscribe(x => ProcessNewValue(x, "Consumer 3", 30000000)); 


     LoadQueue(theQueue, "Producer A"); 
     LoadQueue(theQueue, "Producer B"); 
     LoadQueue(theQueue, "Producer C"); 

     _mre.Set(); 

     Console.WriteLine("Processing now...."); 

     Console.ReadLine(); 
    } 

    private static void ProcessNewValue(string value, string consumerName, int delay) 
    { 
     Thread.SpinWait(delay); 
     Console.WriteLine("{1} consuming {0}", value, consumerName); 
    } 

    private static void LoadQueue(BlockingCollection<string> target, string prefix) 
    { 
     var thread = new Thread(() => 
            { 
             _mre.WaitOne(); 
             for (int i = 0; i < 100; i++) 
             { 
              target.Add(string.Format("{0} {1}", prefix, i)); 
             } 
            }); 
     thread.Start(); 
    } 
} 
+0

Eso es realmente ... un hombre ingenioso ... conectando a Rx con una colección de bloqueo. Wow ... incluso puedes hacer un pipeline con esto: https://msdn.microsoft.com/en-us/library/ff963548.aspx – Oooogi

Cuestiones relacionadas