2015-09-24 3 views
3

У меня есть поток интеграции, который выглядит следующим образом:AMQP Spring Integration обработка ошибки

@Bean 
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory, 
           @Qualifier("writeChannel") MessageChannel messageChannel, 
           @Qualifier("parseErrorChannel") MessageChannel errorChannel) { 
    return IntegrationFlows 
      .from(Amqp.inboundAdapter(connectionFactory, auditQueue) 
        .errorChannel(errorChannel) 
        .concurrentConsumers(numConsumers) 
        .messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document 
      .enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY, 
        o -> getNamespace(o.getPayload()))) 
      .channel(messageChannel) 
      .get(); 
} 

Преобразователь сообщений может конечно ошибаться, если уродливое сообщение вводится, поднимая MessageConversionException. В этом случае я, конечно, не хочу, чтобы сообщение было запрошено, но я также не хочу, чтобы по умолчанию было отказано от отказа от запроса, как это можно сделать в AmqpInboundAdapterSpec. Каков правильный способ для меня не запрашивать сообщения, которые ошибочно принимают (и в противном случае переиздают их для целей отладки)?

В более общем плане, процессы в нисходящем потоке в одном потоке могут ошибаться для данных, которые более семантически искажены - еще раз я не хочу их запрашивать. В то время я мог бросить AmqpRejectAndDontRequeueException, но тогда я теряю разделение проблем, что составляет половину этого. Каков правильный способ действовать по этим исключениям - возможно, есть способ перевести на AmqpRejectAndDontRequeueException?

ответ

3

Обработчик ошибок по умолчанию в въездном адаптере SimpleMessageListenerContainerConditionalRejectingErrorHandler) не только, что (он обнаруживает MessageConversionException и бросает AmqpRejectAndDontRequeueException).

Вы можете настроить ConditionalRejectingErrorHandler, введя свой собственный FatalExceptionStrategy, чтобы искать другие типы исключений и обрабатывать их одинаково.

DefaultExceptionStrategy выглядит следующим образом ...

@Override 
public boolean isFatal(Throwable t) { 
    if (t instanceof ListenerExecutionFailedException 
      && t.getCause() instanceof MessageConversionException) { 
     if (logger.isWarnEnabled()) { 
      logger.warn("Fatal message conversion error; message rejected; " 
        + "it will be dropped or routed to a dead letter exchange, if so configured: " 
        + ((ListenerExecutionFailedException) t).getFailedMessage(), t); 
     } 
     return true; 
    } 
    return false; 
} 

Он вызывается только, если есть уже не в цепи причин AmqpRejectAndDontRequeueException.

Смежные вопросы