EDIT: Resulta que estaba muy equivocado. TransformBlock
hace devuelve los artículos en el mismo orden en el que entraron, incluso si está configurado para el paralelismo. Debido a eso, el código en mi respuesta original es completamente inútil y en su lugar puede usarse TransformBlock
normal.
Respuesta original:
Por lo que yo sé sólo un constructo paralelismo en .NET admite la devolución de artículos procesados en el orden en que se produjo en: PLINQ con AsOrdered()
. Pero me parece que PLINQ no encaja bien con lo que quieres.
TPL Dataflow, por otro lado, encaja bien, creo, pero no tiene un bloque que admita el paralelismo y devuelva elementos en orden al mismo tiempo (TransformBlock
admite ambos, pero no en el Mismo tiempo). Afortunadamente, los bloques de Dataflow se diseñaron teniendo en cuenta la capacidad de compilación, por lo que podemos construir nuestro propio bloque que lo haga.
Pero primero, tenemos que averiguar cómo ordenar los resultados. Usar un diccionario simultáneo, como sugirió, junto con algún mecanismo de sincronización, ciertamente funcionaría. Pero creo que hay una solución más simple: use una cola de Task
s. En la tarea de salida, dequeue Task
, espere a que se complete (de forma asíncrona) y cuando lo haga, envíe su resultado.Todavía necesitamos alguna sincronización para el caso cuando la cola está vacía, pero podemos obtenerla de forma gratuita si elegimos qué cola usar inteligentemente.
Por lo tanto, la idea general es la siguiente: lo que estamos escribiendo será un IPropagatorBlock
, con algunas entradas y algunos resultados. La forma más fácil de crear un IPropagatorBlock
personalizado es crear un bloque que procese la entrada, otro bloque que produzca los resultados y tratarlos como uno usando DataflowBlock.Encapsulate()
.
El bloque de entrada deberá procesar los elementos entrantes en el orden correcto, por lo que no habrá paralelización allí. Creará un nuevo Task
(en realidad, un TaskCompletionSource
, para que podamos establecer el resultado del Task
más adelante), lo agregue a la cola y luego envíe el elemento para su procesamiento, junto con alguna forma de establecer el resultado del Task
correcto . Debido a que no necesitamos vincular este bloque a nada, podemos usar un ActionBlock
.
El bloque de salida tendrá que tomar Task
s de la cola, esperarlos de forma asincrónica y luego enviarlos. Pero como todos los bloques tienen una cola insertada en ellos, y los bloques que llevan delegados tienen espera asíncrona incorporada, esto será muy simple: new TransformBlock<Task<TOutput>, TOutput>(t => t)
. Este bloque funcionará tanto como la cola como el bloque de salida. Debido a esto, no tenemos que lidiar con ninguna sincronización.
La última pieza del rompecabezas es en realidad el procesamiento de los elementos en paralelo. Para esto, podemos usar otro ActionBlock
, esta vez con MaxDegreeOfParallelism
conjunto. Tomará la entrada, la procesará y establecerá el resultado del Task
correcto en la cola.
ponen juntos, podría tener este aspecto:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
Después de tanto hablar, eso es bastante pequeña cantidad de código, creo.
Parece que le importa mucho el rendimiento, por lo que es posible que necesite ajustar este código. Por ejemplo, podría tener sentido establecer MaxDegreeOfParallelism
del bloque processor
en algo como Environment.ProcessorCount
, para evitar la sobresuscripción. Además, si la latencia es más importante que el rendimiento para usted, podría tener sentido establecer MaxMessagesPerTask
del mismo bloque en 1 (u otro número pequeño) para que, cuando finalice el procesamiento de un elemento, se envíe a la salida inmediatamente.
Además, si desea acelerar los elementos entrantes, puede establecer BoundedCapacity
de enqueuer
.
Wow un montón de golosinas que primero me gustaría digerir y probar. Muchas gracias por aquellos, por lo menos merecía un voto positivo ;-) Déjenme jugar con esas ideas y vuelvo. Tareas de cola tiene mucho sentido y me pregunto por qué no lo entendí antes. –
ok Paso un tiempo revisando y leyendo en TPL Dataflow, aquí un par de preguntas para comprender completamente la solución que propone: (1) ¿por qué sugiere un IPropagatorBlock personalizado e IDataflowBlock.Encapsulate() dado que ya existe un Transformblock? (2) No veo cómo planean realmente vincular los bloques. Hablas primero de ActionBlocks y luego de TransformBlocks. Según lo que leí, ¿el ActionBlock no sería el "punto final" de toda la arquitectura? –
1. Eso se explica en el segundo párrafo: 'TransformBlock' no puede procesar los elementos en paralelo y devolverlos en orden al mismo tiempo. Puede hacer cualquiera de ellos, pero no ambos. – svick