De la documentación de la Post
method en la (el énfasis es mío) DataflowBlock<T>
class:
Este método devolverá una vez que el bloque de destino ha decidido aceptar o rechazar el artículo,
Este significa que el objetivo puede elegir rechazar el bloque (que es el comportamiento que estás viendo).
Más adelante, se indica:
Para bloques de destino que son compatibles posponer mensajes ofrecidos, o para los bloques que pueden hacer más procesamiento en su aplicación Post, considerar el uso de SendAsync, que devolverá inmediatamente y permitirá a la objetivo posponer el mensaje publicado y luego consumirlo después de que SendAsync regrese.
Esto significa que usted puede tiene mejores resultados (en función del bloque de destino) en que su mensaje puede ser aplazada, pero sigue siendo procesados, en lugar de rechazada de plano.
que imaginar que las BoundedCapacity
property ajustes tanto en el BufferBlock<T>
y los tres ActionBlock<TInput>
casos tienen algo que ver con lo que está viendo:
Su máximo del búfer en el BufferBlock<T>
es 10000; una vez que coloque 10.000 elementos en la cola, rechazará el resto (consulte la segunda cita anterior), ya que no puede procesarlos (SendAsync
tampoco funcionará aquí, ya que no puede almacenar temporalmente el mensaje que se pospondrá).
Su búfer máximo en las instancias ActionBlock<TInput>
es 1, y tiene tres de ellas.
10.000 + (1 * 3) = 10.000 + 3 = 10.003
Para evitar esto, es necesario hacer algunas cosas.
En primer lugar, debe establecer un valor más razonable para MaxDegreeOfParallelism
propertyExecutionDataFlowBlockOptions
al crear las instancias ActionBlock<TInput>
.
De forma predeterminada, MaxDegreeOfParallelism
para ActionBlock<TInput>
está establecido en 1; esto garantiza que las llamadas se serializarán y no tendrá que preocuparse por la seguridad de las secuencias. Si quiere que el ActionBlock<T>
se preocupe por la seguridad del hilo, mantenga esta configuración.
Si el ActionBlock<TInput>
es seguro para subprocesos, entonces no hay razón para estrangular, y vosotros ponga MaxDegreeOfParallelism
a DataflowBlockOptions.Unbounded
.
probable que si usted está accediendo a algún tipo de recurso compartido en el ActionBlock<TInput>
que se puede acceder simultáneamente sobre una base limitada, entonces usted está probablemente hacer lo incorrecto.
Si tiene algún tipo de recurso compartido, entonces es probable que deba ejecutarlo a través de otro bloque y establecer el MaxDegreeOfParallelism
en que.
En segundo lugar, si le preocupa el rendimiento y está de acuerdo con los elementos caídos, entonces debe establecer la propiedad BoundedCapacity
.
También tenga en cuenta que indica "si los consumidores son lentos, duerma un rato"; no hay razón para hacer esto si conecta los bloques correctamente, solo debe permitir que los datos fluyan y colocar las restricciones solo donde las necesite. Su productor no debe ser responsable de estrangular al consumidor, deje que el consumidor sea responsable de la regulación.
Por último, su código no parece que necesite implementar las interfaces de bloques de flujo de datos usted mismo. Se podría construirlo así:
// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();
// The Action for the action blocks.
Action<string> action =
s => { /* Do something with the string here. */ };
// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
});
var solaceActionBlock2 = new ActionBlock<string>(action,
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
});
var solaceActionBlock3 = new ActionBlock<string>(action,
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
});
// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock1);
delimitedFileBlock.LinkTo(solaceTargetBlock2);
delimitedFileBlock.LinkTo(solaceTargetBlock3);
// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
// Read the row.
string row = ...;
delimitedFileBlock.Post(read);
}
También tenga en cuenta que el tener tres ActionBlock<TInput>
casos es uncessary, a menos que necesite para filtrar la salida a diferentes acciones (que no están haciendo aquí), lo que lo anterior realmente se reduce a (asumiendo que su acción no es seguro para subprocesos, por lo que vamos a aumentar MaxDegreeOfParallelism
-Unbounded
de todos modos):
// The source, your read lines will be posted here.
var delimitedFileBlock = new BufferBlock<string>();
// The Action for the action blocks.
Action<string> action =
s => { /* Do something with the string here. */ };
// Create the action blocks, assuming that
// action is thread-safe, no need to have it process one at a time
// or to bound the capacity.
var solaceActionBlock1 = new ActionBlock<string>(action,
new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
});
// Link everything.
delimitedFileBlock.LinkTo(solaceTargetBlock);
// Now read the file, and post to the BufferBlock<T>:
// Note: This is pseudo-code.
while (!eof)
{
// Read the row.
string row = ...;
delimitedFileBlock.Post(read);
}
todo esto depende de cómo es exactamente que ha implementado las interfaces de los bloques. Pero, a menos que esté haciendo algo más complicado, no tiene que (y probablemente no debería) implementar las interfaces usted mismo. Simplemente crea una configuración de bloque que necesites y eso es todo. – svick