2014-08-29 3 views
6

Я следующую конфигурацию:Spring Integration: как обрабатывать несколько сообщений за один раз?

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore"> 
    <constructor-arg ref="mongoDbFactoryDefault"/> 
</bean> 

<!-- the queue capacity is unbounded as it uses a persistent store--> 
<int:channel id="logEntryChannel"> 
    <int:queue message-store="mongoDbMessageStore"/> 
</int:channel> 

<!-- the poller will process 10 messages every 6 seconds --> 
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"> 
    <int:poller max-messages-per-poll="10" fixed-rate="6000"/> 
</int:outbound-channel-adapter> 

И обработчик сообщений определяется как

@Override 
public void handleMessage(Message<?> message) throws MessagingException { 
    Object payload = message.getPayload(); 
    if (payload instanceof LogEntry) { 
     LogEntry logEntry = (LogEntry) payload; 
     String app = (String) message.getHeaders().get("app"); 
     logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime()); 
     logEntryPostProcessService.postProcess(app, logEntry); 
    } else { 
     throw new MessageRejectedException(message, "Unknown data type has been received."); 
    } 
} 

То, что я хотел бы иметь что-то вроде

@Override 
public void handleMessage(List<Message<?>> messages) throws MessagingException { 
... 
} 

поэтому в основном голосующий посылает все 10 сообщений за один вызов вместо вызова метода 10 раз по одному на сообщение.

Причина этого заключается в том, чтобы иметь возможность массово обрабатывать все сообщения в куске, тем самым повышая производительность.

ответ

4

Это правда, из-за (AbstractPollingEndpoint):

taskExecutor.execute(new Runnable() { 
    @Override 
    public void run() { 
     int count = 0; 
     while (initialized && (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll)) { 
... 
      if (!pollingTask.call()) { 
       break; 
      } 
... 
    } 
}); 

Таким образом, все ваши сообщения (max-messages-per-poll) обрабатываются в одном потоке. Однако они отправляются обработчику один за другим, а не как целая группа.

Для обработки параллельно вы должны использовать ExecutorChannel перед своим logEntryPostProcessorReceiver. Что-то вроде этого:

<channel id="executorChannel"> 
    <dispatcher task-executor="threadPoolExecutor"/> 
</channel> 

<bridge input-channel="logEntryChannel" output-channel="executorChannel"> 
    <poller max-messages-per-poll="10" fixed-rate="6000"/> 
</bridge> 

<outbound-channel-adapter channel="executorChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/> 

UPDATE

Для обработки сообщений в виде одной партии вы должны aggregate их. Поскольку все они являются результатом polling endpoint, в сообщениях нет sequenceDetails. Вы можете преодолеть это с каким-то фальшивым значением для correlationId:

<aggregator correlation-strategy-expression="T(Thread).currentThread().id" 
     release-strategy-expression="size() == 10"/> 

Где size() == 10 должно быть равна max-messages-per-poll.

После этого ваш logEntryPostProcessorReceiver должен применить list от payload s. Или просто одно сообщение, которое payload является списком в результате от <aggregator>.

+0

В моем случае я хотел бы, чтобы все опрошенные сообщения были как одна партия, потому что это будет быстрее обрабатывать их сразу. Однако я узнал что-то новое, способ обработки сообщений параллельно. – selvinsource

+0

Добавлена ​​информация 'aggregator' в ответ. –

4

Благодаря @Artem Билана, то здесь окончательное решение:

<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore"> 
    <constructor-arg ref="mongoDbFactoryDefault"/> 
</bean> 

<!-- the queue capacity is unbounded as it uses a persistent store--> 
<int:channel id="logEntryChannel"> 
    <int:queue message-store="mongoDbMessageStore"/> 
</int:channel> 

<!-- 
    the poller will process 100 messages every minute 
    if the size of the group is 100 (the poll reached the max messages) or 60 seconds time out (poll has less than 100 messages) then the payload with the list of messages is passed to defined output channel 
--> 
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel" 
    send-partial-result-on-expiry="true" 
    group-timeout="60000" 
    correlation-strategy-expression="T(Thread).currentThread().id" 
    release-strategy-expression="size() == 100"> 
    <int:poller max-messages-per-poll="100" fixed-rate="60000"/> 
</int:aggregator> 

<int:channel id="logEntryAggrChannel"/>   

<!-- the payload is a list of log entries as result of the aggregator --> 
<int:outbound-channel-adapter channel="logEntryAggrChannel" ref="logEntryPostProcessorReceiver" method="handleMessage"/> 

В соответствии с комментарием (в приведенном выше коде), я должен был установить группу тайм-аут/отправить-частичную результат-по-истечении, как некоторые группы были сформированы с идентификатором потока, но никогда не обрабатывались, потому что они не достигли состояния размера == 100.

Смежные вопросы