8

Todo esto sucede en un servicio de Windows.Procesamiento de n elementos a la vez utilizando simultáneamente la Biblioteca de tareas paralelas

Tengo un Queue<T> (en realidad un ConcurrentQueue<T>) con elementos a la espera de ser procesados. Pero, no quiero procesar solo uno a la vez, quiero procesar n elementos al mismo tiempo, donde n es un entero configurable.

¿Cómo hago esto al usar la Biblioteca de tareas paralelas?

Sé que TPL dividirá las colecciones en nombre del desarrollador para el procesamiento simultáneo, pero no estoy seguro si esa es la característica que estoy buscando. Soy nuevo en multiprocesamiento y TPL.

Respuesta

4

Uso BlockingCollection<T> en lugar de ConcurrentQueue<T>, entonces usted puede comenzar a cualquier número de hilos de consumo y el uso de Take método de la BlockingCollection. si la colección está vacía, el método Take se bloqueará automáticamente en el hilo de la persona que llama esperando que se agreguen los elementos, de lo contrario los hilos consumirán todos los elementos de la cola en paralelo. Sin embargo, como su pregunta mencionó el uso de TPL, resulta que Parallel.ForEach tiene algunos problemas cuando se utiliza con BlockingCollection, consulte la publicación this para obtener más información. por lo que debe administrar la creación de sus hilos de consumo usted mismo. new Thread(/*consumer method*/) o new Task() ...

+0

El BlockingCollection derrota el propósito de la cola. No puedo eliminar un elemento de una colección de bloqueo mientras se está iterando. –

+2

No, puede usar su [GetConsumingEnumerable] (http://msdn.microsoft.com/en-us/library/dd287186.aspx) en su lugar. como, 'foreach (Elemento item en _collection.GetConsumingEnumerable())' también bloqueará allí esperando que se agreguen elementos si la colección está vacía. –

4

Aquí hay una idea que implica crear un método de extensión para TaskFactory.

public static class TaskFactoryExtension 
{ 
    public static Task StartNew(this TaskFactory target, Action action, int parallelism) 
    { 
     var tasks = new Task[parallelism]; 
     for (int i = 0; i < parallelism; i++) 
     { 
      tasks[i] = target.StartNew(action); 
     } 
     return target.StartNew(() => Task.WaitAll(tasks)); 
    } 
} 

A continuación, el código de llamada se vería como la siguiente.

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
var task = Task.Factory.StartNew(
() => 
    { 
    T item; 
    while (queue.TryDequeue(out item)) 
    { 
     ProcessItem(item); 
    } 
    }, n); 
task.Wait(); // Optionally wait for everything to finish. 

Aquí hay otra idea usando Parallel.ForEach. El problema con este enfoque es que sus grados de paralelismo pueden no ser necesariamente respetados. Solo está indicando la cantidad máxima permitida y no la cantidad absoluta.

ConcurrentQueue<T> queue = GetQueue(); 
int n = GetDegreeOfParallelism(); 
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n }, 
    (item) => 
    { 
    ProcessItem(item);  
    }); 
1

También me gustaría recomendar el uso de un BlockingCollection en lugar de utilizar directamente un ConcurrentQueue.

He aquí un ejemplo:

public class QueuingRequestProcessor 
{ 
    private BlockingCollection<MyRequestType> queue; 

    public void QueuingRequestProcessor(int maxConcurrent) 
    { 
    this.queue = new BlockingCollection<MyRequestType>(maxConcurrent); 

    Task[] consumers = new Task[maxConcurrent]; 

    for (int i = 0; i < maxConcurrent; i++) 
    { 
     consumers[i] = Task.Factory.StartNew(() => 
     { 
     // Will wait when queue is empty, until CompleteAdding() is called 
     foreach (var request in this.queue.GetConsumingEnumerable()) 
     { 
      Process(request); 
     } 
     }); 
    } 
    } 

    public void Add(MyRequest request) 
    { 
    this.queue.Add(request); 
    } 

    public void Stop() 
    { 
    this.queue.CompleteAdding(); 
    } 

    private void Process(MyRequestType request) 
    { 
    // Do your processing here 
    } 
} 

Tenga en cuenta que maxConcurrent en el constructor define cuántas solicitudes serán procesadas simultáneamente.

Cuestiones relacionadas