У меня есть исследования и чтение документов, которые они не очень понятны. То, что я пытаюсь достичь, является следующей функциональностью:Реактивное противодавление потока с проектом весеннего реактора
Я использую проект Spring Reactor и используя eventBus. Моя шина событий бросает событие в модуль A.
Модуль A должен получить событие и вставить в «Горячий поток», который будет хранить уникальные значения. Каждые 250 миллисекунгов поток должен вытащить всю ценность и сделать калляцию на них .. и так далее.
Например: eventBus бросает событие с номером: 1,2,3,2,3,2
Поток должен получить и удерживать уникальные значения -> 1,2,3 после 250 милисекунд поток должен печатать число и пустые значения
У кого-нибудь есть идея, как начать? Я пробовал примеры, но ничего действительно не работает, и я думаю, что я ничего не понимаю. У кого-нибудь есть пример?
Tnx
EDIT:
При попытке сделать следующий я всегда получаю исключение:
Stream<List<Integer>> s = Streams.wrap(p).buffer(1, TimeUnit.SECONDS);
s.consume(i -> System.out.println(Thread.currentThread() + " data=" + i));
for (int i = 0; i < 10000; i++) {
p.onNext(i);
}
Исключение:
java.lang.IllegalStateException: The environment has not been initialized yet
at reactor.Environment.get(Environment.java:156) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.Environment.timer(Environment.java:184) ~[reactor-core-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.getTimer(Stream.java:3052) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at reactor.rx.Stream.buffer(Stream.java:2246) ~[reactor-stream-2.0.7.RELEASE.jar:?]
at com.ta.ng.server.controllers.user.UserController.getUsersByOrgId(UserController.java:70) ~[classes/:?]
Как вы можете видеть, я не могу продолжить пытаясь не передавать эту проблему.
BY THE WAY: Это happeing только тогда, когда я использую buffer(1, TimeUnit.SECONDS)
Если я использую buffer(50)
, например, он работает. Хотя это не окончательное решение, это начало.