Есть ли способ весной интеграции Java DSL для изменения существующего заголовка сообщения?Как увеличить заголовок сообщения
Я повторно реализую механизм повторной загрузки с использованием SI Java DSL и хочу увеличить заголовок сообщения, содержащий попытки загрузки при возникновении сбоя, до маршрутизации сообщения на основе количества попыток по сравнению с лимитом.
У меня есть маршрутизация, прекрасно работающая на основе тестов RouterTests, включенных в SI. С HeaderEnrichers я могу легко добавить заголовок, но я не вижу способа изменить существующий заголовок.
Благодаря
/**
* Unit test of {@link RetryRouter}.
*
* Based on {@link RouterTests#testMethodInvokingRouter2()}.
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@DirtiesContext
public class RetryRouterTests {
/** Failed download attempts are sent to this channel to be routed by {@link ContextConfiguration#failedDownloadRouting() } */
@Autowired
@Qualifier("failed")
private MessageChannel failed;
/** Retry attempts for failed downloads are sent to this channel by {@link ContextConfiguration#failedDownloadRouting() }*/
@Autowired
@Qualifier("retry-channel")
private PollableChannel retryChannel;
/** Failed download attempts which will not be retried, are sent to this channel by {@link ContextConfiguration#failedDownloadRouting() }*/
@Autowired
@Qualifier("exhausted-channel")
private PollableChannel exhaustedChannel;
/**
* Unit test of {@link ContextConfiguration#failedDownloadRouting() } and {@link RetryRouter}.
*/
@Test
public void retryRouting() {
final int limit = 2;
for (int attempt = 0 ; attempt <= limit + 1 ; attempt++){
this.failed.send(failed(attempt, limit));
if (attempt < limit){
assertEquals(payload(attempt) , this.retryChannel.receive().getPayload());
assertNull(this.exhaustedChannel.receive(0));
}else{
assertEquals(payload(attempt) , this.exhaustedChannel.receive().getPayload());
assertNotNull(this.exhaustedChannel.receive().getPayload());
}
}
}
private Message<String> failed(int retry , int limit) {
return MessageBuilder
.withPayload( payload(retry))
.setHeader("retries", new AtomicInteger(retry))
.setHeader("limit", limit)
.build();
}
private String payload (int retry){
return "retry attempt "+retry;
}
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public MessageChannel loggerChannel() {
return MessageChannels.direct().get();
}
@Bean(name = "retry-channel")
public MessageChannel retryChannel() {
return new QueueChannel();
}
@Bean(name = "exhausted-channel")
public MessageChannel exhaustedChannel() {
return new QueueChannel();
}
/**
* Decides if a failed download attempt can be retried or not, based upon the number of attempts already made
* and the limit to the number of attempts that may be made. Logic is in {@link RetryRouter}.
* <p>
* The number of download attempts already made is provided as a header {@link #attempts} from the connector doing the download,
* and the limit to the number of attempts is another header {@link #retryLimit} which is originally setup as
* a header by {@link DownloadDispatcher} from retry configuration.
* <p>
* Messages for failed download attempts are listened to on channel {@link #failed}.
* Retry attempts are routed to {@link #retryChannel()}
*
* @return
*/
@Bean
public IntegrationFlow failedDownloadRouting() {
return IntegrationFlows.from("failed")
.handle("headers.retries.getAndIncrement()")
.handle(logMessage ("failed"))
.route(new RetryRouter())
.get();
}
/**
* Decides if a failed download attempt can be retried or not, based upon the number of attempts already made
* and the limit to the number of attempts that may be made.
* <p>
*/
private static class RetryRouter {
@Router
public String routeByHeader(@Header("retries") AtomicInteger attempts , @Header("limit") Integer limit) {
if (attempts.intValue() < limit.intValue()){
return "retry-channel";
}
return "exhausted-channel";
}
/** This method is not used but is required by Spring Integration otherwise application context doesn't load because of
* {@code Caused by: java.lang.IllegalArgumentException: Target object of type
* [class org.springframework.integration.dsl.test.routers.RetryRouterTests$RetryRouter] has no eligible methods for handling Messages.}
*
* @throws UnsupportedOperationException if called
*/
@SuppressWarnings("unused")
public String routeMessage(Message<?> message) {
throw new UnsupportedOperationException("should not be used.");
}
}
}
Спасибо за ответ. Я не уверен в значении первого «первого» первого подписчика классификатора на канале публикации-подписчика. Я попробовал это в своем тестовом коде, который я добавил в исходный вопрос, но получаю сообщение об ошибке, с которым я не могу продвинуться, добавит в следующий комментарий. –
Вызвано: java.lang.IllegalArgumentException: найден неоднозначный тип параметра [класс java.lang.String] для соответствия метода: [public byte [] ..... java.lang.String java.lang.String.valueOf (long)] \t на org.springframework.util.Assert.isNull (Assert.java:92) \t в org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget (MessagingMethodInvokerHelper.java:497) –
К сожалению, это была моя ошибка: '.handle()' не поддерживает вариант 'expression'. См. Обновление в моем ответе. Вы должны предоставить реализацию 'MessageHandler', чтобы получить доступ к' MessageHeaders'. –