2012-09-28 16 views
5

No estoy seguro de si esto es posible, pero si lo es, probablemente no lo esté haciendo bien. Supongamos que tengo un búfer compartido que está vinculado a muchos consumidores (ActionBlocks). Cada consumidor debe consumir datos que satisfagan un predicado utilizado para vincularlo al búfer. Por ejemplo, ActionBlock1 debe consumir los números que satisfacen x => x % 5 == 0, ActionBlock2 debe consumir solamente x => x % 5 == 1 etc.Vinculación de ActionBlocks creados dinámicamente a un BufferBlock

Esto es lo que tengo:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines) 
{ 
    var productionQueue = new BufferBlock<int>(); 

    for (int i = 0; i < NumProductionLines; i++) 
    { 
     ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num)); 

     productionQueue.LinkTo(productionLine, x => x % NumProductionLines == i); 
    } 

    return productionQueue; 
} 

Y entonces me llaman:

Random rnd = new Random(); 

ITargetBlock<int> temp = BuildPipeline(5); 

while (true) 
{ 
    temp.Post(rnd.Next(255)); 
} 

Sin embargo, esta No funciona. No se muestra ninguna salida en la consola. Si modifico BuildPipeline método:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines) 
{ 
    var productionQueue = new BufferBlock<int>(); 

    ActionBlock<int> productionLine1 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 1, num)); 
    ActionBlock<int> productionLine2 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 2, num)); 
    ActionBlock<int> productionLine3 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 3, num)); 
    ActionBlock<int> productionLine4 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 4, num)); 
    ActionBlock<int> productionLine5 = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", 5, num)); 

    productionQueue.LinkTo(productionLine1, x => x % 5 == 0); 
    productionQueue.LinkTo(productionLine2, x => x % 5 == 1); 
    productionQueue.LinkTo(productionLine3, x => x % 5 == 2); 
    productionQueue.LinkTo(productionLine4, x => x % 5 == 3); 
    productionQueue.LinkTo(productionLine5, x => x % 5 == 4); 

    return productionQueue; 
} 

el código hace lo que se espera que haga.

¿Alguien puede arrojar luz sobre por qué dinámicamente crear y vincular bloques de acción no funciona?

P.S. Si introduzco el código inmediatamente después de ITargetBlock<int> temp = BuildPipeline(5);, la temperatura muestra que hay 5 objetivos vinculados al búfer. Y Id de cada objetivo es diferente.

Gracias de antemano

EDIT: Añadido cambios sugeridos por svick pero todavía no es bueno:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines) 
{ 
    var productionQueue = new BufferBlock<int>(); 
    var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }; 

    for (int i = 0; i < NumProductionLines; i++) 
    { 
     ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", i + 1, num)); 

     int j = i; 
     productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j); 
    } 

    ActionBlock<int> discardedLine = new ActionBlock<int>(num => Console.WriteLine("Discarded: {0}", num)); 
    productionQueue.LinkTo(discardedLine); 

    return productionQueue; 
} 

Ahora sólo procesa los datos segunda línea de producción (el que está satisfaciendo x 5% == 1 predicado). Y los datos no satisface el predicado, es decir consigo los números que terminan en 9 y 7.

EDIT: el código de trabajo sería algo como lo siguiente:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines) 
{ 
    var productionQueue = new BufferBlock<int>(); 
    var opt = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }; 

    for (int i = 0; i < NumProductionLines; i++) 
    { 
     int j = i; 
     ActionBlock<int> productionLine = new ActionBlock<int>(num => Console.WriteLine("Processed by line {0}: {1}", j + 1, num)); 

     productionQueue.LinkTo(productionLine, x => x % NumProductionLines == j); 
    } 

    productionQueue.LinkTo(DataflowBlock.NullTarget<int>()); 

    return productionQueue; 
} 

Respuesta

5

El problema es que en su primera versión, estás usando el mismo predicado para cada bloque objetivo. En otras palabras, el predicado no depende de i.

Pero incluso si lo hiciera, su código no funcionaría, porque la variable i se comparte entre los predicados, por lo que todos usarán el último valor. La solución para eso es copiar i en una variable local y usar eso en el predicado.

El código podría tener este aspecto:

private static ITargetBlock<int> BuildPipeline(int NumProductionLines) 
{ 
    var productionQueue = new BufferBlock<int>(); 

    for (int i = 0; i < NumProductionLines; i++) 
    { 
     int iCopy = i; 

     ActionBlock<int> productionLine = new ActionBlock<int>(
      num => Console.WriteLine("Processed by line {0}: {1}", iCopy + 1, num)); 

     productionQueue.LinkTo(
      productionLine, x => x % NumProductionLines == iCopy); 
    } 

    return productionQueue; 
} 

Si estás preguntando ¿por qué no el proceso de código de al menos los números de x % 5 == 1, eso es porque el generador aleatorio probablemente generará un número que no lo hace coincide con ese predicado, por lo que ninguno de los ActionBlock s lo aceptará. Debido a eso, el número permanecerá en la cola de salida del bloque fuente y otros números no podrán pasar.

Si, en su código real, puede ocurrir una situación similar y desea descartar todos los números que no se ajustan a ninguno de los predicados, puede vincular el bloque de origen a a block that does nothing y aceptar todo, después de vincularlo a todos sus bloques útiles:

productionQueue.LinkTo(DataflowBlock.NullTarget<int>()); 
+0

gracias, copiando i a la variable local lo resolvió. – Dimitri

+0

@Dimitri Como probablemente habrás notado, debes usar la copia en el bloque lambda también. Arreglé el código en mi respuesta. – svick

+0

Sí, reemplacé todas las apariciones de i, gracias. Además, mi código contenía errores en el bucle for: en lugar del predicado codificado, depende de la variable. – Dimitri

Cuestiones relacionadas