2011-01-12 12 views
14

Sobre el tema de esperar hasta que se completen las tareas y la sincronización de la secuencia.Parallel.ForEach - Elegante cancelación

Actualmente tengo una iteración que he adjuntado dentro de un Parallel.ForEach. En el siguiente ejemplo he planteado algunas preguntas en los comentarios sobre la mejor manera de manejar la terminación graciosa del ciclo (.NET 4.0);

private void myFunction() 
    { 

     IList<string> iListOfItems = new List<string>(); 
     // populate iListOfItems 

     CancellationTokenSource cts = new CancellationTokenSource(); 

     ParallelOptions po = new ParallelOptions(); 
     po.MaxDegreeOfParallelism = 20; // max threads 
     po.CancellationToken = cts.Token; 

     try 
     { 
      var myWcfProxy = new myWcfClientSoapClient(); 

      if (Parallel.ForEach(iListOfItems, po, (item, loopsate) => 
      { 
       try 
       { 
        if (_requestedToStop) 
         loopsate.Stop(); 
        // long running blocking WS call, check before and after 
        var response = myWcfProxy.ProcessIntervalConfiguration(item); 
        if (_requestedToStop) 
         loopsate.Stop(); 

        // perform some local processing of the response object 
       } 
       catch (Exception ex) 
       { 
        // cannot continue game over. 
        if (myWcfProxy.State == CommunicationState.Faulted) 
        { 
         loopsate.Stop(); 
         throw; 
        } 
       } 

       // else carry on.. 
       // raise some events and other actions that could all risk an unhanded error. 

      } 
      ).IsCompleted) 
      { 
       RaiseAllItemsCompleteEvent(); 
      } 
     } 
     catch (Exception ex) 
     { 
      // if an unhandled error is raised within one of the Parallel.ForEach threads, do all threads in the 
      // ForEach abort? or run to completion? Is loopsate.Stop (or equivalent) called as soon as the framework raises an Exception? 
      // Do I need to call cts.Cancel here? 

      // I want to wait for all the threads to terminate before I continue at this point. Howe do we achieve that? 

      // do i need to call cts.Dispose() ? 

      MessageBox.Show(Logging.FormatException(ex)); 
     } 
     finally 
     { 

      if (myWcfProxy != null) 
      { 
      // possible race condition with the for-each threads here unless we wait for them to terminate. 
       if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted) 
        myWcfProxy.Abort(); 

       myWcfProxy.Close(); 
      } 

      // possible race condition with the for-each threads here unless we wait for them to terminate. 
      _requestedToStop = false; 

     } 

    } 

Cualquier ayuda sería muy apreciada. La documentación de MSDN habla de ManualResetEventSlim's y cancellationToken.WaitHandle's. pero no estoy seguro de cómo conectarlos con esto, parece estar luchando por comprender los ejemplos de MSDN ya que la mayoría no se aplican.

Respuesta

8

Me he burlado de algunos códigos a continuación que pueden responder a su pregunta. El punto básico es que obtiene un paralelismo fork/join con Parallel.ForEach, por lo que no necesita preocuparse por las condiciones de carrera fuera de la tarea paralela (el hilo de llamada bloquea hasta que las tareas se hayan completado, con éxito o de otra manera). Solo quiere asegurarse de usar la variable LoopState (el segundo argumento para la lambda) para controlar su estado de bucle.

Si cualquier iteración del bucle arrojó una excepción no controlada, el bucle global aumentará la AggregateException capturada al final.

Otros enlaces que mencionan este tema:

Parallel.ForEach throws exception when processing extremely large sets of data

http://msdn.microsoft.com/en-us/library/dd460720.aspx

Does Parallel.ForEach limits the number of active threads?

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 
using System.ServiceModel; 

namespace Temp 
{ 
    public class Class1 
    { 
     private class MockWcfProxy 
     { 
      internal object ProcessIntervalConfiguration(string item) 
      { 
       return new Object(); 
      } 

      public CommunicationState State { get; set; } 
     } 

     private void myFunction() 
     { 

      IList<string> iListOfItems = new List<string>(); 
      // populate iListOfItems 

      CancellationTokenSource cts = new CancellationTokenSource(); 

      ParallelOptions po = new ParallelOptions(); 
      po.MaxDegreeOfParallelism = 20; // max threads 
      po.CancellationToken = cts.Token; 

      try 
      { 
       var myWcfProxy = new MockWcfProxy(); 

       if (Parallel.ForEach(iListOfItems, po, (item, loopState) => 
        { 
         try 
         { 
          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // long running blocking WS call, check before and after 
          var response = myWcfProxy.ProcessIntervalConfiguration(item); 

          if (loopState.ShouldExitCurrentIteration || loopState.IsExceptional) 
           loopState.Stop(); 

          // perform some local processing of the response object 
         } 
         catch (Exception ex) 
         { 
          // cannot continue game over. 
          if (myWcfProxy.State == CommunicationState.Faulted) 
          { 
           loopState.Stop(); 
           throw; 
          } 

          // FYI you are swallowing all other exceptions here... 
         } 

         // else carry on.. 
         // raise some events and other actions that could all risk an unhanded error. 
        } 
       ).IsCompleted) 
       { 
        RaiseAllItemsCompleteEvent(); 
       } 
      } 
      catch (AggregateException aggEx) 
      { 
       // This section will be entered if any of the loops threw an unhandled exception. 
       // Because we re-threw the WCF exeption above, you can use aggEx.InnerExceptions here 
       // to see those (if you want). 
      } 
      // Execution will not get to this point until all of the iterations have completed (or one 
      // has failed, and all that were running when that failure occurred complete). 
     } 

     private void RaiseAllItemsCompleteEvent() 
     { 
      // Everything completed... 
     } 
    } 
} 
+0

gracias por la comprensión. Debo decir que en el momento en que con razón me señala "tragando todas las demás excepciones aquí", estoy haciendo una llamada de registro que registrará el servicio web o la excepción WCF del lado del cliente. La intención es que el ciclo continúe si la excepción no da como resultado un proxy WCF invalidado. Anticipo errores de tiempo de espera o excepciones de fallas en el lado del servidor. sin el cual para esta funcionalidad particular, necesita alguna funcionalidad atenuante en la captura. sin embargo, estaríamos revisando el archivo de registro y se investigaría cualquier excepción de este tipo. – Terry

+0

Lo que me confundió sobre el Parallel.ForEach fue que yo también asumí que debería ser una llamada de bloqueo hasta que todos los hilos en el conjunto se completen (excepciones almacenadas o no), sin embargo, el número de hilos informados como ejecutados en un punto de interrupción establecido en, por ejemplo su bloque catch (AggregateException aggEx) informaría como 20 hilos en VS 2010 thread viewer. Así que saqué sysinternals y miré el ejecutable vshost que se estaba depurando y también se mostraron 22 subprocesos, incluyendo la interfaz de usuario y la bomba de mensajes. – Terry

+0

Además, los eventos que se generaron dentro del ciclo, arrojaban más excepciones después de que la función se haya ejecutado y se haya ejecutado finalmente el bloque. – Terry