2016-11-10 2 views
0

У меня естьбуферные несколько абонентов rx.js наблюдаемым

var subject = new rx.Subject(); 
    var stream =  rx.Observable.fromEvent(blah, 'event') 
        .filter(blah) 
        .map(blah) 
        .subscribe(subject); 

       return subject; 

тогда я прохожу с учетом нескольких обработчиков, которые собираются обрабатывать событие по-разному и с разной скоростью.
так, что у меня есть в каждом обработчике

subject.subscribe(async function (x) { 
     const func = self[x.eventName]; 
     if (func) { 
      await eventHandlerWrapper(self.handlerName, func, x); 
     } 
     }) 

У меня есть два вопроса, а) если события приходят очень быстро это обработчик будет обрабатывать их синхронно и в правильном порядке, учитывая то, как у меня есть Это? и б) если разные обработчики обрабатывают событие на разных скоростях, все они будут ждать до тех пор, пока самый медленный обработчик не пройдет до следующего события? или они будут все виды буферов и ручек в своем собственном темпе?

спасибо, R

ответ

1

Прежде всего, создание субъекта можно упростить следующим образом:

метод
const subject = rx.Observable.fromEvent(blah, 'event') 
       .filter(blah) 
       .map(blah) 
       .share(); 

доля будет создавать Subject из потока. Если вы вернете этот экземпляр объекта каждому подписчику, вы получите такое же поведение, и оно будет выглядеть лучше.

a) if the events come in super fast is the handler going to process 
them synchronously and in the right order given the way I have it? 

События будут проходить по всей цепочке по очереди и в правильном порядке. Смысл, событие, которое приходит через «fromEvent», будет проталкиваться по всей цепочке до момента, когда вы подписались на него, перед тем, как обработать следующее значение (если между оператором асинхронного взаимодействия нет :)). Бен Леш объяснил это на угловом подключении 2015: https://www.youtube.com/watch?v=KOOT7BArVHQ (вы можете посмотреть весь разговор, но около 17 минут он сравнивает массивы с наблюдаемыми).

b) if the different handlers handle the event at different speeds are 
they all going to wait till the slowest handler is through before the  
next event is provided? or will they all sort of buffer and handle at 
they're own pace? 

Они будут обрабатывать события в своем собственном темпе. Проверьте следующий пример:

let interval$ = Rx.Observable.interval(1000).share(); 

interval$.concatMap((val) => { 
    console.log('called'); 
    return Rx.Observable.of(val).delay(3000) 
    }) 
    .subscribe((val) => console.log("slow ", val)); 

interval$.subscribe((val) => console.log("fast ", val)); 

Здесь я использую наблюдаемый интервал, который я преобразовываю в тему. Поэтому он будет отправлять события каждую секунду. У меня есть одна подписка, которая принимает значение, обрабатывая это значение (которое занимает 2 секунды), а затем берет следующий (с помощью concatMap). И еще одна подписка, которая обрабатывает их немедленно. Если вы запустите этот код (jsbin здесь: https://jsbin.com/zekalab/edit?js,console), вы увидите, что они оба обрабатывают события в своем собственном темпе.

Таким образом, они не ждут самого медленного обработчика, и он будет буферизирован внутри страны.

Ситуация, о которой вы описываете, может иметь потенциально опасную ситуацию, если самый медленный процессор медленнее, чем частота, с которой происходят события. В этом случае ваш буфер будет продолжать расти, и в конечном итоге ваше приложение будет разбиваться. Это концепция, называемая противодавлением. Вы получаете события быстрее, чем вы их обрабатываете. В этом случае вам необходимо использовать такие операторы, как «буфер» или «окно» на самых медленных процессорах, чтобы избежать этой ситуации.

+0

awesome. отличный ответ. Спасибо. Я посмотрел на буфер, и это, казалось, группировало события. То, что мне нужно, это в основном очередь в конце, которую я могу подключить к Субъекту и который будет буферизовать. Будут ли для этого работать буферы? – Raif

+0

Yep должен делать трюк – KwintenP

Смежные вопросы