2016-05-23 4 views
1

У меня есть исследования и чтение документов, которые они не очень понятны. То, что я пытаюсь достичь, является следующей функциональностью:Реактивное противодавление потока с проектом весеннего реактора

Я использую проект Spring Reactor и используя eventBus. Моя шина событий бросает событие в модуль A.

Модуль A должен получить событие и вставить в «Горячий поток», который будет хранить уникальные значения. Каждые 250 миллисекунгов поток должен вытащить всю ценность и сделать калляцию на них .. и так далее.

Например: eventBus бросает событие с номером: 1,2,3,2,3,2

Поток должен получить и удерживать уникальные значения -> 1,2,3 после 250 милисекунд поток должен печатать число и пустые значения

У кого-нибудь есть идея, как начать? Я пробовал примеры, но ничего действительно не работает, и я думаю, что я ничего не понимаю. У кого-нибудь есть пример?

Tnx

EDIT:

При попытке сделать следующий я всегда получаю исключение:

 Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS); 

     s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i)); 

     for (int i = 0; i < 10000; i++) { 
      p.onNext(i); 
     } 

Исключение:

java.lang.IllegalStateException: The environment has not been initialized yet 
    at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?] 
    at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?] 
    at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?] 
    at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?] 
    at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?] 

Как вы можете видеть, я не могу продолжить пытаясь не передавать эту проблему.

BY THE WAY: Это happeing только тогда, когда я использую buffer(1, TimeUnit.SECONDS) Если я использую buffer(50), например, он работает. Хотя это не окончательное решение, это начало.

ответ

0

Ну после повторного чтения документа я пропустил это:

static { 
     Environment.initialize(); 
    } 

Это решило проблему. Tnx