У меня есть Observable, исходящий от EventEmitter, который на самом деле является просто http-соединением, потоковым событиями.RxJs Динамически добавлять события из другого EventEmitter
Иногда мне приходится отключиться от основного потока и снова подключиться. Я не уверен, как справиться с этим с помощью rxjs.
Я не уверен, могу ли я закончить источник, а затем динамически добавить другой источник в источник, или если мне нужно что-то сделать, как у меня на самом дне.
var Rx = require('rx'),
EventEmitter = require('events').EventEmitter;
var eventEmitter = new EventEmitter();
var eventEmitter2 = new EventEmitter();
var source = Rx.Observable.fromEvent(eventEmitter, 'data')
var subscription = source.subscribe(function (data) {
console.log('data: ' + data);
});
setInterval(function() {
eventEmitter.emit('data', 'foo');
}, 500);
// eventEmitter stop emitting data, underlying connection closed
// now attach seconds eventemitter (new connection)
// something like this but obvouisly doesn't work
source
.fromEvent(eventEmitter2, 'data')
Puesdo код, который больше того, что я делаю, я создаю второе соединение потока перед тем, как закрыть первый, так что я не «потерять» данные. Здесь я не уверен, как остановить Observable без «проигрывания» записей из-за того, что onNext не вызывается из-за буфера.
var streams = [], notifiers = [];
// create initial stream
createNewStream();
setInterval(function() {
if (params of stream have changed) createNewStream();
}, $1minutes/3);
function createNewStream() {
var stream = new eventEmitterStream();
stream.once('connected', function() {
stopOthers();
streams.push(stream);
createSource(stream, 'name', 'id');
});
}
function stopOthers() {
while(streams.length > 0) {
streams.pop().stop(); // stop the old stream
}
while(notifiers.length > 0) {
// if i call this, the buffer may lose records, before onNext() called
//notifiers.pop()(Rx.Notification.createOnCompleted());
}
}
function createObserver(tag) {
return Rx.Observer.create(
function (x) {
console.log('Next: ', tag, x.length, x[0], x[x.length-1]);
},
function (err) {
console.log('Error: ', tag, err);
},
function() {
console.log('Completed', tag);
});
}
function createSource(stream, event, id) {
var source = Rx.Observable
.fromEvent(stream, event)
.bufferWithTimeOrCount(time, max);
var subscription = source.subscribe(createObserver(id));
var notifier = subscription.toNotifier();
notifiers.push(notifier);
}
Прочитайте часть о буферизации и противодавлении в документах. –
@BenjaminGruenbaum Я прочитал документы. Существует много примеров на общих страницах и на конкретных методах. Вот где я получил буферные методы. Есть ли определенная страница или часть, которые, по вашему мнению, будут наиболее полезными. – dre