У меня есть поток интеграции, который выглядит следующим образом:AMQP Spring Integration обработка ошибки
@Bean
public IntegrationFlow auditFlow(@Qualifier("eventLoggingConnectionFactory") ConnectionFactory connectionFactory,
@Qualifier("writeChannel") MessageChannel messageChannel,
@Qualifier("parseErrorChannel") MessageChannel errorChannel) {
return IntegrationFlows
.from(Amqp.inboundAdapter(connectionFactory, auditQueue)
.errorChannel(errorChannel)
.concurrentConsumers(numConsumers)
.messageConverter(new MongoMessageConverter())) // converts JSON to org.bson.Document
.enrichHeaders(e -> e.<Document>headerFunction(MongoWriteConfiguration.MONGO_COLLECTION_HEADER_KEY,
o -> getNamespace(o.getPayload())))
.channel(messageChannel)
.get();
}
Преобразователь сообщений может конечно ошибаться, если уродливое сообщение вводится, поднимая MessageConversionException. В этом случае я, конечно, не хочу, чтобы сообщение было запрошено, но я также не хочу, чтобы по умолчанию было отказано от отказа от запроса, как это можно сделать в AmqpInboundAdapterSpec. Каков правильный способ для меня не запрашивать сообщения, которые ошибочно принимают (и в противном случае переиздают их для целей отладки)?
В более общем плане, процессы в нисходящем потоке в одном потоке могут ошибаться для данных, которые более семантически искажены - еще раз я не хочу их запрашивать. В то время я мог бросить AmqpRejectAndDontRequeueException
, но тогда я теряю разделение проблем, что составляет половину этого. Каков правильный способ действовать по этим исключениям - возможно, есть способ перевести на AmqpRejectAndDontRequeueException
?