2011-06-10 16 views
6

Actualmente estoy trabajando en un proyecto, donde tenemos el desafío de procesar elementos en paralelo. Hasta ahora no es gran cosa;) Ahora para el problema. Tenemos una lista de ID, donde periódicamente (cada 2 segundos) lo que llamamos un StoredProcedure para cada ID. Los 2 segundos deben verificarse para cada elemento individualmente, ya que se agregan y eliminan durante el tiempo de ejecución. Además, queremos configurar el máximo grado de paralelismo, ya que la base de datos no se debe inundar con 300 subprocesos al mismo tiempo. Un elemento que se está procesando no debe reprogramarse para su procesamiento hasta que haya finalizado con la ejecución anterior. La razón es que queremos evitar hacer cola en una gran cantidad de artículos, en caso de retrasos en la base de datos.TPL architectural question

En este momento estamos utilizando un componente de desarrollo propio, que tiene un hilo principal, que comprueba periódicamente qué elementos deben programarse para su procesamiento. Una vez que tiene la lista, está descartando aquellos en un grupo de subprocesos personalizado basado en IOCP, y luego utiliza waithandles para esperar a que se procesen los elementos. Luego comienza la siguiente iteración. IOCP debido al robo de trabajo que proporciona.

Me gustaría reemplazar esta implementación personalizada con una versión TPL/.NET 4, y me gustaría saber cómo la resolvería (idealmente simple y bien legible/mantenible). Conozco este artículo: http://msdn.microsoft.com/en-us/library/ee789351.aspx, pero solo limita la cantidad de subprocesos que se utilizan. Deja el trabajo robando, ejecutando periódicamente los artículos ....

Idealmente se convertirá en un componente genérico, que se puede utilizar para algunas de las tareas que deben realizarse periódicamente para obtener una lista de elementos.

cualquiera de bienvenida de entrada, tia Martin

+0

reactiva Programación –

Respuesta

9

no creo que realmente necesita para conseguir abajo y sucio con TPL directa Tasks para esto. Para empezar, configuraría un BlockingCollection alrededor de un ConcurrentQueue (el valor predeterminado) sin BoundedCapacity establecido en el BlockingCollection para almacenar los ID que deben procesarse.

// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service) 
BlockingCollection<string> idsToProcess = new BlockingCollection<string>(); 

A partir de ahí me acaba de utilizar Parallel::ForEach en la enumeración de regresar de la BlockingCollection::GetConsumingEnumerable. En la llamada ForEach configurará su ParallelOptions::MaxDegreeOfParallelism Dentro del cuerpo del ForEach, ejecutará su procedimiento almacenado.

Ahora, una vez que se completa la ejecución del procedimiento almacenado, está diciendo que no desea volver a programar la ejecución para por lo menos dos segundos. No hay problema, programe un System.Threading.Timer con una devolución de llamada que simplemente agregará la ID al BlockingCollection en la devolución de llamada proporcionada.

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(), 
    new ParallelOptions 
    { 
     MaxDegreeOfParallelism = 4 // read this from config 
    }, 
    (id) => 
    { 
     // ... execute sproc ... 

     // Need to declare/assign this before the delegate so that we can dispose of it inside 
     Timer timer = null; 

     timer = new Timer(
      _ => 
      { 
       // Add the id back to the collection so it will be processed again 
       idsToProcess.Add(id); 

       // Cleanup the timer 
       timer.Dispose(); 
      }, 
      null, // no state, id wee need is "captured" in the anonymous delegate 
      2000, // probably should read this from config 
      Timeout.Infinite); 
    } 

Por último, cuando el proceso se está cerrando que llamarían BlockingCollection::CompleteAdding para que el enumerable que se procesa con el bloqueo de parada y completa y el paralelo :: ParaCada se cerrará. Si esto fuera un servicio de Windows, por ejemplo, haría esto en OnStop.

// When ready to shutdown you just signal you're done adding 
idsToProcess.CompleteAdding(); 

actualización

usted ha planteado una preocupación válida en su comentario de que es posible que se procesa una gran cantidad de identificadores en un momento dado y temen que no habría demasiada sobrecarga en un contador de tiempo por ID .Estoy absolutamente de acuerdo con eso. Entonces, en el caso de que esté lidiando con una gran lista de ID al mismo tiempo, cambiaría de utilizar un temporizador por ID a usar otra cola para mantener los ID "dormidos" que son monitoreados por un único temporizador de intervalo corto. En primer lugar se necesita un ConcurrentQueue en el que colocar los identificadores que están dormidos:

ConcurrentQueue<Tuple<string, DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string, DateTime>>(); 

Ahora, estoy usando una de dos partes Tuple aquí con fines ilustrativos, pero es posible que desee crear un mundo más fuertemente tipado struct for it (o al menos alias con una instrucción using) para una mejor legibilidad. La tupla tiene la identificación y un DateTime que representa cuando se colocó en la cola.

Ahora usted también querrá configurar el temporizador que supervisará esta cola:

Timer wakeSleepingIdsTimer = new Timer(
    _ => 
    { 
     DateTime utcNow = DateTime.UtcNow; 

     // Pull all items from the sleeping queue that have been there for at least 2 seconds 
     foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2)) 
     { 
      // Add this id back to the processing queue 
      idsToProcess.Enqueue(id); 
     } 
    }, 
    null, // no state 
    Timeout.Infinite, // no due time 
    100 // wake up every 100ms, probably should read this from config 
); 

Entonces sólo tendría que cambiar el Parallel::ForEach hacer lo siguiente en lugar de establecer un temporizador para cada uno:

(id) => 
{ 
     // ... execute sproc ... 

     sleepingIds.Enqueue(Tuple.Create(id, DateTime.UtcNow)); 
} 
+0

buena idea, pero ¿no cree que esto creará un poco de un problema de recursos? Quiero decir, si tengo, por ejemplo, 500 elementos en la lista, estoy un poco preocupado por la gran cantidad de temporizadores que se ejecutan ... –

+0

Pensé en eso, pero no me dio límites claros, así que estaba esperando una respuesta sobre si esto cumplía o no con su necesariamente. Puede remediarlo fácilmente con otra cola y un temporizador único que supervisa la cola en busca de entradas vencidas y las vuelve a mover a la cola de trabajos principal. Agregará detalles a mi respuesta. –

1

Esto es bastante similar al enfoque que dijiste que ya tenías en tu pregunta, pero lo hace con las tareas de TPL. Una tarea simplemente se agrega a una lista de cosas para programar cuando está hecha.

El uso de bloqueo en una lista simple es bastante fea en este ejemplo, probablemente desee una mejor recolección de mantener la lista de cosas para programar

// Fill the idsToSchedule 
for (int id = 0; id < 5; id++) 
{ 
    idsToSchedule.Add(Tuple.Create(DateTime.MinValue, id)); 
} 

// LongRunning will tell TPL to create a new thread to run this on 
Task.Factory.StartNew(SchedulingLoop, TaskCreationOptions.LongRunning); 

Eso pone en marcha el SchedulingLoop, que en realidad realiza la comprobar si su sido dos segundos ya que algo corría

// Tuple of the last time an id was processed and the id of the thing to schedule 
static List<Tuple<DateTime, int>> idsToSchedule = new List<Tuple<DateTime, int>>(); 
static int currentlyProcessing = 0; 
const int ProcessingLimit = 3; 

// An event loop that performs the scheduling 
public static void SchedulingLoop() 
{ 
    while (true) 
    { 
     lock (idsToSchedule) 
     { 
      DateTime currentTime = DateTime.Now; 
      for (int index = idsToSchedule.Count - 1; index >= 0; index--) 
      { 
       var scheduleItem = idsToSchedule[index]; 
       var timeSincePreviousRun = (currentTime - scheduleItem.Item1).TotalSeconds; 

       // start it executing in a background task 
       if (timeSincePreviousRun > 2 && currentlyProcessing < ProcessingLimit) 
       { 
        Interlocked.Increment(ref currentlyProcessing); 

        Console.WriteLine("Scheduling {0} after {1} seconds", scheduleItem.Item2, timeSincePreviousRun); 

        // Schedule this task to be processed 
        Task.Factory.StartNew(() => 
         { 
          Console.WriteLine("Executing {0}", scheduleItem.Item2); 

          // simulate the time taken to call this procedure 
          Thread.Sleep(new Random((int)DateTime.Now.Ticks).Next(0, 5000) + 500); 

          lock (idsToSchedule) 
          { 
           idsToSchedule.Add(Tuple.Create(DateTime.Now, scheduleItem.Item2)); 
          } 

          Console.WriteLine("Done Executing {0}", scheduleItem.Item2); 
          Interlocked.Decrement(ref currentlyProcessing); 
         }); 

        // remove this from the list of things to schedule 
        idsToSchedule.RemoveAt(index); 
       } 
      } 
     } 

     Thread.Sleep(100); 
    } 
}