2011-03-08 6 views
8

Tenemos una cola JMS que recibe una gran cantidad de mensajes.Procesamiento JMS efectivo

oyente tiene que guardar el mensaje en la base de datos mediante una transacción de base de datos y luego confirmar la transacción JMS.

Entonces, ¿cómo puedo hacerlo más eficazmente cuando no tengo que hacer la base de datos & JMS confirmar en cada mensaje.

Respuesta

6

La premisa detrás de la mensajería asíncrona, especialmente cuando se usa un MDB, es que cada mensaje es atómico. Es decir, el resultado del procesamiento de cualquier mensaje se supone que es independiente del resultado del procesamiento de cualquier otro mensaje. La solución ideal para su problema preservará esta atomicidad de los mensajes.

Si se va a procesar varios mensajes en la misma unidad de trabajo, entonces se perdería esta atomicidad. Por ejemplo, suponga que decidió sincronizar cada 25 mensajes. Si el 25º mensaje tenía un error, como un problema de conversión de página de códigos que impedía que se recuperara de la cola, se revertiría todo el lote de mensajes. Entonces todos serían reentrenados. El recuento de nuevas entregas para los mensajes aumentaría con cada ciclo de lectura/retroceso. Una vez que el conteo de reentrega haya excedido el umbral establecido en su servidor de aplicaciones, se descartarán o pondrán en cola los 25 mensajes, dependiendo de su configuración. Cuanto mayor sea el lote, más mensajes se verán potencialmente afectados en una situación de error porque todo el lote vive o muere en conjunto. Establezca el tamaño de lote en 100 y 100 mensajes estarán en riesgo en el caso de un solo mensaje de envenenamiento.

Una solución alternativa es permitir que para muchos hilos de procesamiento en su MDB. Con JMS puede generar muchas sesiones bajo la misma conexión. Cada sesión puede administrar su propia unidad de trabajo, por lo tanto, cada sesión puede iniciar de forma independiente una transacción XA, recibir un mensaje, actualizar la base de datos y luego comprometer la transacción. Si un mensaje es incorrecto, solo ese mensaje y la actualización de la base de datos se verán afectados.

Existen excepciones a esto. Por ejemplo, si procesa un lote grande y todos los mensajes provienen del mismo productor, es común usar algo que no sea un MDB para recuperar muchos mensajes y actualizar muchas filas en la misma unidad de trabajo. De forma similar, si los mensajes son dependientes de la secuencia, entonces el procesamiento en paralelo no es posible porque no conservaría la secuencia. Pero, de nuevo, los mensajes dependientes de la secuencia no son atómicos. Nuevamente, en este caso, un MDB no es la solución ideal.

Dependiendo de su proveedor de transporte, el número de hilos soportados sólo puede limitarse mediante el almacenamiento de memoria. WebSphere MQ, por ejemplo, puede manejar fácilmente cientos de subprocesos getter simultáneos en una cola. Compruebe la sintonización de la configuración MDB de su servidor de aplicaciones para ver cuántos hilos puede girar y luego verifique que su transporte pueda manejar la carga. Luego juegue un poco para encontrar la cantidad óptima de hilos. El rendimiento aumentará dramáticamente a medida que los hilos aumentan de uno, pero solo hasta cierto punto. Pasado ese punto, generalmente se ve una meseta, luego un declive a medida que la administración general de subprocesos compensa los aumentos de rendimiento. Donde reside el punto swe3et depende de qué tanto se cargue el intermediario de mensajería y si está más limitado por CPU, memoria, disco o red.

8

No practico en cada mensaje, lo hacen en lotes. JMS admite transacciones al igual que su DB; inicia una transacción JMS, lee N mensajes. Inicie la transacción DB, inserte N mensajes. Comprométase con JMS, comprométase con DB.

Esto, obviamente, presenta una ventana para una carrera que se produzca (caída ocurre entre las dos confirmaciones). Lo tienes ahora, pero solo para un solo mensaje. Si quiere resolver ese problema, se enfrenta a mirar transacciones XA (confirmación en dos fases) o, al menos, a algún tipo de esquema de detección duplicado. Para alguna introducción a eso, échele un vistazo a: http://activemq.apache.org/should-i-use-xa.html

+0

método de devolución de llamada "onMessage" devuelve solo un mensaje a la vez, entonces, ¿cómo puedo obtener N mensajes? – changed

+3

No usaría la interfaz MessageListener y solo haría cosas cuando recibiera un mensaje. Usted PODRÍA hacer eso (realizar un seguimiento de la cantidad de mensajes que recibió a través de una variable miembro, iniciar y confirmar transacciones, etc.) pero está ampliando su ventana de condición de carrera porque está confiando en un mensaje para desencadenar cualquier acción. Realmente no es el mejor enfoque. Es mucho mejor hacer un ciclo tradicional donde lee mensajes de la cola (bloqueo de llamadas con tiempo de espera o sin bloque) y realiza sus confirmaciones cuando tiene N mensajes o ha pasado el tiempo Y. –

+1

Lo sentimos: específicamente, esto está utilizando los métodos de interfaz de MessageConsumer receive (timeout) y receiveNoWait() en lugar de registrar su MessageListener con setMessageListener(). –

0

Aquí hay un procesador jms que tomará mensajes de una cola, los agregará a una lista y los retrotraerá a otra cola. Se puede ajustar cómo se leen los valores y se agregan en los métodos respectivos:

public class JmsBatcher<T> { 
    final Session session; 
    private final MessageConsumer consumer; 
    private final MessageProducer producer; 
    private final int batchSize; 


    public JmsBatcher(final Connection connection, 
         final String sourceQueue, 
         final String destQueue, 
         final int batchSize) throws JMSException { 
     this.batchSize = batchSize; 
     session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE); 
     final Queue source = session.createQueue(sourceQueue); 
     final Queue dest = session.createQueue(destQueue); 
     consumer = session.createConsumer(source); 
     producer = session.createProducer(dest); 
    } 

    public void processBatch() { 
     final List<T> values = new ArrayList<>(); 
     try { 
      while (values.size() < batchSize) { 
       final Message message = consumer.receive(); 
       values.add(readObject(message)); 
       message.acknowledge(); 
      } 
      producer.send(createAggregate(values)); 
      session.commit(); 
     } catch (Exception e) { 
      // Log the exception 
      try { 
       session.rollback(); 
      } catch (JMSException re) { 
       // Rollback failed, so something fataly wrong. 
       throw new IllegalStateException(re); 
      } 
     } 
    } 

    private Message createAggregate(final List<T> values) throws JMSException { 
     return session.createObjectMessage((Serializable) values); 
    } 

    private T readObject(final Message message) throws JMSException { 
     return (T) ((ObjectMessage) message).getObject(); 
    } 
} 

Esto se puede iniciar en un hilo separado, y tan sólo ejecute siempre:

final JmsBatcher jmsBatcher = 
    new JmsBatcher(connection, "single", "batch", 25); 
new Thread(() -> { 
    while (true) { 
     jmsBatcher.processBatch(); 
    } 
}).start(); 

A continuación, puede comprometerse con la base de datos en lotes de los resultados acumulados. Si hay fallas, la transacción se reintentará.