2016-03-30 5 views
1

Я пытаюсь сделать реактивное приложение, которое слушает сетевой сокет в отдельном потоке по ценам и немного озадачивается тем, как именно построить Observable. Большая часть интерфейсов, которые у меня есть, ограничена API, который я использую, и поэтому не могу изменить. Я дистиллировал то, что я пытаюсь сделать в качестве теста ниже, но я не вижу, как заполнить тело метода getPriceReactive(), чтобы цены были распечатаны на консоли подписчиком (см. Комментарий в коде).Связывание обратного вызова API с наблюдаемым RxJava

public class PriceObservableTest { 

    // This interface is defined externally and used by the API 
    private interface ITickHandler { 
     void priceReceived(double price); 
    } 

    // Stores the price (currently just one double for illustration) 
    private class Tick { 
     double price = Double.NaN; 
    } 

    // Implementation of handler called by API when it receives a price 
    private class TickHandler implements ITickHandler { 
     private final Tick tick; 

     TickHandler() { this.tick = new Tick(); } 

     @Override public void priceReceived(double x) { tick.price = x; } 
    } 

    // This class emulates the API delivering prices from the socket 
    private class PriceSource { 
     private final Thread thread; 

     PriceSource(final ITickHandler handler) { 
      thread = new Thread(new Runnable() { 
       final Random r = new Random(); 
       @Override public void run() { 
        while (!Thread.currentThread().isInterrupted()) { 
         try { 
          Thread.sleep(100); 
          handler.priceReceived(r.nextDouble() * 100); 
         } catch (InterruptedException e) { 
          break; 
         } 
        } 
        System.out.println("Price thread closed"); 
       } 
     }); 
     } 

     void subscribe() { thread.start(); } 

     void unsubscribe() { thread.interrupt(); } 
    } 

    @Test 
    public void simpleTest() throws Exception { 

     final ITickHandler handler = new TickHandler(); 

     // Simulate some prices received periodically from a socket 
     PriceSource prices = new PriceSource(handler); 

     Observable<Tick> reactive = getPriceReactive(handler); 

     reactive.subscribe(new Subscriber<Tick>() { 
      @Override public void onCompleted() { } 
      @Override public void onError(Throwable e) { } 
      @Override public void onNext(Tick tick) { 
       System.out.println("Received price: " + tick.price); 
      }}); 

     // Observe prices for 1 second. The subscriber should print them to console 
     prices.subscribe(); 
     Thread.sleep(1000); 
     prices.unsubscribe(); 
    } 

    // Returns an observable that reacts to price changes 
    private Observable<Tick> getPriceReactive(ITickHandler handler) { 
     return Observable.create(new Observable.OnSubscribe<Tick>() { 
      @Override public void call(Subscriber<? super Tick> subscriber) { 

       // How to call subscriber.onNext() whenever 
       // priceReceived() is called with a new price? 

      } 
     }); 
    } 
} 

Как-то subscriber.onNext() должен вызываться всякий раз, когда API вызовы priceReceived(), но я не могу вполне понять, как достичь этого. Конечно, я мог бы хранить ссылку на подписчика в TickHandler, но этот вид поражения имеет цель иметь Observable, не так ли?

ответ

1

Переход к Observable в ITickHandler осуществления. Вы не контролируют подписчика (ов), но издатель

private class TickHandler implements ITickHandler { 
    private final Tick tick; 
    private final PublishSubject<Tick> priceSubject; 

    TickHandler() { 
     this.tick = new Tick(); 
     this.priceSubject = PublishSubject.create(); 
    } 

    @Override public void priceReceived(double x) 
    { 
     tick.price = x; 
     priceSubject.onNext(tick); 
    } 

    public Observable<Tick> priceReceivedObservable() 
    { 
     return priceSubject.asObservable(); 
    } 
} 

И вы можете использовать его в своих тестах, как:

final ITickHandler handler = new TickHandler(); 
PriceSource prices = new PriceSource(handler); 

handler.priceReceivedObservable() 
     .subscribe(new Subscriber<Tick>() { 
      @Override public void onCompleted() { } 
      @Override public void onError(Throwable e) { } 
      @Override public void onNext(Tick tick) { 
       System.out.println("Received price: " + tick.price); 
      }}); 

Я предупреждаю вас, это не проверял, так как я не делать много Java :)

+0

Интересное использование темы, но я немного подозрительно отношусь к ним, так как rx docs, похоже, предполагают, что они нужны только в редких случаях. На самом деле, выясняется, что уже был задан очень похожий вопрос, решение не включает темы: http://stackoverflow.com/questions/20552598/creating-observable-from-normal-java-events/ – ScarletPumpernickel

+0

@ScarletPumpernickel Я бы не хотел, скажите, что вопрос или любой из ответов имеют отношение к вашему вопросу (если вы не хотите загрязнять свой код с помощью настраиваемых событий). Я также хотел бы сказать, что это правильный вариант использования для темы «Subject» – supertopi

+1

. Я использую темы, которые являются превосходными, когда вы хотите управлять Наблюдаемым с помощью внешних событий, возможно, не полностью под вашим контролем. Observable.create() также работает, но он будет немного более громоздким, и вам нужно будет обрабатывать (или предотвращать) несколько подписчиков. –

Смежные вопросы