2009-02-12 13 views
38

Disculpe si hay una pregunta redundante. Sin embargo, he encontrado muchas soluciones a mi problema, pero ninguna de ellas está muy bien explicada. Espero que quede claro, aquí.Espere a que se completen los subprocesos

El hilo principal de la aplicación My C# genera 1..n trabajadores de fondo que usan ThreadPool. Deseo que el hilo original se bloquee hasta que todos los trabajadores hayan terminado. Investigué el ManualResetEvent en particular, pero no tengo claro su uso.

En seudo:

foreach(var o in collection) 
{ 
    queue new worker(o); 
} 

while(workers not completed) { continue; } 

Si es necesario, voy a saber el número de trabajadores que están a punto de ser puesto en cola antes de la mano.

+0

Hola echar un vistazo a puesto similar aquí http://stackoverflow.com/questions/358721/be-notified-when-all-background-threadpool-threads-are-finished – Valentin

Respuesta

54

probar esto. La función incluye una lista de delegados de Acción. Se agregará una entrada de trabajador de ThreadPool para cada elemento de la lista. Esperará a que se complete cada acción antes de regresar.

public static void SpawnAndWait(IEnumerable<Action> actions) 
{ 
    var list = actions.ToList(); 
    var handles = new ManualResetEvent[actions.Count()]; 
    for (var i = 0; i < list.Count; i++) 
    { 
     handles[i] = new ManualResetEvent(false); 
     var currentAction = list[i]; 
     var currentHandle = handles[i]; 
     Action wrappedAction =() => { try { currentAction(); } finally { currentHandle.Set(); } }; 
     ThreadPool.QueueUserWorkItem(x => wrappedAction()); 
    } 

    WaitHandle.WaitAll(handles); 
} 
+6

WaitHandle.WaitTodo falla si el número de identificadores es mayor que el sistema lo permite En mi servidor Win2k3, ese número es 64, así que recibo una excepción cuando intento generar más de 64 elementos ... –

+1

@Eran, intente escribir SpawAndWaitHelper, que esencialmente tiene el código anterior. Usa SpawAndWait para dividir el enumerable en 64 trozos de tamaño y llama al ayudante para cada fragmento. – JaredPar

+0

ah ... http://stackoverflow.com/questions/1045980/is-there-a-better-way-to-wait-for-queued-threads/1074770#1074770 –

13

Primero, ¿cuánto tiempo llevan a cabo los trabajadores? los subprocesos de grupo generalmente se deben usar para tareas de corta duración; si van a ejecutarse por un tiempo, considere los subprocesos manuales.

Re el problema; ¿realmente necesitas bloquear el hilo principal? ¿Puedes usar una devolución de llamada en su lugar? Si es así, algo así como:

int running = 1; // start at 1 to prevent multiple callbacks if 
      // tasks finish faster than they are started 
Action endOfThread = delegate { 
    if(Interlocked.Decrement(ref running) == 0) { 
     // ****run callback method**** 
    } 
}; 
foreach(var o in collection) 
{ 
    var tmp = o; // avoid "capture" issue 
    Interlocked.Increment(ref running); 
    ThreadPool.QueueUserWorkItem(delegate { 
     DoSomeWork(tmp); // [A] should handle exceptions internally 
     endOfThread(); 
    }); 
} 
endOfThread(); // opposite of "start at 1" 

Ésta es una (no hay primitivas OS) forma bastante ligero de seguimiento de los trabajadores.

Si necesita bloquear, puede hacer lo mismo usando un Monitor (de nuevo, evitando un objeto OS):

object syncLock = new object(); 
    int running = 1; 
    Action endOfThread = delegate { 
     if (Interlocked.Decrement(ref running) == 0) { 
      lock (syncLock) { 
       Monitor.Pulse(syncLock); 
      } 
     } 
    }; 
    lock (syncLock) { 
     foreach (var o in collection) { 
      var tmp = o; // avoid "capture" issue 
      ThreadPool.QueueUserWorkItem(delegate 
      { 
       DoSomeWork(tmp); // [A] should handle exceptions internally 
       endOfThread(); 
      }); 
     } 
     endOfThread(); 
     Monitor.Wait(syncLock); 
    } 
    Console.WriteLine("all done"); 
+2

Su código esperará infinitamente si uno de los delegados arroja una excepción. – JaredPar

+2

Si uno de esos delegados lanza una excepción, voy a perder todo el proceso, por lo que es bastante arbitrario ... supongo que no se lanzará, pero lo haré explícito ;-p –

+0

los trabajadores procesarán operaciones costosas, incluyendo la lectura y escritura de archivos y la realización de selecciones e inserciones de SQL que involucren columnas Binarias/Imagen. Es poco probable que vivan lo suficiente como para requerir hilos explícitos, pero el rendimiento podría obtenerse al permitirles ejecutar en paralelo. – Kivin

1

Creo que estaba en el camino correcto con el ManualResetEvent. Este link tiene un ejemplo de código que coincide exactamente con lo que intenta hacer. La clave es usar WaitHandle.WaitAll y pasar una serie de eventos de espera. Cada hilo necesita establecer uno de estos eventos de espera.

// Simultaneously calculate the terms. 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateBase)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateFirstTerm)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateSecondTerm)); 
    ThreadPool.QueueUserWorkItem(
     new WaitCallback(CalculateThirdTerm)); 

    // Wait for all of the terms to be calculated. 
    WaitHandle.WaitAll(autoEvents); 

    // Reset the wait handle for the next calculation. 
    manualEvent.Reset(); 

Editar:

Asegúrese de que su trabajador en ruta de código hilo de configurar el evento (es decir autoEvents 1 .Ajustar();). Una vez que todos estén señalados, la espera volverá.

void CalculateSecondTerm(object stateInfo) 
{ 
    double preCalc = randomGenerator.NextDouble(); 
    manualEvent.WaitOne(); 
    secondTerm = preCalc * baseNumber * 
     randomGenerator.NextDouble(); 
    autoEvents[1].Set(); 
} 
29

Aquí hay un enfoque diferente: encapsulado; por lo que el código podría ser tan simple como:

Forker p = new Forker(); 
    foreach (var obj in collection) 
    { 
     var tmp = obj; 
     p.Fork(delegate { DoSomeWork(tmp); }); 
    } 
    p.Join(); 

Cuando se da la clase Forker a continuación (Me aburrí en el tren ;-P) ... otra vez, esto evita los objetos del sistema operativo, pero envuelve las cosas muy claramente (OMI):

using System; 
using System.Threading; 

/// <summary>Event arguments representing the completion of a parallel action.</summary> 
public class ParallelEventArgs : EventArgs 
{ 
    private readonly object state; 
    private readonly Exception exception; 
    internal ParallelEventArgs(object state, Exception exception) 
    { 
     this.state = state; 
     this.exception = exception; 
    } 

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary> 
    public object State { get { return state; } } 

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary> 
    public Exception Exception { get { return exception; } } 
} 

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary> 
public sealed class Forker 
{ 
    int running; 
    private readonly object joinLock = new object(), eventLock = new object(); 

    /// <summary>Raised when all operations have completed.</summary> 
    public event EventHandler AllComplete 
    { 
     add { lock (eventLock) { allComplete += value; } } 
     remove { lock (eventLock) { allComplete -= value; } } 
    } 
    private EventHandler allComplete; 
    /// <summary>Raised when each operation completes.</summary> 
    public event EventHandler<ParallelEventArgs> ItemComplete 
    { 
     add { lock (eventLock) { itemComplete += value; } } 
     remove { lock (eventLock) { itemComplete -= value; } } 
    } 
    private EventHandler<ParallelEventArgs> itemComplete; 

    private void OnItemComplete(object state, Exception exception) 
    { 
     EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock 
     if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception)); 
     if (Interlocked.Decrement(ref running) == 0) 
     { 
      EventHandler allHandler = allComplete; // don't need to lock 
      if (allHandler != null) allHandler(this, EventArgs.Empty); 
      lock (joinLock) 
      { 
       Monitor.PulseAll(joinLock); 
      } 
     } 
    } 

    /// <summary>Adds a callback to invoke when each operation completes.</summary> 
    /// <returns>Current instance (for fluent API).</returns> 
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler) 
    { 
     if (handler == null) throw new ArgumentNullException("handler"); 
     ItemComplete += handler; 
     return this; 
    } 

    /// <summary>Adds a callback to invoke when all operations are complete.</summary> 
    /// <returns>Current instance (for fluent API).</returns> 
    public Forker OnAllComplete(EventHandler handler) 
    { 
     if (handler == null) throw new ArgumentNullException("handler"); 
     AllComplete += handler; 
     return this; 
    } 

    /// <summary>Waits for all operations to complete.</summary> 
    public void Join() 
    { 
     Join(-1); 
    } 

    /// <summary>Waits (with timeout) for all operations to complete.</summary> 
    /// <returns>Whether all operations had completed before the timeout.</returns> 
    public bool Join(int millisecondsTimeout) 
    { 
     lock (joinLock) 
     { 
      if (CountRunning() == 0) return true; 
      Thread.SpinWait(1); // try our luck... 
      return (CountRunning() == 0) || 
       Monitor.Wait(joinLock, millisecondsTimeout); 
     } 
    } 

    /// <summary>Indicates the number of incomplete operations.</summary> 
    /// <returns>The number of incomplete operations.</returns> 
    public int CountRunning() 
    { 
     return Interlocked.CompareExchange(ref running, 0, 0); 
    } 

    /// <summary>Enqueues an operation.</summary> 
    /// <param name="action">The operation to perform.</param> 
    /// <returns>The current instance (for fluent API).</returns> 
    public Forker Fork(ThreadStart action) { return Fork(action, null); } 

    /// <summary>Enqueues an operation.</summary> 
    /// <param name="action">The operation to perform.</param> 
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param> 
    /// <returns>The current instance (for fluent API).</returns> 
    public Forker Fork(ThreadStart action, object state) 
    { 
     if (action == null) throw new ArgumentNullException("action"); 
     Interlocked.Increment(ref running); 
     ThreadPool.QueueUserWorkItem(delegate 
     { 
      Exception exception = null; 
      try { action(); } 
      catch (Exception ex) { exception = ex;} 
      OnItemComplete(state, exception); 
     }); 
     return this; 
    } 
} 
+0

(HI MARC! ¿Recuerda esta publicación?) Por curiosidad, ¿por qué es necesario var tmp = obj? Lo implementé simplemente pasando mi objeto y obtuve resultados locos. Cambiarlo a usar var terminó funcionando. ¡Claramente no estoy entendiendo algo! Gracias, y mira si puedes recordar después de solo dos años :) – DanTheMan

+1

@user La respuesta a eso es un poco complicada, pero en resumen, es porque C# no hace silenciosamente exactamente lo que querías sin siquiera darte cuenta. En general, es bastante bueno para hacer esto sin ambigüedades en todos los lugares correctos, pero no en este caso. –

+4

Debe comprender que el código 'delegate {DoSomeWork (tmp); } '* captura * la variable' tmp'. Cada llamada a este código captura una variable * diferente * cada vez que gira alrededor del bucle, porque el alcance de 'tmp' está limitado al cuerpo del bucle. Sin embargo, la variable 'foreach' es la * misma * variable cada vez que pasa el ciclo, por lo que todas las llamadas a' delegan {DoSomeWork (tmp); } 'capturar lo mismo. Esto realmente no necesita ser así; limitar el alcance de la variable foreach habría hecho que muchos códigos "solo funcionaran" sin que la gente se diera cuenta de la dificultad de la situación. –

1

Usando .NET 4.Clase 0 Barrie r:

 Barrier sync = new Barrier(1); 

     foreach(var o in collection) 
     { 
      WaitCallback worker = (state) => 
      { 
       // do work 
       sync.SignalAndWait(); 
      }; 

      sync.AddParticipant(); 
      ThreadPool.QueueUserWorkItem(worker, o); 
     } 

     sync.SignalAndWait(); 
+1

Existe un límite superior en la cantidad de participantes que se pueden usar. :( –

8

He estado usando la nueva biblioteca de tareas en paralelo en CTP here:

 Parallel.ForEach(collection, o => 
      { 
       DoSomeWork(o); 
      }); 
+0

¡Buena sugerencia! También es más fácil cuando se trata de manejar excepciones. Consulte: http://msdn.microsoft.com/en-us/library/dd991486.aspx – Joop

+0

Tenga especial cuidado ya que esto usa el ThreadPool y no es posible para obligarlo a utilizar subprocesos dedicados (no administrados). Incluso el uso de la opción TaskFactory subyacente con LongRunning solo proporciona una pista sobre el planificador, pero no es una garantía para un subproceso dedicado. – eduncan911

3

Aquí es una solución utilizando la clase CountdownEvent.

var complete = new CountdownEvent(1); 
foreach (var o in collection) 
{ 
    var capture = o; 
    ThreadPool.QueueUserWorkItem((state) => 
    { 
     try 
     { 
     DoSomething(capture); 
     } 
     finally 
     { 
     complete.Signal(); 
     } 
    }, null); 
} 
complete.Signal(); 
complete.Wait(); 

Por supuesto, si usted tiene acceso a la clase CountdownEvent entonces usted tiene todo el TPL para trabajar. La clase Parallel se ocupa de la espera.

Parallel.ForEach(collection, o => 
    { 
    DoSomething(o); 
    }); 
Cuestiones relacionadas