2015-04-24 3 views
3

У меня есть Observable, исходящий от EventEmitter, который на самом деле является просто http-соединением, потоковым событиями.RxJs Динамически добавлять события из другого EventEmitter

Иногда мне приходится отключиться от основного потока и снова подключиться. Я не уверен, как справиться с этим с помощью rxjs.

Я не уверен, могу ли я закончить источник, а затем динамически добавить другой источник в источник, или если мне нужно что-то сделать, как у меня на самом дне.

var Rx = require('rx'), 
    EventEmitter = require('events').EventEmitter; 

    var eventEmitter = new EventEmitter(); 
    var eventEmitter2 = new EventEmitter(); 

    var source = Rx.Observable.fromEvent(eventEmitter, 'data') 

    var subscription = source.subscribe(function (data) { 
    console.log('data: ' + data); 
    }); 

    setInterval(function() { 
    eventEmitter.emit('data', 'foo'); 
    }, 500); 

    // eventEmitter stop emitting data, underlying connection closed 
    // now attach seconds eventemitter (new connection) 

    // something like this but obvouisly doesn't work 
    source 
    .fromEvent(eventEmitter2, 'data') 

Puesdo код, который больше того, что я делаю, я создаю второе соединение потока перед тем, как закрыть первый, так что я не «потерять» данные. Здесь я не уверен, как остановить Observable без «проигрывания» записей из-за того, что onNext не вызывается из-за буфера.

var streams = [], notifiers = []; 

    // create initial stream 
    createNewStream(); 

    setInterval(function() { 
    if (params of stream have changed) createNewStream(); 
    }, $1minutes/3); 

    function createNewStream() { 
    var stream = new eventEmitterStream(); 

    stream.once('connected', function() { 
     stopOthers(); 

     streams.push(stream); 
     createSource(stream, 'name', 'id'); 
    }); 
    } 

    function stopOthers() { 
    while(streams.length > 0) { 
     streams.pop().stop(); // stop the old stream 
    } 

    while(notifiers.length > 0) { 
     // if i call this, the buffer may lose records, before onNext() called 
     //notifiers.pop()(Rx.Notification.createOnCompleted()); 
    } 
    } 

    function createObserver(tag) { 
    return Rx.Observer.create(
     function (x) { 
     console.log('Next: ', tag, x.length, x[0], x[x.length-1]); 
     }, 
     function (err) { 
     console.log('Error: ', tag, err); 
     }, 
     function() { 
     console.log('Completed', tag); 
     }); 
    } 

    function createSource(stream, event, id) { 
    var source = Rx.Observable 
     .fromEvent(stream, event) 
     .bufferWithTimeOrCount(time, max); 

    var subscription = source.subscribe(createObserver(id)); 
    var notifier = subscription.toNotifier(); 
    notifiers.push(notifier); 
    } 
+1

Прочитайте часть о буферизации и противодавлении в документах. –

+0

@BenjaminGruenbaum Я прочитал документы. Существует много примеров на общих страницах и на конкретных методах. Вот где я получил буферные методы. Есть ли определенная страница или часть, которые, по вашему мнению, будут наиболее полезными. – dre

ответ

8

Первый и формост, вы должны убедиться, что вы можете удалить всех слушателей из вашего ранее «мертвого» излучателя. В противном случае вы создадите негерметичное приложение.

Кажется, что единственный способ узнать, что EventEmitter умер, - следить за частотой, если у вас нет события, которое срабатывает при ошибке или завершении (для отключений). Последний намного, гораздо более предпочтительный.

Независимо от того, Секретный соус Rx гарантирует, что ваш поток данных будет создан и срыв в вашем наблюдаемом. Если оберните создание излучателя в наблюдаемом вами виде, а также средство его разорвать, вы сможете использовать такие удивительные вещи, как оператор retry, чтобы воссоздать это наблюдаемое.

Так что, если у вас нет возможности узнать, если он умер, и вы хотите, чтобы восстановить его, вы можете использовать что-то вроде этого:

// I'll presume you have some function to get an EventEmitter that 
// is already set up 
function getEmitter() { 
    var emitter = new EventEmitter(); 
    setInterval(function(){ 
    emitter.emit('data', 'foo'); 
    }, 500) 
    return emitter; 
} 


var emitterObservable = Observable.create(function(observer) { 
    // setup the data stream 
    var emitter = getEmitter(); 
    var handler = function(d) { 
    observer.onNext(d); 
    }; 
    emitter.on('data', handler); 

    return function() { 
    // tear down the data stream in your disposal function 
    emitter.removeListener('on', handler); 
    }; 
}); 

// Now you can do Rx magic! 
emitterObservable 
    // if it doesn't emit in 700ms, throw a timeout error 
    .timeout(700) 
    // catch all* errors and retry 
    // this means the emitter will be torn down and recreated 
    // if it times out! 
    .retry() 
    // do something with the values 
    .subscribe(function(x) { console.log(x); }); 

* Примечание: попробуйте уловы все ошибки, так что вы можете хотите добавить catch над ним, чтобы обрабатывать ошибки, не связанные с таймаутом. Вам решать.

+0

Спасибо, что сообщили мне о 'повторной попытке. Я пытаюсь это сделать сейчас. FYI: когда я запускаю, я получаю 'RangeError: максимальный размер стека вызовов'. Я думаю, что это было просто потому, что ничего не возвращалось из 'getEmitter' – dre

+0

Поведение, которое я пытаюсь использовать при повторном подключении, заключается в« повторном подключении »на моих поворотах без« потери данных ». Для этого требуется подключение сначала к другому eventEmitter, а затем отсоединение первого. – dre

+0

Интересно, мне нужно больше контекста, какие данные вы теряете между соединениями? –

Смежные вопросы