Я следующую конфигурацию:Spring Integration: как обрабатывать несколько сообщений за один раз?
<bean id="mongoDbMessageStore" class="org.springframework.integration.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactoryDefault"/>
</bean>
<!-- the queue capacity is unbounded as it uses a persistent store-->
<int:channel id="logEntryChannel">
<int:queue message-store="mongoDbMessageStore"/>
</int:channel>
<!-- the poller will process 10 messages every 6 seconds -->
<int:outbound-channel-adapter channel="logEntryChannel" ref="logEntryPostProcessorReceiver" method="handleMessage">
<int:poller max-messages-per-poll="10" fixed-rate="6000"/>
</int:outbound-channel-adapter>
И обработчик сообщений определяется как
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
if (payload instanceof LogEntry) {
LogEntry logEntry = (LogEntry) payload;
String app = (String) message.getHeaders().get("app");
logger.info("LogEntry Received - " + app + " " + logEntry.getEntityType() + " " + logEntry.getAction() + " " + logEntry.getEventTime());
logEntryPostProcessService.postProcess(app, logEntry);
} else {
throw new MessageRejectedException(message, "Unknown data type has been received.");
}
}
То, что я хотел бы иметь что-то вроде
@Override
public void handleMessage(List<Message<?>> messages) throws MessagingException {
...
}
поэтому в основном голосующий посылает все 10 сообщений за один вызов вместо вызова метода 10 раз по одному на сообщение.
Причина этого заключается в том, чтобы иметь возможность массово обрабатывать все сообщения в куске, тем самым повышая производительность.
В моем случае я хотел бы, чтобы все опрошенные сообщения были как одна партия, потому что это будет быстрее обрабатывать их сразу. Однако я узнал что-то новое, способ обработки сообщений параллельно. – selvinsource
Добавлена информация 'aggregator' в ответ. –