2011-12-19 10 views
6

Tengo un código que usa Parallel.ForEach, probablemente basado en una versión anterior de extensiones Rx o en la Biblioteca de tareas paralelas. Instalé una versión actual de las extensiones Rx pero no puedo encontrar Parallel.ForEach. No estoy usando cualquier otra materia de lujo de la biblioteca y sólo quieren procesar algunos datos en paralelo de esta manera:Extensiones de Rx: ¿Dónde está Parallel.ForEach?

Parallel.ForEach(records, ProcessRecord); 

He encontrado this question, pero no me gustaría que depender de un viejo versiones de Rx. Pero no pude encontrar algo similar para Rx, entonces, ¿cuál es la forma más sencilla y actual de hacerlo utilizando una versión actual de Rx? El proyecto está usando .NET 3.5.

+2

¿Desea actualizar a .NET 4.0? – dtb

+0

posible duplicado de [Parallel.ForEach faltante de Extensiones reactivas para .Net 3.5] (http://stackoverflow.com/questions/7398962/parallel-foreach-missing-from-reactive-extensions-for-net-3-5) –

+1

@Hasan: He vinculado a la pregunta que mencionas, por lo que obviamente era consciente de ello. Pero la respuesta es proponer usar una versión anterior de Rx, que no quiero usar. – Achim

Respuesta

24

No hay necesidad de hacer todo esto goosery tonta si tiene Rx:

records.ToObservable() 
    .SelectMany(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) 
    .ToList() 
    .First(); 

(O bien, si desea que el orden de los elementos mantenidos en el costa de la eficiencia):

records.ToObservable() 
    .Select(x => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler)) 
    .Concat() 
    .ToList() 
    .First(); 

O si desea limitar el número de productos al mismo tiempo:

records.ToObservable() 
    .Select(x => Observable.Defer(() => Observable.Start(() => ProcessRecord(x), Scheduler.ThreadPoolScheduler))) 
    .Merge(5 /* at a time */) 
    .ToList() 
    .First(); 
+0

¿Qué le parece estrangular esto para procesar solo n elementos en un momento dado? Estoy probando esto con cientos de elementos y estoy tratando de hacerlos todos a la vez, lo que está generando enormes problemas de memoria en mi aplicación. Me gustaría poder restringirlo a hacer 5 a la vez, por ejemplo. – Clint

+2

Actualizado para incluir concurrencia limitante –

1

Aquí hay una simple sustitución:

class Parallel 
{ 
    public static void ForEach<T>(IEnumerable<T> source, Action<T> body) 
    { 
     if (source == null) 
     { 
      throw new ArgumentNullException("source"); 
     } 
     if (body == null) 
     { 
      throw new ArgumentNullException("body"); 
     } 
     var items = new List<T>(source); 
     var countdown = new CountdownEvent(items.Count); 
     WaitCallback callback = state => 
     { 
      try 
      { 
       body((T)state); 
      } 
      finally 
      { 
       countdown.Signal(); 
      } 
     }; 
     foreach (var item in items) 
     { 
      ThreadPool.QueueUserWorkItem(callback, item); 
     } 
     countdown.Wait(); 
    } 
} 
+0

Parece prometedor, pero ¿cómo sabe el código cuando se procesaron todos los artículos? Parallel.ForEach bloquea hasta que se hayan procesado todos los elementos, por lo que no tengo que preocuparme por eso. – Achim

+1

Es obviamente una simplificación de lo que hace Parallel.ForEach. Expandí mi respuesta para bloquear hasta que se hayan procesado todos los elementos. – dtb

+2

Vale la pena considerar exactamente cuál es el trade-off aquí en comparación con la actualización a 4.0. Los autores de 'Parallel.ForEach' pusieron mucho trabajo en lograr un buen equilibrio entre el grado de concurrencia y la cantidad de sobrecarga. @dtb, por otro lado, dio una buena respuesta que ocupa alrededor de 30LoC. Si encuentra que esta respuesta hace todo lo que necesita, entonces días felices. Si te encuentras retocando y ajustando y el código aquí crece a cientos de LoC, entonces ya no tienes una buena versión corta, sino una mala versión larga en la que básicamente estás reescribiendo más y más de lo que ya está el '4.0 Paralelo' ha probado. –

Cuestiones relacionadas