2015-09-09 3 views
1

Я хотел бы сериализовать выполнение функций, возвращающих наблюдаемые. Мне это удалось, но это не очень прагматичное решение. Что было бы более реактивным путем для достижения следующего?Сериализация Observables

import Rx from 'rx' 

export default class Executor { 

    constructor() { 
    this.queue = [] 
    this.draining = false 
    } 

    run(fn) { 
    return Rx.Observable.create(o => { 
     this.queue.push(Rx.Observable.defer(() => 
     fn() 
      .doOnNext((value) => o.onNext(value)) 
      .doOnError((err) => o.onError(err)) 
      .doOnCompleted(() => o.onCompleted()))) 

     async() => { 
     if (this.draining) { 
      return 
     } 

     this.draining = true 

     while (this.queue.length > 0) { 
      try { 
      await this.queue.shift().toPromise() 
      } catch(err) { 
      // Do nothing... 
      } 
     } 

     this.draining = false 
     }() 
    }) 
    } 
} 
+0

Не могли бы вы указать, что вы пытаетесь достичь немного больше? – Mosho

+0

Я хочу последовательно выполнять наблюдаемые функции, например. в клиенте <-> серверная связь клиенту не разрешено отправлять больше сообщений до завершения всего активного ответа. – ronag

+0

Возможно [этот] (https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/controlled.md) поможет? –

ответ

0

Не совсем уверен, что вы пытаетесь достичь, но это звучит, как вы после оператора flatMapWithMaxConcurrent.

import Rx from 'rx' 

export default class Executor { 

    constructor() { 
    // Queue of:() => Observable 
    this.queue = new Rx.Subject(); 
    this.queue 
     .flatMapWithMaxConcurrent(1, 
      fn => fn().catch(Rx.Observable.empty()) // Replace with proper error handling 
     ) 
     .subscribe(result => console.log(result)); 
    } 

    push(fn) { 
    this.queue.onNext(fn); 
    } 
}