2012-01-19 28 views
15

Tengo un programa jar autoejecutable que se basa en gran medida en Spring Integration. El problema que tengo es que el programa finaliza antes de que los otros granos de primavera hayan terminado por completo.Esperando a que terminen todos los hilos en Spring Integration

A continuación se muestra una versión reducida del código que estoy usando, puedo suministrar más código/configuración si es necesario. El punto de entrada es un método main(), que Bootstraps primavera y comienza el proceso de importación:

public static void main(String[] args) { 
    ctx = new ClassPathXmlApplicationContext("flow.xml"); 
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean"); 
    try { 
     importer.startImport(); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } finally { 
     ctx.close(); 
    } 
} 

El DataImporter contiene un bucle simple que dispara mensajes a una pasarela de integración de primavera. Esto ofrece un enfoque de "empuje" activo para el flujo, en lugar del enfoque común de sondeo de datos. Aquí es donde mi problema viene en:

public void startImport() throws Exception { 
    for (Item item : items) { 
     gatewayBean.publish(item); 
     Thread.sleep(200); // Yield period 
    } 
} 

Para completar, el XML flujo es como la siguiente:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" /> 

<splitter input-channel="inChannel" output-channel="splitChannel" /> 

<payload-type-router input-channel="splitChannel"> 
    <mapping type="Item" channel="itemChannel" /> 
    <mapping type="SomeOtherItem" channel="anotherChannel" /> 
</payload-type-router> 

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" /> 

el flujo se inicia y se procesa elementos con eficacia, pero una vez que el bucle startImport() termina el el hilo principal finaliza y deshace inmediatamente todos los hilos de integración de muelles. Esto da como resultado una condición de carrera, los últimos (n) elementos no se procesan completamente cuando finaliza el programa.

Tengo una idea de mantener un recuento de referencia de los elementos que estoy procesando, pero esto está resultando bastante complicado, ya que el flujo a menudo divide/dirige los mensajes a activadores de servicio múltiple, lo que significa que es difícil determinar si cada artículo ha "terminado".

Lo que creo que necesito es una forma de comprobar que no se están ejecutando Spring beans, o de señalar que todos los elementos enviados a la puerta de enlace se han procesado por completo antes de finalizar.

Mi pregunta es, ¿cómo podría yo ir haciendo cualquiera de ellos, o hay una mejor aproximación a mi problema que no he pensado?

Respuesta

9

No está usando un patrón de solicitud-respuesta aquí.

outbound-channel-adapter es una acción de disparo y olvido, si desea esperar la respuesta, debe usar una puerta de enlace saliente que esperará la respuesta y conectará la respuesta a la puerta de enlace original, luego en java sendAndReceive no solo publicar

+1

Este fue de hecho mi problema. Modifiqué mi flujo para usar en lugar de , y conecté el resultado a una puerta de enlace asíncrona. Ahora puedo esperar para completar usando Future.isDone(). ¡Gracias por tu ayuda! – seanhodges

3

Si usted puede conseguir un Item para determinar, si todavía es necesaria o no (processingFinished() o algo similar ejecutado en el back-end-etapas), se puede registrar todos los Item s a una autoridad central, que realiza un seguimiento del número de Item s sin terminar y determina de forma efectiva una condición de terminación.

Si este enfoque es factible, incluso podría pensar en empaquetar los artículos en FutureTask -objetos o hacer uso de conceptos similares en java.util.concurrent.

Edición: Segunda idea:

¿Usted ha pensado en hacer los canales más inteligente? Un emisor cierra el canal una vez que no envía más datos. En este escenario, los beans-worker no tienen que ser hilos deamon pero pueden determinar su criterio de terminación basado en un canal de entrada cerrado y vacío.

+0

Algunas buenas ideas allí.Sin embargo, en mi proyecto, la clase "Artículo" es demasiado genérica (parte de una capa de modelo) para agregar lógica de importación. Podría buscar formas de hacer algo similar con los encabezados de los mensajes. FutureTask y sus colegas estarían bien, pero estoy luchando por ver una forma de utilizar el marco de trabajo simultáneo para mi problema específico ... – seanhodges

Cuestiones relacionadas