2016-01-09 3 views
2

Есть ли способ весной интеграции Java DSL для изменения существующего заголовка сообщения?Как увеличить заголовок сообщения

Я повторно реализую механизм повторной загрузки с использованием SI Java DSL и хочу увеличить заголовок сообщения, содержащий попытки загрузки при возникновении сбоя, до маршрутизации сообщения на основе количества попыток по сравнению с лимитом.

У меня есть маршрутизация, прекрасно работающая на основе тестов RouterTests, включенных в SI. С HeaderEnrichers я могу легко добавить заголовок, но я не вижу способа изменить существующий заголовок.

Благодаря

/** 
* Unit test of {@link RetryRouter}. 
* 
* Based on {@link RouterTests#testMethodInvokingRouter2()}. 
*/ 
@ContextConfiguration 
@RunWith(SpringJUnit4ClassRunner.class) 
@DirtiesContext 
public class RetryRouterTests { 

    /** Failed download attempts are sent to this channel to be routed by {@link ContextConfiguration#failedDownloadRouting() } */ 
    @Autowired 
    @Qualifier("failed") 
    private MessageChannel failed; 

    /** Retry attempts for failed downloads are sent to this channel by {@link ContextConfiguration#failedDownloadRouting() }*/ 
    @Autowired 
    @Qualifier("retry-channel") 
    private PollableChannel retryChannel; 

    /** Failed download attempts which will not be retried, are sent to this channel by {@link ContextConfiguration#failedDownloadRouting() }*/ 
    @Autowired 
    @Qualifier("exhausted-channel") 
    private PollableChannel exhaustedChannel; 

    /** 
    * Unit test of {@link ContextConfiguration#failedDownloadRouting() } and {@link RetryRouter}. 
    */ 
    @Test 
    public void retryRouting() { 

     final int limit = 2; 

     for (int attempt = 0 ; attempt <= limit + 1 ; attempt++){ 

      this.failed.send(failed(attempt, limit)); 

      if (attempt < limit){ 

       assertEquals(payload(attempt) , this.retryChannel.receive().getPayload()); 
       assertNull(this.exhaustedChannel.receive(0)); 

      }else{ 

       assertEquals(payload(attempt) , this.exhaustedChannel.receive().getPayload()); 
       assertNotNull(this.exhaustedChannel.receive().getPayload()); 
      } 
     } 

    } 

    private Message<String> failed(int retry , int limit) { 

     return MessageBuilder 
      .withPayload( payload(retry)) 
      .setHeader("retries", new AtomicInteger(retry)) 
      .setHeader("limit", limit) 
      .build(); 
    } 

    private String payload (int retry){ 
     return "retry attempt "+retry; 
    } 


    @Configuration 
    @EnableIntegration 
    public static class ContextConfiguration { 

     @Bean 
     public MessageChannel loggerChannel() { 
      return MessageChannels.direct().get(); 
     } 

     @Bean(name = "retry-channel") 
     public MessageChannel retryChannel() { 
      return new QueueChannel(); 
     } 

     @Bean(name = "exhausted-channel") 
     public MessageChannel exhaustedChannel() { 
      return new QueueChannel(); 
     } 


     /** 
     * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
     * and the limit to the number of attempts that may be made. Logic is in {@link RetryRouter}. 
     * <p> 
     * The number of download attempts already made is provided as a header {@link #attempts} from the connector doing the download, 
     * and the limit to the number of attempts is another header {@link #retryLimit} which is originally setup as 
     * a header by {@link DownloadDispatcher} from retry configuration. 
     * <p> 
     * Messages for failed download attempts are listened to on channel {@link #failed}. 
     * Retry attempts are routed to {@link #retryChannel()} 
     * 
     * @return 
     */ 
     @Bean 
     public IntegrationFlow failedDownloadRouting() { 

      return IntegrationFlows.from("failed") 

       .handle("headers.retries.getAndIncrement()") 
       .handle(logMessage ("failed")) 
       .route(new RetryRouter()) 
       .get(); 
     } 

     /** 
     * Decides if a failed download attempt can be retried or not, based upon the number of attempts already made 
     * and the limit to the number of attempts that may be made. 
     * <p> 
     */ 
     private static class RetryRouter { 

      @Router 
      public String routeByHeader(@Header("retries") AtomicInteger attempts , @Header("limit") Integer limit) { 

       if (attempts.intValue() < limit.intValue()){ 
        return "retry-channel"; 
       } 
       return "exhausted-channel"; 
      } 

      /** This method is not used but is required by Spring Integration otherwise application context doesn't load because of 
      * {@code Caused by: java.lang.IllegalArgumentException: Target object of type 
      * [class org.springframework.integration.dsl.test.routers.RetryRouterTests$RetryRouter] has no eligible methods for handling Messages.} 
      * 
      * @throws UnsupportedOperationException if called 
      */ 
      @SuppressWarnings("unused") 
      public String routeMessage(Message<?> message) { 

       throw new UnsupportedOperationException("should not be used."); 
      } 
     } 
    } 

ответ

1

Существует способ делать то, что вам нужно без изменений для заголовков:

.enrichHeaders(h -> h.header("downloadRetries", new AtomicInteger())) 

Затем, когда вам нужно, чтобы увеличить его, вы должны сделать именно это:

.handle(m -> m.getHeaders().get("downloadRetries", AtomicInteger.class).getAndIncrement()) 

и этот дескриптор в качестве первого одностороннего первого абонента на канале публикации-подписчика для повторной попытки вице.

UPDATE

является одним из способов 'MessageHandler' и он не подходит для настройки "outputChannel. Это конец интеграционного потока.

Благодарим за использование конфигурации по этому вопросу: теперь я проблема, и вы ошибаетесь. Решение должно быть таким:

 return IntegrationFlows.from("failed") 

      .publishSubscribeChannel(s -> s 
       .subscribe(f -> f 
         .handle(m -> m.getHeaders().get("downloadRetries", 
            AtomicInteger.class).getAndIncrement())) 
      .handle(logMessage ("failed")) 
      .route(new RetryRouter()) 
      .get(); 
    } 

где мы имеем PublishSubscribeChannel, то .subscribe() в суб-поток является первым абонентом для первого и .handle(logMessage ("failed")) в основном потоке является вторым абонентом. Последний не будет вызываться до окончания работы первого абонента.

См. Интеграция весны Reference Manual и Java DSL Manual для получения дополнительной информации.

+0

Спасибо за ответ. Я не уверен в значении первого «первого» первого подписчика классификатора на канале публикации-подписчика. Я попробовал это в своем тестовом коде, который я добавил в исходный вопрос, но получаю сообщение об ошибке, с которым я не могу продвинуться, добавит в следующий комментарий. –

+0

Вызвано: java.lang.IllegalArgumentException: найден неоднозначный тип параметра [класс java.lang.String] для соответствия метода: [public byte [] ..... java.lang.String java.lang.String.valueOf (long)] \t на org.springframework.util.Assert.isNull (Assert.java:92) \t в org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget (MessagingMethodInvokerHelper.java:497) –

+0

К сожалению, это была моя ошибка: '.handle()' не поддерживает вариант 'expression'. См. Обновление в моем ответе. Вы должны предоставить реализацию 'MessageHandler', чтобы получить доступ к' MessageHeaders'. –

0

Следующий код делает работу

.handle(new GenericHandler<Message<String>>() { 

    @Override 
    public Object handle(Message<String> payload , Map<String,Object> headers) { 

     ((AtomicInteger)headers.get("retries")).getAndIncrement(); 
     return payload; 
    }}) 
Смежные вопросы