2017-01-08 2 views
3

Если у меня есть поток Node js, скажем, например, от чего-то вроде process.stdin или от fs.createReadStream, как я могу преобразовать это как поток RxJs Observable, используя RxJs5?Как преобразовать удобочитаемый поток в RX наблюдаемый

Я вижу, что RxJs-Node имеет метод fromReadableStream, но похоже, что он не обновлялся в течение года.

+0

так это работает или? Кого волнует, как часто он обновляется, если он работает – smnbbrv

+0

@smnbbrv Несомненно, он работает нормально, но это RxJS4 и не совместим с RxJS5. – cartant

+2

Вы можете взглянуть на [источник] (https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L45-L83), чтобы посмотреть, что потребуется, чтобы преобразовать его самостоятельно - реализация довольно маленькая. – cartant

ответ

5

Для кто ищет это, следуя рекомендации Марка, I adapted rx-node fromStream implementation for rxjs5.

import { Observable } from 'rxjs'; 

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52 
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') { 
    stream.pause(); 

    return new Observable((observer) => { 
    function dataHandler(data) { 
     observer.next(data); 
    } 

    function errorHandler(err) { 
     observer.error(err); 
    } 

    function endHandler() { 
     observer.complete(); 
    } 

    stream.addListener(dataEventName, dataHandler); 
    stream.addListener('error', errorHandler); 
    stream.addListener(finishEventName, endHandler); 

    stream.resume(); 

    return() => { 
     stream.removeListener(dataEventName, dataHandler); 
     stream.removeListener('error', errorHandler); 
     stream.removeListener(finishEventName, endHandler); 
    }; 
    }).share(); 
} 
+0

Я не тестировал это, так как я перешел от того, над чем работал, но если кто-то еще и сработал, я буду принимать этот ответ :) – JuanCaicedo

+0

Я использую его, и пока он работает хорошо. Я не тестировал устройство. –

2

Следующего должен работать как v4 и v5 (оговорки непроверенных):

fromStream: function (stream, finishEventName, dataEventName) { 
    stream.pause(); 

    finishEventName || (finishEventName = 'end'); 
    dataEventName || (dataEventName = 'data'); 

    return Observable.create(function (observer) { 

     // This is the "next" event 
     const data$ = Observable.fromEvent(stream, dataEventName); 

     // Map this into an error event 
     const error$ = Observable.fromEvent(stream, 'error') 
     .flatMap(err => Observable.throw(err)); 

     // Shut down the stream 
     const complete$ = Observable.fromEvent(stream, finishEventName); 

     // Put it all together and subscribe 
     const sub = data$ 
     .merge(error$) 
     .takeUntil(complete$) 
     .subscribe(observer); 

     // Start the underlying node stream 
     stream.resume(); 

     // Return a handle to destroy the stream 
     return sub; 
    }) 

    // Avoid recreating the stream on duplicate subscriptions 
    .share(); 
    }, 
Смежные вопросы