2012-07-04 5 views
7

Necesito generar N hilos de consumo, que procesan el mismo InputStream simultáneamente, por ejemplo, transformarlo de alguna manera, calcular suma de comprobación o firma digital, etc. Estos consumidores no dependen el uno del otro y todos ellos están utilizando bibliotecas de terceros, que aceptan InputStream como fuente de datos.Procesamiento concurrente de InputStream individual con consumidores independientes

Entonces, ¿qué puedo hacer es - crear alguna implementación de InputStream, que se

  • lectura fragmento de datos de "padre" corriente
  • consumidores desbloquear
  • espera hasta que todos los consumidores leer todo el trozo
  • leído siguiente parte

al tiempo que busca sencilla, puede elevarse varios problemas como livelo ck cuando muere un determinado consumidor, implementa todos los métodos de InputStream, controla el tenedor/únete a los consumidores usando barreras/pestillos, etc.

Un amigo me dijo que es media hora para implementarlo, hizo mi noche.

Prefiero usar algo lo suficientemente maduro (google no vino con resultados así, mi google-fu no es lo suficientemente bueno?) O no molestar y copiar toda la secuencia de "fuente" en un archivo temporal y usarlo como fuente de datos. La última solución parece ser más confiable, pero puede terminar en la creación de archivos de gigabyte (al procesar audio de transmisión, por ejemplo).

+0

se puede escribir los datos en un archivo y desovar N FileInputStreams? –

+0

@JonLin Como dijo hacia el final de la pregunta, él puede. –

Respuesta

3

De la forma en que lo veo, debe tener al menos algún tipo de almacenamiento en memoria intermedia para que los diferentes consumidores puedan moverse a través del flujo a un ritmo diferente sin que el consumidor actualmente más lento lo empantane. Eso básicamente garantiza el peor rendimiento posible y muy poco beneficio de concurrencia.

Podría, por ejemplo, etiquetar cada fragmento con los consumidores que lo han utilizado hasta el momento y luego eliminar aquellos que están completamente agotados. Quizás esto podría lograrse con cada consumidor que tenga una referencia a cada fragmento que aún no haya utilizado, lo que permitiría que GC se ocupe automáticamente de los trozos usados. El productor puede mantener una lista de WeakReference s en los trozos, por lo que tiene un control sobre la cantidad de trozos que aún no se han utilizado y basa su regulación en eso.

También estoy pensando en tener una instancia de InputStream por subproceso, que se comunica internamente con el productor InputStream. De esta forma, tiene una solución fácil para su riesgo de bloqueo directo: try ... finally { is.close(); } - el consumidor moribundo cierra su propia corriente de entrada. Esto se comunica al productor.

Tengo algunas ideas con el uso de ArrayBlockingQueue por consumidor. Habría algunas dificultades para garantizar que todos los consumidores reciban una alimentación adecuada, sin hacer que el productor bloquee o esté ocupado: espere.

+0

No diría que es muy poco beneficio - que tiene 5 consumidores que trabajan para 1 secons y un consumidor trabajando durante 2 segundos, la invocación simultánea dará a 2 segundos, mientras secuencial dará a 7 segundos. ¿O me estoy perdiendo algo aquí? Con el etiquetado de fragmentos y búferes voy a golpear el consumo de memoria, lo que me gustaría evitar. – jdevelop

+0

Sí, lo que dices es ineludible. Sin embargo, si los consumidores tienen un promedio equilibrado, pero su rendimiento es muy variable, perderá la oportunidad de concurrir si siempre espera a cada consumidor que actualmente está rezagado. El almacenamiento en búfer ayudaría allí. Y si introduce el equilibrio de prioridad de subprocesos, en realidad podría lograr esa situación. –

0

¿Ha considerado utilizar corrientes de tubería? Su productor puede tener uno o más PipedOuputStream en el que arroja lo que lee del archivo. En el otro lado de las tuberías, tiene diferentes hilos de consumo que leen en un correspondiente PipedInputstream (que es un InputStream que puede compartir con sus bibliotecas).

Su hilo de productor puede decidir a través de cuál de los datos de los tubos debe enviarse, por medio de esto, proporcionar datos que se procesarán para una lectura de hilo de consumidor dada en el otro lado del tubo.

Si necesita obtener datos de vuelta de los hilos de consumo, a continuación, puede crear otra tubería, en la dirección opuesta, para enviar los datos de nuevo a usted.

+1

Un 'PipedOutputStream' bloqueará al productor tan pronto como un consumidor quede rezagado, muriendo de hambre a todos los demás consumidores. –

0

Puede probar algunos implementación Java Messaging Service (JMS) como Apache ActiveMQ.

En su caso necesitaría crear un llamado Tema (vea Topics vs. Queues). El productor crea un tema y se publica a N consumidores, que pueden ejecutarse al mismo tiempo, y cada consumidor recibe exactamente los mismos datos.

Dado que desea utilizar InputStream s hay un capítulo sobre cómo send messages are streams.

supongo, por lo general, los productores y los consumidores estarían procesos separados, probablemente se ejecutan en diferentes equipos de la red. Aunque creo que puedes configurarlo para que se ejecute completamente en una sola JVM. Esto dependería de la implementación de JMS. Estos también son bastante famosos: HornetQ by JBoss, RabbitMQ, y un montón de otros.