2011-12-07 15 views
21

que desea procesar algo con bucle en paralelo como esto:Parallel.Foreach + yield return?

public void FillLogs(IEnumerable<IComputer> computers) 
{ 
    Parallel.ForEach(computers, cpt=> 
    { 
     cpt.Logs = cpt.GetRawLogs().ToList(); 
    }); 

} 

Ok, funciona bien. Pero ¿Cómo hacer si quiero que el método FillLogs devuelva un IEnumerable?

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers) 
{ 
    Parallel.ForEach(computers, cpt=> 
    { 
     cpt.Logs = cpt.GetRawLogs().ToList(); 
     yield return cpt // KO, don't work 
    }); 

} 

EDITAR

No parece ser posible ... pero yo uso algo como esto:

public IEnumerable<IComputer> FillLogs(IEnumerable<IComputer> computers) 
{ 
    return computers.AsParallel().Select(cpt => cpt); 
} 

Pero donde pongo la instrucción cpt.Logs = cpt.GetRawLogs().ToList();

+0

Su tipo de retorno 'IEnumerable 'no tomará la carga. –

Respuesta

12

Versión corta - no, eso no es posible a través de un bloque iterador; la versión más larga probablemente involucra la cola/dequeue sincronizada entre el hilo iterador de la persona que llama (haciendo la dequeue) y los trabajadores paralelos (haciendo la cola); pero como nota al margen: los registros generalmente están vinculados a IO, y las cosas en paralelo que están vinculadas a IO a menudo no funcionan muy bien.

Si la persona que llama se va a tomar algún tiempo para consumen cada uno, entonces puede haber algún mérito a un enfoque que sólo procesa un tronco a la vez, pero puede hacer que mientras la persona que llama está consumiendo la anterior Iniciar sesión; es decir, comienza a Taskpara el próximo artículo antes del yield, y espera su finalización después del yield ... pero eso es nuevamente bastante complejo. Como un ejemplo simplificado:

static void Main() 
{ 
    foreach(string s in Get()) 
    { 
     Console.WriteLine(s); 
    } 
} 

static IEnumerable<string> Get() { 
    var source = new[] {1, 2, 3, 4, 5}; 
    Task<string> outstandingItem = null; 
    Func<object, string> transform = x => ProcessItem((int) x); 
    foreach(var item in source) 
    { 
     var tmp = outstandingItem; 

     // note: passed in as "state", not captured, so not a foreach/capture bug 
     outstandingItem = new Task<string>(transform, item); 
     outstandingItem.Start(); 

     if (tmp != null) yield return tmp.Result; 
    } 
    if (outstandingItem != null) yield return outstandingItem.Result; 
} 
static string ProcessItem(int i) 
{ 
    return i.ToString(); 
} 
+0

No exactamente pero tuve un problema similar (demasiado pasado por alto :)) con el resultado del rendimiento de retorno en paralelo.foreach. El contexto diferente en el que pensé podría ayudar a alguien. http://stackoverflow.com/questions/32183463/why-i-get-duplicated-values-during-a-parallel-task-execution-using-an-iterator – Spock

4

Pongo No quiero ser ofensivo, pero tal vez hay una falta de comprensión. Parallel.ForEach significa que el TPL ejecutará el foreach de acuerdo con el hardware disponible en varios subprocesos. ¡Pero eso significa que es posible hacer ese trabajo en paralelo! yield return le da la oportunidad de obtener algunos valores de una lista (o cualquier otra) y devolverlos uno por uno a medida que se necesiten. Evita la necesidad de encontrar primero todos los elementos que coincidan con la condición y luego iterar sobre ellos. Esa es de hecho una ventaja de rendimiento, pero no se puede hacer en paralelo.

+1

Pero si cada generación de los valores de retorno de rendimiento toma algo de tiempo, ¿no habría casos en los que desee procesar el siguiente valor de retorno de rendimiento en paralelo para que pueda obtenerlo más rápido? Supongo que me gustaría tener un búfer o algo así? No sé si hay una falta de comprensión, pero puedo imaginar (incluso entendiendo) casos en los que uno querría ceder más rápido. Supongo que la intención del rendimiento es procesar en el momento de la necesidad, por lo que tal vez el retorno del rendimiento no encaje estrictamente. Pero ciertamente puedo imaginar lo que se desea ... –

0

¿Qué tal

  Queue<string> qu = new Queue<string>(); 
      bool finished = false; 
      Task.Factory.StartNew(() => 
      { 
       Parallel.ForEach(get_list(), (item) => 
       { 
        string itemToReturn = heavyWorkOnItem(item);   
        lock (qu) 
         qu.Enqueue(itemToReturn);       
       }); 
       finished = true; 
      }); 

      while (!finished) 
      { 
       lock (qu) 
        while (qu.Count > 0) 
         yield return qu.Dequeue(); 
       //maybe a thread sleep here? 
      } 

Editar: creo que esto es mejor:

 public static IEnumerable<TOutput> ParallelYieldReturn<TSource, TOutput>(this IEnumerable<TSource> source, Func<TSource, TOutput> func) 
     { 
      ConcurrentQueue<TOutput> qu = new ConcurrentQueue<TOutput>(); 
      bool finished = false; 
      AutoResetEvent re = new AutoResetEvent(false); 
      Task.Factory.StartNew(() => 
      { 
       Parallel.ForEach(source, (item) => 
       { 
        qu.Enqueue(func(item)); 
        re.Set(); 
       }); 
       finished = true; 
       re.Set(); 
      }); 

      while (!finished) 
      { 
       re.WaitOne(); 
       while (qu.Count > 0) 
       { 
        TOutput res; 
        if (qu.TryDequeue(out res)) 
         yield return res; 
       } 
      } 
     } 

Edit2: Estoy de acuerdo con el corto n respuesta. Este código es inútil; no puedes romper el ciclo de rendimiento.

Cuestiones relacionadas