Я изучаю Reactor, и мне интересно, как добиться определенного поведения. Предположим, у меня есть поток входящих сообщений. Каждое сообщение связано с определенной сущностью и содержит некоторые данные.Параллельная отправка групп groupBy в реакторе
interface Message {
String getEntityId();
Data getData();
}
Сообщения, относящиеся к различным объектам, могут обрабатываться параллельно. Однако сообщения, относящиеся к любому отдельному объекту, должны обрабатываться по одному за раз, то есть обработка сообщения 2 для объекта "abc"
не может начинаться до тех пор, пока обработка сообщения 1 для объекта "abc"
не будет завершена. Пока обработка сообщения продолжается, дополнительные сообщения для этого entiy должны быть буферизованы. Сообщение для других объектов может быть беспрепятственным. Можно думать о нем, как там быть на резьбе в сущности работает следующий код:
public void run() {
for (;;) {
// Blocks until there's a message available
Message msg = messageQueue.nextMessageFor(this.entityId);
// Blocks until processing is finished
processMessage(msg);
}
}
Как я могу добиться этого с React без блокировки? Общая скорость сообщения может быть высокой, но скорость передачи сообщений для одного объекта будет очень низкой. Набор объектов может быть очень большим и не обязательно известен заранее.
Я думаю, это может выглядеть примерно так, но я не знаю.
{
incomingMessages()
.groupBy(Message::getEntityId)
.flatMap(entityStream -> entityStream
/* ... */
.map(msg -> /* process the message */)))
/* ... */
}
public static Stream<Message> incomingMessages() { /* ... */ }