2017-01-23 4 views
1

Мы пытаемся перейти от 2.X до 3.X. https://github.com/reactor/reactor-core/issues/375 Мы использовали EventBus как диспетчер событий в нашем приложении (система с низкой задержкой FX), и он работает очень хорошо для нас.Процессоры проектных реакторов v3.X

После изменения мы решили взять каждый модуль и создать свой собственный процессор для обработки события. 1. Является ли это использование правильным с вашей точки зрения? Из-за отсутствия документа на текущем этапе и после обзора всего, что мы можем, мы действительно не знаем, что здесь делать 2. Мы попытались использовать Flux для выполнения действий каждые X интервал Например: рынок прибывает 1000 для 1 секунду, но мы хотим обработать обновление всего 4 раза за секунду. После обновления мы используем:

Процессор с буфером и отправка другому методу. В этом методе мы имеем Flux, которые получают список и пытаемся работать параллельно, чтобы выполнить свою задачу. У нас были 2 основные проблемы: 1. Иногда мы получили Null событие, которое мы не можем обнаружить, что наша система отправки я полагаю, может быть, мы перепутаны с помощью процессора

//Definition of processor 
    ReplayProcessor<Event> classAEventProcessor = ReplayProcessor.create(); 

    //Event handler subscribing 
    public void onMyEventX(Consumer<Event> consumer) { 
     Flux<Event> handler = classAEventProcessor .filter(event -> event.getType().equals(EVENT_X)); 
     handler.subscribe(consumer); 
    } 

в приведенном выше примере событие в обработчике иногда получают нуль .. Как только он перестает работать до тех пор, пока мы не перезапустим сервер (потому что только при перезапуске мы создаем процессор)

2. Мы пытались к нам параллельно, но иногда некоторые сообщения были исчезли, поэтому, возможно, мы злоупотребляем каркасом

//On constructor 
    tickProcessor.buffer(1024, Duration.of(250, ChronoUnit.MILLIS)).subscribe(markets -> 
    handleMarkets(markets)); 

    //Handler 
    Flux.fromIterable(getListToProcess()) 
     .parallel() 
    .runOn(Schedulers.parallel()) 
    .doOnNext(entryMap -> { 
     DoBlockingWork(entryMap); 
    }) 
    .sequential() 
    .subscribe(); 

Целью этого является то, что процессор разбудит каждые 250 мс и вызовет обработчик. Обработчик будет работать с Flux параллельно, чтобы улучшить и ускорить обработку. * В случае, если DoBlockingWork занимает более 250мс я не мог понять, что будет на поведение

UPDATE: EventBus был завернут нами и каждое событие подписался бросить завернутую менеджера событий. Теперь мы попытались создать процессор событий для каждого модуля, но он работает очень медленно. Мы использовали TopicProcessor с ThreadExecutor и все еще очень медленно. EventBus сделал ту же работу на высокой скорости. У кого-нибудь есть идеи? BTW, когда я пытался использовать DirectProcessor, кажется, работает намного лучше, чем TopicProcessor

ответ

0

Реактор 3 построен вокруг концепции, что вы должны избегать блокировки столько, сколько сможете, поэтому в вашем втором фрагменте DoBlockingWork выглядит не очень хорошо.

Как генерируются события? Возможно, у вас есть асинхронный API на основе слушателей, чтобы получить их? Если это так, вы можете попробовать использовать Flux.create.

Для вашего случая использования «у нас 1000 событий за 1 секунду, но мы хотим обработать 4», я бы связал оператора sample. Например, sample(Duration.ofMillis(250)) будет делить каждую секунду на 4 окна, из которых он будет излучать только последний элемент.

Написано справочное руководство, а также страница, на которой вы можете найти ссылки на внешние статьи и учебные материалы. Существует предварительный просмотр справочника WIP here и страницы учебных ресурсов here.

+1

Спасибо за ответ. Я стараюсь избегать, но это пример для «тяжелых действий», которые я должен каким-то образом его параллелировать. После перехода к v3.X шина события не может использоваться. Я попытался переключиться на TopicProcessor для каждого модуля вместо общего EventBus для всех модулей, но я видел, что он был очень медленным ... Пытался дать ему ThreadExecuter, но все же очень медленно. Итак, наши события очень медленные и параллельные, не очень хорошо работающие. Что вы предлагаете нам попробовать? У нас действительно нет подсказки, мы попробовали все, о чем мы можем думать .. Я читал все документы, которые я нашел – Aviad

+1

Считаете ли вы, что я делаю что-то не так с процессорами? Каковы различия между ними (например, DirectPrcessor)? Надеюсь, мне удалось объяснить, что я использую и нуждаюсь в помощи с лучшими практиками, чтобы избежать ошибок и по-прежнему иметь скорость и высокую пропускную способность, например, в EventBus – Aviad

Смежные вопросы