2015-07-30 4 views
1

Ниже код принимает 2 сообщения, прежде чем переходить на исходящий канал.Срок службы агрегатора весны истекает - выпуск

<bean id="timeout" 
    class="org.springframework.integration.aggregator.TimeoutCountSequenceSizeReleaseStrategy"> 
    <constructor-arg name="threshold" value="2" /> 
    <constructor-arg name="timeout" value="7000" /> 
</bean> 

<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput" 
     method="handleMessage" release-strategy="releaseStrategyBean" release-strategy-method="timeout"> 
</int:aggregator> 

Моим вариантом использования является сортировка всего сообщения в течение 10 минут и отправка его на исходящий канал. Не основано на количестве сообщений, как показано выше. Для реализации этой функции на основе времени, используемый ниже код:

<int:aggregator ref="updateCreate" input-channel="filteredAIPOutput" 
     method="handleMessage" 
       output-channel="outputappendFilenameinHeader" > 
</int:aggregator> 

<bean id="updateCreate" class="helper.UpdateCreateHelper"/> 

я передал 10 сообщений, метод PojoDateStrategyHelper canRelease вызывается 10 раз.

Пытался реализовать PojoDateStrategyHelper, с логикой разницы во времени, работает как и ожидалось. Через 10 минут вызывается класс UpdateCreateHelper, но он получил только 1 сообщение (последнее сообщение). Оставшиеся 9 сообщений не встречаются нигде. Я делаю что-то неправильно здесь? Сообщения не сортируются.

Я подозреваю, что в SI должно быть что-то встроенное, что может достичь этого, если я прохожу через 10 минут в качестве параметра, как только он истечет через 10 минут, он должен передать все сообщения исходящему каналу.

Это мой UpdateCreateHelper.java код:

public Message<?> handleMessage(List<Message<?>> flights){ 

    LOGGER.debug("orderItems list ::"+flights.size()); // this is always printing 1 

    MessageBuilder<?> messageWithHeader = MessageBuilder.withPayload(flights.get(0).getPayload().toString()); 
    messageWithHeader.setHeader("ftp_filename", ""); 

    return messageWithHeader.build(); 
} 

@CorrelationStrategy 
public String correlateBy(@Header("id") String id) { 
    return id; 
} 
@ReleaseStrategy 
public boolean canRelease(List<Message<?>> flights) { 
    LOGGER.debug("inside canRelease ::"+flights.size()); // This is called for each and every message 
    return compareTime(date.getTime(), new Date().getTime()); 
} 

Я новичок в SI (3.х), я искал много для времени, связанного связанный агрегатор, не мог найти какой-либо полезный источник, пожалуйста, предложить ,

спасибо!

ответ

0
private String correlationId = date.toString(); 

@CorrelationStrategy 
public String correlateBy(Message<?> message) { 
    **// Return the correlation ID which is the timestamp the current window started (all messages should have the same correlation id)** 
    return "same"; 
} 

Раньше я возвращал Идентификатор заголовка, который отличается от Message to Message. Надеюсь, это решение может помочь кому-то. Я потратил почти 2 дня, проигнорировав такую ​​небольшую концепцию.

1

Включите ведение журнала DEBUG, чтобы узнать, почему вы видите только одно сообщение.

Я подозреваю, что должно быть что-то с встроенной в СИ, который может достичь этого, ...

До версии 4.0 (и, по умолчанию, после), агрегатор является полностью пассивным компонент; при появлении нового сообщения, стратегия выпуска доступна только после консультации.

4.0 added group timeout capabilities, посредством которого частичные группы могут быть освобождены (или отброшены) после таймаута.

Однако, с любой версией вы можете настроить MessageGroupStoreReaper для выпуска частично полных групп после некоторого таймаута. См. the documentation.

Смежные вопросы