2012-03-26 13 views
6

En la siguiente prueba estoy tratando de simular el siguiente escenario: se inicia¿Cómo simular la entrega de mensajes en AUTO_ACKNOWLEDGE JMS Session Scenario?

  1. Una cola de mensajes.
  2. Se inicia un consumidor diseñado para fallar durante el procesamiento del mensaje.
  3. Aparece un mensaje.
  4. El consumidor comienza a procesar el mensaje.
  5. Durante el procesamiento, se lanza una excepción para simular una falla en el procesamiento del mensaje. El consumidor que falla se detiene.
  6. Se inicia otro consumidor con la intención de recoger el mensaje reenviado.

Pero mi prueba falla y el mensaje no se vuelve a entregar al nuevo consumidor. Apreciaré cualquier pista sobre esto.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig", 
     loader=JavaConfigContextLoader.class) 
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests { 
    @Autowired 
    private FailureReprocessTestScenario testScenario; 

    @Before 
    public void setUp() { 
     testScenario.start(); 
    } 

    @After 
    public void tearDown() throws Exception { 
     testScenario.stop(); 
    } 

    @Test public void 
    should_reprocess_task_after_processing_failure() { 
     try { 
      Thread.sleep(20*1000); 

      assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{ 
        "task-1", 
      }))); 
     } catch (InterruptedException e) { 
      fail(); 
     } 
    } 

    @Configurable 
    public static class FailureReprocessTestScenario { 
     @Autowired 
     public BrokerService broker; 

     @Autowired 
     public MockTaskProducer mockTaskProducer; 

     @Autowired 
     public FailingWorker failingWorker; 

     @Autowired 
     public SucceedingWorker succeedingWorker; 

     @Autowired 
     public TaskScheduler scheduler; 

     public void start() { 
      Date now = new Date(); 
      scheduler.schedule(new Runnable() { 
       public void run() { failingWorker.start(); } 
      }, now); 

      Date after1Seconds = new Date(now.getTime() + 1*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { mockTaskProducer.produceTask(); } 
      }, after1Seconds); 

      Date after2Seconds = new Date(now.getTime() + 2*1000); 
      scheduler.schedule(new Runnable() { 
       public void run() { 
        failingWorker.stop(); 
        succeedingWorker.start(); 
       } 
      }, after2Seconds); 
     } 

     public void stop() throws Exception { 
      succeedingWorker.stop(); 
      broker.stop(); 
     } 
    } 

    @Configuration 
    @ImportResource(value={"classpath:applicationContext-jms.xml", 
      "classpath:applicationContext-task.xml"}) 
    public static class ContextConfig { 
     @Autowired 
     private ConnectionFactory jmsFactory; 

     @Bean 
     public FailureReprocessTestScenario testScenario() { 
      return new FailureReprocessTestScenario(); 
     } 

     @Bean 
     public MockTaskProducer mockTaskProducer() { 
      return new MockTaskProducer(); 
     } 

     @Bean 
     public FailingWorker failingWorker() { 
      TaskListener listener = new TaskListener(); 
      FailingWorker worker = new FailingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     @Bean 
     public SucceedingWorker succeedingWorker() { 
      TaskListener listener = new TaskListener(); 
      SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener)); 
      listener.setProcessor(worker); 
      return worker; 
     } 

     private DefaultMessageListenerContainer listenerContainer(TaskListener listener) { 
      DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer(); 
      listenerContainer.setConnectionFactory(jmsFactory); 
      listenerContainer.setDestinationName("tasksQueue"); 
      listenerContainer.setMessageListener(listener); 
      listenerContainer.setAutoStartup(false); 
      listenerContainer.initialize(); 
      return listenerContainer; 
     } 

    } 

    public static class FailingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(FailingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public FailingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
     } 

     public void start() { 
      LOG.info("FailingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("FailingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("FailingWorker.processTask(" + task + ")"); 
      try { 
       Thread.sleep(1*1000); 
       throw Throwables.propagate(new Exception("Simulate task processing failure")); 
      } catch (InterruptedException e) { 
       LOG.log(Level.SEVERE, "Unexpected interruption exception"); 
      } 
     } 
    } 

    public static class SucceedingWorker implements TaskProcessor { 
     private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName()); 

     private final DefaultMessageListenerContainer listenerContainer; 

     public final List<String> processedTasks; 

     public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) { 
      this.listenerContainer = listenerContainer; 
      this.processedTasks = new ArrayList<String>(); 
     } 

     public void start() { 
      LOG.info("SucceedingWorker.start()"); 
      listenerContainer.start(); 
     } 

     public void stop() { 
      LOG.info("SucceedingWorker.stop()"); 
      listenerContainer.stop(); 
     } 

     @Override 
     public void processTask(Object task) { 
      LOG.info("SucceedingWorker.processTask(" + task + ")"); 
      try { 
       TextMessage taskText = (TextMessage) task; 
       processedTasks.add(taskText.getText()); 
      } catch (JMSException e) { 
       LOG.log(Level.SEVERE, "Unexpected exception during task processing"); 
      } 
     } 
    } 

} 

TaskListener.java

public class TaskListener implements MessageListener { 

    private TaskProcessor processor; 

    @Override 
    public void onMessage(Message message) { 
     processor.processTask(message); 
    } 

    public void setProcessor(TaskProcessor processor) { 
     this.processor = processor; 
    } 

} 

MockTaskProducer.java

@Configurable 
public class MockTaskProducer implements ApplicationContextAware { 
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName()); 

    @Autowired 
    private JmsTemplate jmsTemplate; 

    private Destination destination; 

    private int taskCounter = 0; 

    public void produceTask() { 
     LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")"); 

     taskCounter++; 

     jmsTemplate.send(destination, new MessageCreator() { 
      @Override 
      public Message createMessage(Session session) throws JMSException { 
       TextMessage message = session.createTextMessage("task-" + taskCounter); 
       return message; 
      } 
     }); 
    } 

    @Override 
    public void setApplicationContext(ApplicationContext applicationContext) 
      throws BeansException { 
     destination = applicationContext.getBean("tasksQueue", Destination.class); 
    } 
} 
+1

Cuando configuro 'listenerContainer.setSessionTransacted (true)' veo que el mensaje se vuelve a entregar, pero solo al 'FailingWorker'. Evento después de detener el contenedor de escucha correspondiente, el 'SucceedingWorker' nunca recibe el mensaje reenviado. –

+1

Parece que 'listenerContainer.stop()' -method no cierra la conexión a las ofertas, por lo que el proveedor de JMS continúa intentando volver a enviar el mensaje fallido al mismo consumidor. Para evitar que el consumidor que falla deba llamar 'listenerContainer.shutdown()' en algún momento. –

Respuesta

7

Al parecer, la fuente de documentación que estaba buscando ayer Creating Robust JMS Applications me engañó de alguna manera (o podría haberlo entendido incorrectamente). Sobre todo que extracto:

Hasta un mensaje JMS se ha reconocido, no se considera que es consumida éxito. El consumo exitoso de un mensaje normalmente tiene lugar en tres etapas.

  1. El cliente recibe el mensaje.
  2. El cliente procesa el mensaje.
  3. El mensaje es reconocido. El acuse de recibo es iniciado por el proveedor de JMS o por el cliente, dependiendo del modo de reconocimiento de la sesión .

que supone AUTO_ACKNOWLEDGE hace exactamente eso - reconoció el mensaje después de que el método detector devuelve un resultado. Pero de acuerdo con la especificación JMS, es un poco diferente y los contenedores de escucha de Spring como se espera no intentan alterar el comportamiento de la especificación JMS.Esto es lo que el javadoc de AbstractMessageListenerContainer tiene que decir - que he hecho hincapié en la frases importantes:

El contenedor oyente ofrece las siguientes opciones de mensaje de reconocimiento :

  • "sessionAcknowledgeMode" en " AUTO_ACKNOWLEDGE "(predeterminado): Reconocimiento automático de mensajes antes de la ejecución del detector; no se vuelve a entregar en caso de excepción lanzada.
  • "sessionAcknowledgeMode" establecido en "CLIENT_ACKNOWLEDGE": Reconocimiento automático de mensajes después de la ejecución exitosa de escucha; no reentrega en caso de excepción lanzada.
  • "sessionAcknowledgeMode" establecido en "DUPS_OK_ACKNOWLEDGE": reconocimiento de mensajes vagos durante o después de la ejecución del oyente; potencial reentrega en caso de excepción lanzada.
  • "sessionTransacted" establecido en "true": Acuse de recibo transaccional después de la ejecución exitosa de la escucha; devolución garantizada en caso de excepción lanzada.

Así que la clave de mi solución es listenerContainer.setSessionTransacted(true);

Otro problema que tuvimos fue que el proveedor de JMS mantiene redelivering mensaje fallado de nuevo al mismo consumidor que había fallado durante el procesamiento del mensaje. No sé si la especificación JMS da una prescripción de lo que debe hacer el proveedor en tales situaciones, pero lo que funcionó para mí fue usar listenerContainer.shutdown(); para desconectar al consumidor que falla y permitirle al proveedor volver a enviar el mensaje y darle una oportunidad a otro consumidor.

Cuestiones relacionadas