10

Ejecuto un modelo típico de productor/consumidor en diferentes tareas.¿Es este un trabajo para TPL Dataflow?

Tarea 1: Lee lotes de bytes [] de archivos binarios y inicia una nueva tarea para cada colección de matrices de bytes. (la operación se realiza por lotes para fines de administración de memoria).

Tarea 2-n: Esas son tareas de trabajo y cada una funciona en la colección pasada (de Tareas1) de matrices de bytes y deserializa matrices de bytes, las ordena según ciertos criterios y luego almacena una colección de objetos resultantes (cada matriz de bytes se deserializa en dicho objeto) en un diccionario concurrente.

Tarea (n + 1) Elegí un diccionario concurrente porque el trabajo de esta tarea es fusionar aquellas colecciones que se almacenan en el diccionario concurrente en el mismo orden que en el origen de Task1. Lo logro pasando un collectionID (es de tipo int e incrementado para cada nueva colección dentro de Task1) desde Task1 hasta esta tarea. Esta tarea básicamente verifica si el siguiente ID de colección esperado ya está almacenado en el diccionario concurrente y, en caso afirmativo, lo quita, lo agrega a una Cola final y busca la siguiente colección en el diccionario simultáneo.

Ahora, por lo que he leído y los videos que vi me parece que TPL Dataflow puede ser el candidato perfecto para dicho modelo de productor/consumidor. Simplemente no parezco ser capaz de diseñar un diseño y así comenzar porque nunca he trabajado con TPL Dataflow. En términos de rendimiento y latencia, ¿esta biblioteca está a la altura de la tarea? Actualmente proceso 2,5 millones de matrices de bytes y, por lo tanto, objetos por segundo en las colecciones resultantes. ¿Puede TPL Dataflow ayudar a simplificar? Estoy especialmente interesado en la respuesta a la siguiente pregunta: ¿TPL Dataflow puede conservar el orden de los lotes de recolección de la Tarea 1 cuando genera tareas de trabajo y las vuelve a fusionar una vez que las tareas de los trabajadores han hecho su trabajo? ¿Optimiza las cosas? Después de haber perfilado toda la estructura, creo que hay bastante tiempo perdido debido al giro y demasiadas colecciones concurrentes involucradas.

¿Alguna idea, pensamiento?

Respuesta

12

EDIT: Resulta que estaba muy equivocado. TransformBlockhace devuelve los artículos en el mismo orden en el que entraron, incluso si está configurado para el paralelismo. Debido a eso, el código en mi respuesta original es completamente inútil y en su lugar puede usarse TransformBlock normal.


Respuesta original:

Por lo que yo sé sólo un constructo paralelismo en .NET admite la devolución de artículos procesados ​​en el orden en que se produjo en: PLINQ con AsOrdered(). Pero me parece que PLINQ no encaja bien con lo que quieres.

TPL Dataflow, por otro lado, encaja bien, creo, pero no tiene un bloque que admita el paralelismo y devuelva elementos en orden al mismo tiempo (TransformBlock admite ambos, pero no en el Mismo tiempo). Afortunadamente, los bloques de Dataflow se diseñaron teniendo en cuenta la capacidad de compilación, por lo que podemos construir nuestro propio bloque que lo haga.

Pero primero, tenemos que averiguar cómo ordenar los resultados. Usar un diccionario simultáneo, como sugirió, junto con algún mecanismo de sincronización, ciertamente funcionaría. Pero creo que hay una solución más simple: use una cola de Task s. En la tarea de salida, dequeue Task, espere a que se complete (de forma asíncrona) y cuando lo haga, envíe su resultado.Todavía necesitamos alguna sincronización para el caso cuando la cola está vacía, pero podemos obtenerla de forma gratuita si elegimos qué cola usar inteligentemente.

Por lo tanto, la idea general es la siguiente: lo que estamos escribiendo será un IPropagatorBlock, con algunas entradas y algunos resultados. La forma más fácil de crear un IPropagatorBlock personalizado es crear un bloque que procese la entrada, otro bloque que produzca los resultados y tratarlos como uno usando DataflowBlock.Encapsulate().

El bloque de entrada deberá procesar los elementos entrantes en el orden correcto, por lo que no habrá paralelización allí. Creará un nuevo Task (en realidad, un TaskCompletionSource, para que podamos establecer el resultado del Task más adelante), lo agregue a la cola y luego envíe el elemento para su procesamiento, junto con alguna forma de establecer el resultado del Task correcto . Debido a que no necesitamos vincular este bloque a nada, podemos usar un ActionBlock.

El bloque de salida tendrá que tomar Task s de la cola, esperarlos de forma asincrónica y luego enviarlos. Pero como todos los bloques tienen una cola insertada en ellos, y los bloques que llevan delegados tienen espera asíncrona incorporada, esto será muy simple: new TransformBlock<Task<TOutput>, TOutput>(t => t). Este bloque funcionará tanto como la cola como el bloque de salida. Debido a esto, no tenemos que lidiar con ninguna sincronización.

La última pieza del rompecabezas es en realidad el procesamiento de los elementos en paralelo. Para esto, podemos usar otro ActionBlock, esta vez con MaxDegreeOfParallelism conjunto. Tomará la entrada, la procesará y establecerá el resultado del Task correcto en la cola.

ponen juntos, podría tener este aspecto:

public static IPropagatorBlock<TInput, TOutput> 
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform) 
{ 
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t); 

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
     tuple => tuple.Item2(transform(tuple.Item1)), 
     new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded 
     }); 

    var enqueuer = new ActionBlock<TInput>(
     async item => 
     { 
      var tcs = new TaskCompletionSource<TOutput>(); 
      await processor.SendAsync(
       new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult)); 
      await queue.SendAsync(tcs.Task); 
     }); 

    enqueuer.Completion.ContinueWith(
     _ => 
     { 
      queue.Complete(); 
      processor.Complete(); 
     }); 

    return DataflowBlock.Encapsulate(enqueuer, queue); 
} 

Después de tanto hablar, eso es bastante pequeña cantidad de código, creo.

Parece que le importa mucho el rendimiento, por lo que es posible que necesite ajustar este código. Por ejemplo, podría tener sentido establecer MaxDegreeOfParallelism del bloque processor en algo como Environment.ProcessorCount, para evitar la sobresuscripción. Además, si la latencia es más importante que el rendimiento para usted, podría tener sentido establecer MaxMessagesPerTask del mismo bloque en 1 (u otro número pequeño) para que, cuando finalice el procesamiento de un elemento, se envíe a la salida inmediatamente.

Además, si desea acelerar los elementos entrantes, puede establecer BoundedCapacity de enqueuer.

+0

Wow un montón de golosinas que primero me gustaría digerir y probar. Muchas gracias por aquellos, por lo menos merecía un voto positivo ;-) Déjenme jugar con esas ideas y vuelvo. Tareas de cola tiene mucho sentido y me pregunto por qué no lo entendí antes. –

+0

ok Paso un tiempo revisando y leyendo en TPL Dataflow, aquí un par de preguntas para comprender completamente la solución que propone: (1) ¿por qué sugiere un IPropagatorBlock personalizado e IDataflowBlock.Encapsulate() dado que ya existe un Transformblock ? (2) No veo cómo planean realmente vincular los bloques. Hablas primero de ActionBlocks y luego de TransformBlocks. Según lo que leí, ¿el ActionBlock no sería el "punto final" de toda la arquitectura? –

+1

1. Eso se explica en el segundo párrafo: 'TransformBlock' no puede procesar los elementos en paralelo y devolverlos en orden al mismo tiempo. Puede hacer cualquiera de ellos, pero no ambos. – svick