2012-09-20 16 views
5

que tienen un BufferBlock a la que enviar mensajes:TPL flujos de datos múltiples linkto consumidores no trabajan

public class DelimitedFileBlock : ISourceBlock<string> 
{ 
    private ISourceBlock<string> _source; 
    _source = new BufferBlock<string>(new DataflowBlockOptions() { BoundedCapacity = 10000 }); 

    //Read a file 
    While(!eof) 
     row = read one row 
    //if consumers are slow, then sleep for a while 
    while(!(_source as BufferBlock<string>).Post<string>(row)) 
    { 
     Thread.Sleep(5000); 
    } 
} 

Este es un archivo de 5 GB con 24 millones de filas.

ahora tengo un bloque objetivo que se utiliza una ActionBlock:

public class SolaceTargetBlock : ITargetBlock<string> 
     private ActionBlock<IBasicDataContract> _publishToSolaceBlock; 

     public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, string messageValue, ISourceBlock<string> source, bool consumeToAccept) 
    { 
     //post to another block to publish 
     bool success = _publishToSolaceBlock.Post(messageValue); 

Ahora en una aplicación de consola, que especifique:

SolaceTargetBlock solaceTargetBlock1 = new SolaceTargetBlock("someparam", 
      new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 }); 
SolaceTargetBlock solaceTargetBlock2 = new SolaceTargetBlock("someparam", 
      new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 }); 
SolaceTargetBlock solaceTargetBlock3 = new SolaceTargetBlock("someparam", 
      new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10, BoundedCapacity = 1 }); 

DelimitedFileBlock delimitedFileBlock = new DelimitedFileBlock(csvFileInfo); 

he guardado capacidad limitada como 1 para probar solamente.

y ahora vincular estos tres consumidores a mi fuente utilizando LinkTo:

delimitedFileBlock.LinkTo(solaceTargetBlock1);  
delimitedFileBlock.LinkTo(solaceTargetBlock2);  
delimitedFileBlock.LinkTo(solaceTargetBlock3);  

Esto va a Thread.Sleep (5000) declaración después de 10003 filas y el Post en el bucle while siempre devuelve falso.

Esperaba que como tengo LinkTo, entonces solaceTargetBlocks cuando esté listo podrá seleccionar los próximos mensajes, pero LinkTo no borrará el BufferBlock. Entonces, ¿cómo puedo equilibrar la carga entre varios consumidores? ¿Tengo que recibir y escribir una lógica simple de equilibrio de carga para distribuir entre los consumidores?

+1

todo esto depende de cómo es exactamente que ha implementado las interfaces de los bloques. Pero, a menos que esté haciendo algo más complicado, no tiene que (y probablemente no debería) implementar las interfaces usted mismo. Simplemente crea una configuración de bloque que necesites y eso es todo. – svick

Respuesta

8

De la documentación de la Post method en la (el énfasis es mío) DataflowBlock<T> class:

Este método devolverá una vez que el bloque de destino ha decidido aceptar o rechazar el artículo,

Este significa que el objetivo puede elegir rechazar el bloque (que es el comportamiento que estás viendo).

Más adelante, se indica:

Para bloques de destino que son compatibles posponer mensajes ofrecidos, o para los bloques que pueden hacer más procesamiento en su aplicación Post, considerar el uso de SendAsync, que devolverá inmediatamente y permitirá a la objetivo posponer el mensaje publicado y luego consumirlo después de que SendAsync regrese.

Esto significa que usted puede tiene mejores resultados (en función del bloque de destino) en que su mensaje puede ser aplazada, pero sigue siendo procesados, en lugar de rechazada de plano.

que imaginar que las BoundedCapacity property ajustes tanto en el BufferBlock<T> y los tres ActionBlock<TInput> casos tienen algo que ver con lo que está viendo:

  • Su máximo del búfer en el BufferBlock<T> es 10000; una vez que coloque 10.000 elementos en la cola, rechazará el resto (consulte la segunda cita anterior), ya que no puede procesarlos (SendAsync tampoco funcionará aquí, ya que no puede almacenar temporalmente el mensaje que se pospondrá).

  • Su búfer máximo en las instancias ActionBlock<TInput> es 1, y tiene tres de ellas.

10.000 + (1 * 3) = 10.000 + 3 = 10.003

Para evitar esto, es necesario hacer algunas cosas.

En primer lugar, debe establecer un valor más razonable para MaxDegreeOfParallelism propertyExecutionDataFlowBlockOptions al crear las instancias ActionBlock<TInput>.

De forma predeterminada, MaxDegreeOfParallelism para ActionBlock<TInput> está establecido en 1; esto garantiza que las llamadas se serializarán y no tendrá que preocuparse por la seguridad de las secuencias. Si quiere que el ActionBlock<T> se preocupe por la seguridad del hilo, mantenga esta configuración.

Si el ActionBlock<TInput> es seguro para subprocesos, entonces no hay razón para estrangular, y vosotros ponga MaxDegreeOfParallelism a DataflowBlockOptions.Unbounded.

probable que si usted está accediendo a algún tipo de recurso compartido en el ActionBlock<TInput> que se puede acceder simultáneamente sobre una base limitada, entonces usted está probablemente hacer lo incorrecto.

Si tiene algún tipo de recurso compartido, entonces es probable que deba ejecutarlo a través de otro bloque y establecer el MaxDegreeOfParallelism en que.

En segundo lugar, si le preocupa el rendimiento y está de acuerdo con los elementos caídos, entonces debe establecer la propiedad BoundedCapacity.

También tenga en cuenta que indica "si los consumidores son lentos, duerma un rato"; no hay razón para hacer esto si conecta los bloques correctamente, solo debe permitir que los datos fluyan y colocar las restricciones solo donde las necesite. Su productor no debe ser responsable de estrangular al consumidor, deje que el consumidor sea responsable de la regulación.

Por último, su código no parece que necesite implementar las interfaces de bloques de flujo de datos usted mismo. Se podría construirlo así:

// The source, your read lines will be posted here. 
var delimitedFileBlock = new BufferBlock<string>(); 

// The Action for the action blocks. 
Action<string> action = 
    s => { /* Do something with the string here. */ }; 

// Create the action blocks, assuming that 
// action is thread-safe, no need to have it process one at a time 
// or to bound the capacity. 
var solaceActionBlock1 = new ActionBlock<string>(action, 
    new ExecutionDataflowBlockOptions { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, 
    }); 
var solaceActionBlock2 = new ActionBlock<string>(action, 
    new ExecutionDataflowBlockOptions { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, 
    }); 
var solaceActionBlock3 = new ActionBlock<string>(action, 
    new ExecutionDataflowBlockOptions { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, 
    }); 

// Link everything. 
delimitedFileBlock.LinkTo(solaceTargetBlock1); 
delimitedFileBlock.LinkTo(solaceTargetBlock2); 
delimitedFileBlock.LinkTo(solaceTargetBlock3); 

// Now read the file, and post to the BufferBlock<T>: 
// Note: This is pseudo-code. 
while (!eof) 
{ 
    // Read the row. 
    string row = ...; 

    delimitedFileBlock.Post(read); 
} 

También tenga en cuenta que el tener tres ActionBlock<TInput> casos es uncessary, a menos que necesite para filtrar la salida a diferentes acciones (que no están haciendo aquí), lo que lo anterior realmente se reduce a (asumiendo que su acción no es seguro para subprocesos, por lo que vamos a aumentar MaxDegreeOfParallelism-Unbounded de todos modos):

// The source, your read lines will be posted here. 
var delimitedFileBlock = new BufferBlock<string>(); 

// The Action for the action blocks. 
Action<string> action = 
    s => { /* Do something with the string here. */ }; 

// Create the action blocks, assuming that 
// action is thread-safe, no need to have it process one at a time 
// or to bound the capacity. 
var solaceActionBlock1 = new ActionBlock<string>(action, 
    new ExecutionDataflowBlockOptions { 
     MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, 
    }); 

// Link everything. 
delimitedFileBlock.LinkTo(solaceTargetBlock); 

// Now read the file, and post to the BufferBlock<T>: 
// Note: This is pseudo-code. 
while (!eof) 
{ 
    // Read the row. 
    string row = ...; 

    delimitedFileBlock.Post(read); 
} 
Cuestiones relacionadas