2010-06-21 21 views
11

Tenga en cuenta que me gustaría que varios oyentes de mensajes manejen mensajes sucesivos del tema al mismo tiempo. Además, me gustaría que cada detector de mensajes funcione de forma transaccional, de modo que una falla de procesamiento en un oyente de mensajes dado resulte en que el mensaje del oyente permanezca sobre el tema.¿Cómo puedo manejar múltiples mensajes al mismo tiempo desde un tema JMS (no en cola) con Java y Spring 3.0?

El muelle DefaultMessageListenerContainer parece admitir la concurrencia solo para las colas JMS.

¿Necesito crear una instancia de varios DefaultMessageListenerContainers?

Si el tiempo fluye hacia abajo el eje vertical:

ListenerA reads msg 1  ListenerB reads msg 2  ListenerC reads msg 3 
ListenerA reads msg 4  ListenerB reads msg 5  ListenerC reads msg 6 
ListenerA reads msg 7  ListenerB reads msg 8  ListenerC reads msg 9 
ListenerA reads msg 10  ListenerB reads msg 11  ListenerC reads msg 12 
... 

ACTUALIZACIÓN:
Gracias por su regeneración @ T.Rob y @skaffman.

Lo que terminé haciendo es crear múltiples DefaultMessageListenerContainers con concurrency=1 y luego poner la lógica en la escucha de mensajes de modo que sólo un hilo procesaría un mensaje identificador dado.

+0

¿Podría aclarar? Cuando veo "varios oyentes de mensajes que manejan mensajes sucesivos del tema al mismo tiempo", creo que significa que no desea que los oyentes obtengan una copia del mismo mensaje, sino que compitan entre sí por mensajes sobre el mismo tema. ¿Es eso correcto? –

+0

Esto parece útil: http://bsnyderblog.blogspot.com/2010/05/tuning-jms-message-consumption-in.html – skaffman

Respuesta

5

Usted no quieren múltiples DefaultMessageListenerContainer casos, no, pero sí es necesario para configurar el DefaultMessageListenerContainer a ser concurrentes, usando el concurrentConsumers property:

especificar el número de concurrentes los consumidores a crear. El valor predeterminado es 1.

Especificación de un valor más alto para esta configuración aumentará la nivel estándar de programados concurrentes consumidores en tiempo de ejecución: Este es efectivamente el número mínimo de consumidores simultáneos que será programado en cualquier momento dado . Esta es una configuración estática ; para la escala dinámica, considere especificar la configuración "maxConcurrentConsumers" en su lugar.

Elevar el número de concurrentes consumidores es recomendable con el fin de escala el consumo de mensajes que vienen de una cola. Sin embargo, note que cualquier garantía de pedido se pierde una vez que varios consumidores están registrados . En general, quédese con el consumidor 1 para colas de bajo volumen.

Sin embargo, hay gran advertencia en la parte inferior:

No eleve el número de consumidores simultáneos para un tema. Esto daría lugar al consumo simultáneo del mismo mensaje, que casi nunca es deseable.

Esto es interesante, y tiene sentido cuando lo piensas. Lo mismo ocurriría si tuviera múltiples instancias DefaultMessageListenerContainer.

Creo que quizás deba replantearse su diseño, aunque no estoy seguro de qué sugeriría. El consumo simultáneo de mensajes pub/sub parece una cosa perfectamente razonable, pero ¿cómo evitar que se entregue el mismo mensaje a todos sus consumidores al mismo tiempo?

1

Esta es una de esas ocasiones en las que las diferencias en los proveedores de transporte surgen a través de la abstracción de JMS. JMS desea proporcionar una copia del mensaje para cada suscriptor sobre un tema. Pero el comportamiento que desea es realmente el de una cola. Sospecho que hay otros requisitos que llevan esto a una solución de pub/sub que no se describieron; por ejemplo, otras cosas necesitan suscribirse al mismo tema independientemente de su aplicación.

Si tuviera que hacer esto en WebSphere MQ la solución sería crear una suscripción administrativa que daría lugar a una sola copia de cada mensaje sobre el tema dado para colocar en una cola. Entonces sus múltiples suscriptores podrían competir por los mensajes en esa cola. De esta manera, su aplicación podría tener múltiples hilos entre los cuales se distribuyen los mensajes y, al mismo tiempo, otros suscriptores independientes de esta aplicación podrían suscribirse de forma dinámica (no) al mismo tema.

Desafortunadamente, no existe una forma genérica de JMS portátil para hacer esto. Depende de la implementación del proveedor de transporte en gran medida. El único de estos con los que puedo hablar es WebSphere MQ, pero estoy seguro de que otros transportes lo admiten de una forma u otra y en diversos grados si eres creativo.

+0

Me gusta su idea. Supongo que podemos implementarlo sin vincularlo a un proveedor específico. Creamos un tema y solo un suscriptor para él. Ese suscriptor coloca el mensaje del tema en una cola y ahora los consumidores de múltiples colas pueden competir por él. Agrega un nivel de indirección, pero resuelve el problema de concurrencia para el tema en DMLC. – shrini1000

0

Me he encontrado con el mismo problema. Actualmente estoy investigando RabbitMQ, que parece ofrecer una solución perfecta en un patrón de diseño que denominan "colas de trabajo". Más información aquí: http://www.rabbitmq.com/tutorials/tutorial-two-java.html

Si no está totalmente vinculado a JMS puede que tenga en cuenta esto. También podría haber un puente JMS a AMQP, pero eso podría comenzar a parecer hacky.

Me estoy divirtiendo (leer: dificultades) para instalar y ejecutar RabbitMQ en mi Mac, pero creo que estoy cerca de que funcione, y volveré a publicar si puedo resolverlo.

+0

Lo intenté y RabbitMQ funciona como un encanto. No es JMS, pero estoy usando Spring y el soporte Rabbit/AMQP es lo suficientemente bueno para mí. – cobbzilla

+0

De todos modos, en mi experiencia rabbitmq tiene problemas para perder mensajes en un ecosistema agrupado – deFreitas

-2

Encontré esta pregunta. Mi configuración es:

Crear un bean con id="DefaultListenerContainer", agregar la propiedad name="concurrentConsumers" value="10" y la propiedad name="maxConcurrentConsumers" value ="50".

Funciona bien, hasta ahora. Imprimí la identificación del hilo y verifiqué que se crean y reutilizan múltiples hilos.

+0

Compruebe la advertencia que skaffman mencionó en su respuesta anterior. – shrini1000

+0

Esta respuesta contenía la promesa de agregar pruebas de rendimiento, ¡pero nunca se agregó! He eliminado ese texto, pero si desea agregarlo en algún momento, siéntase libre. – halfer

1

Aquí hay una posibilidad:

1) crear sólo una DCLM configurado con el frijol y el método para manejar el mensaje entrante. Establezca su concurrencia en 1.

2) Configure un ejecutor de tareas con sus # hilos igual a la concurrencia que desee. Cree un grupo de objetos para los objetos que en realidad se supone que procesan un mensaje. Proporcione una referencia del ejecutor de tareas y el grupo de objetos al bean que configuró en el n. ° 1. El grupo de objetos es útil si el bean de procesamiento de mensajes actual no es seguro para subprocesos.

3) Para un mensaje entrante, el bean en DMLC crea un Runnable personalizado, lo apunta al mensaje y al grupo de objetos, y lo entrega al ejecutor de tareas.

4) El método de ejecución de Runnable obtiene un bean del grupo de objetos y llama a su método 'process' con el mensaje dado.

# 4 se pueden administrar con un proxy y el grupo de objetos para que sea más fácil.

No he probado esta solución todavía, pero parece encajar en la factura. Tenga en cuenta que esta solución no es tan robusta como EJB MDB. Primavera, por ej. no descartará un objeto del conjunto si arroja una RuntimeException.

+2

¿Cómo se asegura de que los mensajes entrantes JMS no se agreguen hasta que Runnable se complete con éxito? –

1

Al menos en ActiveMQ lo que quiere es totalmente compatible, su nombre es VirtualTopic

el concepto es:

  1. Se crea un VirtualTopic (Basta con crear un tema utilizando el prefijo VirtualTopic.) por ejemplo, . VirtualTopic.Color
  2. Crear un consumidor suscribirse a este VirtualTopic que coincida con este patrón Consumer.<clientName>.VirtualTopic.<topicName> por ej. Consumer.client1.VirtualTopic.Color, haciéndolo, activemq creará una cola con ese nombre y esa cola se suscribirá a VirtualTopic.Color entonces cada mensaje publicado a este tema virtual será entregado a cliente1 cola, tenga en cuenta que funciona como intercambios RabbitMQ.
  3. haya terminado, ahora se puede consumir cliente1 cola como cada cola, con muchos consumidores, DLQ, la política de entrega de éstos, personalizado, etc.
  4. En este punto creo que se entiende que se puede crear client2, client3 y cuántos abonados desea, todos ellos recibirán una copia del mensaje publicado a VirtualTopic.Color

Aquí el código

@Component 
public class ColorReceiver { 

    private static final Logger LOGGER = LoggerFactory.getLogger(MailReceiver.class); 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    // simply generating data to the topic 
    long id=0; 
    @Scheduled(fixedDelay = 500) 
    public void postMail() throws JMSException, IOException { 

     final Color colorName = new Color[]{Color.BLUE, Color.RED, Color.WHITE}[new Random().nextInt(3)]; 
     final Color color = new Color(++id, colorName.getName()); 
     final ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 
     message.setObject(color); 
     message.setProperty("color", color.getName()); 
     LOGGER.info("status=color-post, color={}", color); 
     jmsTemplate.convertAndSend(new ActiveMQTopic("VirtualTopic.color"), message); 
    } 

    /** 
    * Listen all colors messages 
    */ 
    @JmsListener(
     destination = "Consumer.client1.VirtualTopic.color", containerFactory = "colorContainer" 
     selector = "color <> 'RED'" 
    ) 
    public void genericReceiveMessage(Color color) throws InterruptedException { 
     LOGGER.info("status=GEN-color-receiver, color={}", color); 
    } 

    /** 
    * Listen only red colors messages 
    * 
    * the destination ClientId have not necessary exists (it means that his name can be a fancy name), the unique requirement is that 
    * the containers clientId need to be different between each other 
    */ 
    @JmsListener(
//  destination = "Consumer.redColorContainer.VirtualTopic.color", 
     destination = "Consumer.client1.VirtualTopic.color", 
     containerFactory = "redColorContainer", selector = "color='RED'" 
    ) 
    public void receiveMessage(ObjectMessage message) throws InterruptedException, JMSException { 
     LOGGER.info("status=RED-color-receiver, color={}", message.getObject()); 
    } 

    /** 
    * Listen all colors messages 
    */ 
    @JmsListener(
     destination = "Consumer.client2.VirtualTopic.color", containerFactory = "colorContainer" 
    ) 
    public void genericReceiveMessage2(Color color) throws InterruptedException { 
     LOGGER.info("status=GEN-color-receiver-2, color={}", color); 
    } 

} 

@SpringBootApplication 
@EnableJms 
@EnableScheduling 
@Configuration 
public class Config { 

    /** 
    * Each @JmsListener declaration need a different containerFactory because ActiveMQ requires different 
    * clientIds per consumer pool (as two @JmsListener above, or two application instances) 
    * 
    */ 
    @Bean 
    public JmsListenerContainerFactory<?> colorContainer(ActiveMQConnectionFactory connectionFactory, 
     DefaultJmsListenerContainerFactoryConfigurer configurer) { 

     final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setConcurrency("1-5"); 
     configurer.configure(factory, connectionFactory); 
     // container.setClientId("aId..."); lets spring generate a random ID 
     return factory; 
    } 

    @Bean 
    public JmsListenerContainerFactory<?> redColorContainer(ActiveMQConnectionFactory connectionFactory, 
     DefaultJmsListenerContainerFactoryConfigurer configurer) { 

     // necessary when post serializable objects (you can set it at application.properties) 
     connectionFactory.setTrustedPackages(Arrays.asList(Color.class.getPackage().getName())); 

     final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setConcurrency("1-2"); 
     configurer.configure(factory, connectionFactory); 
     return factory; 
    } 

} 

public class Color implements Serializable { 

    public static final Color WHITE = new Color("WHITE"); 
    public static final Color BLUE = new Color("BLUE"); 
    public static final Color RED = new Color("RED"); 

    private String name; 
    private long id; 

    // CONSTRUCTORS, GETTERS AND SETTERS 
} 
Cuestiones relacionadas