2015-07-02 2 views
3

Я изучаю Reactor, и мне интересно, как добиться определенного поведения. Предположим, у меня есть поток входящих сообщений. Каждое сообщение связано с определенной сущностью и содержит некоторые данные.Параллельная отправка групп groupBy в реакторе

interface Message { 
    String getEntityId(); 
    Data getData(); 
} 

Сообщения, относящиеся к различным объектам, могут обрабатываться параллельно. Однако сообщения, относящиеся к любому отдельному объекту, должны обрабатываться по одному за раз, то есть обработка сообщения 2 для объекта "abc" не может начинаться до тех пор, пока обработка сообщения 1 для объекта "abc" не будет завершена. Пока обработка сообщения продолжается, дополнительные сообщения для этого entiy должны быть буферизованы. Сообщение для других объектов может быть беспрепятственным. Можно думать о нем, как там быть на резьбе в сущности работает следующий код:

public void run() { 
    for (;;) { 
     // Blocks until there's a message available 
     Message msg = messageQueue.nextMessageFor(this.entityId); 

     // Blocks until processing is finished 
     processMessage(msg); 
    } 
} 

Как я могу добиться этого с React без блокировки? Общая скорость сообщения может быть высокой, но скорость передачи сообщений для одного объекта будет очень низкой. Набор объектов может быть очень большим и не обязательно известен заранее.

Я думаю, это может выглядеть примерно так, но я не знаю.

{ 
    incomingMessages() 
      .groupBy(Message::getEntityId) 
      .flatMap(entityStream -> entityStream 
        /* ... */ 
        .map(msg -> /* process the message */))) 
        /* ... */ 
} 

public static Stream<Message> incomingMessages() { /* ... */ } 

ответ

2

С ProjectReactor вы можете решить таким образом:

@Test 
public void testMessages() { 
    Flux.fromStream(incomingMessages()) 
      .groupBy(Message::getEntityId) 
      .map(g -> g.publishOn(Schedulers.newParallel("groupByPool", 16))) //create new publisher for groups of messages 
      .subscribe(//create consumer for main stream 
        stream -> 
          stream.subscribe(this::processMessage) // create consumer for group stream 
      ); 
} 

public Stream<Message> incomingMessages() { 
    return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10)); 
} 

public void processMessage(Message message) { 
    System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName())); 
} 

private static class Message { 
    private final int id; 
    private final int entityId; 

    public Message(int id, int entityId) { 
     this.id = id; 
     this.entityId = entityId; 
    } 

    public int getId() { 
     return id; 
    } 

    public int getEntityId() { 
     return entityId; 
    } 

    @Override 
    public String toString() { 
     return "Message{" + 
       "id=" + id + 
       ", entityId=" + entityId + 
       '}'; 
    } 
} 

Я думаю, что подобное решение может быть в RxJava

Смежные вопросы