2009-02-07 9 views
7

¿Hay alguna manera de hacer foreach iteración de estilo sobre enumerables paralelos en C#? Para las listas subscripibles, sé que se podría usar un ciclo normal de for iterando un int sobre el rango de índice, pero realmente prefiero foreach a for por varias razones.iteración paralela en C#?

Los puntos de bonificación si funciona en C# 2.0

+0

wouldn' ¿Es posible que un bucle for sea una solución más simple, más corta y legible en lugar de la respuesta Combine a continuación? ¿Cuáles son sus razones para preferir foreach en este caso? – Gishu

+0

También la iteración paralela acaba de salir a la luz en Ruby 1.9, así que apostaría que no está en C# a partir de ahora ... LISP lo tuvo :) – Gishu

+0

No estoy seguro de entender el pregunta correcta ¿Está tratando de iterar en múltiples enumerables en paralelo, o está tratando de recorrer un enumerable, procesando diferentes elementos en paralelo? –

Respuesta

9

Respuesta corta, no. foreach funciona en solo un enumerable a la vez.

Sin embargo, si combina sus enumeraciones paralelas en una sola, puede foreach en combinación. No estoy al tanto de cualquier fácil, construido en el método de hacer esto, pero lo siguiente debe funcionar (aunque no lo he probado):

public IEnumerable<TSource[]> Combine<TSource>(params object[] sources) 
{ 
    foreach(var o in sources) 
    { 
     // Choose your own exception 
     if(!(o is IEnumerable<TSource>)) throw new Exception(); 
    } 

    var enums = 
     sources.Select(s => ((IEnumerable<TSource>)s).GetEnumerator()) 
     .ToArray(); 

    while(enums.All(e => e.MoveNext())) 
    { 
     yield return enums.Select(e => e.Current).ToArray(); 
    } 
} 

A continuación, puede foreach sobre el vuelto enumerables:

foreach(var v in Combine(en1, en2, en3)) 
{ 
    // Remembering that v is an array of the type contained in en1, 
    // en2 and en3. 
} 
+0

¿Por qué eligió object [] en lugar de IEnumerable como tipo de parámetro? Eliminaría la excepción, ¿no? – mafu

+0

Había al menos una versión del lenguaje C# que no compilaría params con otra cosa que no fuera objeto []. Teniendo en cuenta que esta respuesta tiene cinco años, supongo que la única versión que había usado en ese momento era una que no funcionaría con 'parametros IEnumerable [] sources'. En estos días, por supuesto usaría la tipificación más explícita. Probablemente también use el método de extensión 'IEnumerable .Zip' para dos enumerables, bastante seguro de que esto tampoco existía hace cinco años. – Zooba

0

¿Funcionaría para usted?

public static class Parallel 
{ 
    public static void ForEach<T>(IEnumerable<T>[] sources, 
            Action<T> action) 
    { 
     foreach (var enumerable in sources) 
     { 
      ThreadPool.QueueUserWorkItem(source => { 
       foreach (var item in (IEnumerable<T>)source) 
        action(item); 
      }, enumerable); 
     } 
    } 
} 

// sample usage: 
static void Main() 
{ 
    string[] s1 = { "1", "2", "3" }; 
    string[] s2 = { "4", "5", "6" }; 
    IEnumerable<string>[] sources = { s1, s2 }; 
    Parallel.ForEach(sources, s => Console.WriteLine(s)); 
    Thread.Sleep(0); // allow background threads to work 
} 

Para C# 2.0, debe convertir las expresiones lambda anteriores en delegadas.

Nota: Este método de utilidad utiliza subprocesos de fondo. Es posible que desee modificarlo para utilizar subprocesos de primer plano, y probablemente desee esperar hasta que finalicen todos los subprocesos. Si lo hace, le sugiero que cree subprocesos sources.Length - 1, y use el subproceso de ejecución actual para la última (o primera) fuente.

(Me gustaría poder incluir a la espera de hilos para terminar en mi código, pero lo siento que no sé cómo hacer eso todavía. Creo que se debe utilizar un WaitHandleThread.Join().)

11

. BlockingCollection de .NET 4 hace esto bastante fácil. Crea un BlockingCollection, devuelve su .GetConsumingEnumerable() en el método enumerable. Entonces el foreach simplemente se agrega a la colección de bloqueo.

E.g.

private BlockingCollection<T> m_data = new BlockingCollection<T>(); 

public IEnumerable<T> GetData(IEnumerable<IEnumerable<T>> sources) 
{ 
    Task.Factory.StartNew(() => ParallelGetData(sources)); 
    return m_data.GetConsumingEnumerable(); 
} 

private void ParallelGetData(IEnumerable<IEnumerable<T>> sources) 
{ 
    foreach(var source in sources) 
    { 
     foreach(var item in source) 
     { 
      m_data.Add(item); 
     }; 
    } 

    //Adding complete, the enumeration can stop now 
    m_data.CompleteAdding(); 
} 

Espero que esto ayude. BTW posted a blog about this anoche

Andre

+1

Esto debería marcarse como correcto ahora que .net 4.0 está fuera –

+0

Esto no responde la pregunta, que es sobre enumerar múltiples enumerables juntos. –

+0

Tienes toda la razón, Arthur, lo arreglaré – Andre

3

me escribió una implementación de EachParallel() de la biblioteca paralela .NET4. Es compatible con .NET 3.5: Parallel ForEach Loop in C# 3.5 Uso:

string[] names = { "cartman", "stan", "kenny", "kyle" }; 
names.EachParallel(name => 
{ 
    try 
    { 
     Console.WriteLine(name); 
    } 
    catch { /* handle exception */ } 
}); 

Implementación:

/// <summary> 
/// Enumerates through each item in a list in parallel 
/// </summary> 
public static void EachParallel<T>(this IEnumerable<T> list, Action<T> action) 
{ 
    // enumerate the list so it can't change during execution 
    list = list.ToArray(); 
    var count = list.Count(); 

    if (count == 0) 
    { 
     return; 
    } 
    else if (count == 1) 
    { 
     // if there's only one element, just execute it 
     action(list.First()); 
    } 
    else 
    { 
     // Launch each method in it's own thread 
     const int MaxHandles = 64; 
     for (var offset = 0; offset < list.Count()/MaxHandles; offset++) 
     { 
      // break up the list into 64-item chunks because of a limitiation    // in WaitHandle 
      var chunk = list.Skip(offset * MaxHandles).Take(MaxHandles); 

      // Initialize the reset events to keep track of completed threads 
      var resetEvents = new ManualResetEvent[chunk.Count()]; 

      // spawn a thread for each item in the chunk 
      int i = 0; 
      foreach (var item in chunk) 
      { 
       resetEvents[i] = new ManualResetEvent(false); 
       ThreadPool.QueueUserWorkItem(new WaitCallback((object data) => 
       { 
        int methodIndex = (int)((object[])data)[0]; 

        // Execute the method and pass in the enumerated item 
        action((T)((object[])data)[1]); 

        // Tell the calling thread that we're done 
        resetEvents[methodIndex].Set(); 
       }), new object[] { i, item }); 
       i++; 
      } 

      // Wait for all threads to execute 
      WaitHandle.WaitAll(resetEvents); 
     } 
    } 
} 
1

Si usted quiere meter a los fundamentos - Reescribí la respuesta aceptada actualmente de una manera más simple:

public static IEnumerable<TSource[]> Combine<TSource> (this IEnumerable<IEnumerable<TSource>> sources) 
    { 
     var enums = sources 
      .Select (s => s.GetEnumerator()) 
      .ToArray(); 

     while (enums.All (e => e.MoveNext())) { 
      yield return enums.Select (e => e.Current).ToArray(); 
     } 
    } 

    public static IEnumerable<TSource[]> Combine<TSource> (params IEnumerable<TSource>[] sources) 
    { 
     return sources.Combine(); 
    }