2015-06-04 9 views
1

Я делаю первые шаги с RX. Я прочитал несколько бит об этом, но я думал, что грязные руки станут лучшим способом. Поэтому я начал преобразовывать один из моих существующих кодов в код типа Rx.Создать горячий наблюдаемый

Цель: Я пытаюсь высмеять источник, который отправляет данные с определенной частотой (скажем, 60/с, видеокамера или что-то еще). У меня есть кадры, которые были записаны для имитации источника, в то время как источник недоступен. И мне нужен источник, чтобы начать отправку, даже если никто не слушает, потому что это то, что будет делать настоящий источник.

До Rx я пошел и сделал Runnable, который просто выполняет итерации над 15.000 элементами данных, отправляет элемент на мой сервер RabbitMQ и спит за 1/60 секунд, а затем отправляет следующий.

Теперь я хочу превратить эту логику в горячую наблюдаемую, просто для игры. До сих пор у меня есть это:

Observable.from(mDataItems) 
       .takeWhile(item -> mRunning) 
       .map(mGson::toJson) 
       .doOnNext(json -> { 
        try { 
         mChannel.basicPublish(EXCHANGE_NAME, "", null, json.getBytes()); 
        } catch (IOException e) { 
         Logger.error(e, String.format("Could not publish to %s exchange", EXCHANGE_NAME)); 
        } 

        try { 
         Thread.sleep(1/SENDING_FREQUENCY_IN_HZ); 
        } catch (InterruptedException e) { 
         Logger.error(e, String.format("Could not sleep for %d ms", (int) (1000/SENDING_FREQUENCY_IN_HZ))); 
        } 
       }) 
       .doOnCompleted(() -> { 
        if (mRunning) 
         Logger.info("All data sent"); 
        else 
         Logger.info("Interrupted while sending"); 

        disconnect(); 
        mRunning = false; 
       }) 
       .subscribeOn(Schedulers.io()) 
       .publish() 
       .connect(); 

И даже если он работает до сих пор, я не знаю, если это «хороший» способ создания горячей Наблюдаемые (или Наблюдаемые в целом по этому вопросу), который просто излучает Предметы. (Я также не знаю, использовать ли я предмет, а не Наблюдаемый, но это другой вопрос).

ответ

1

Да, есть альтернатива:

int delay = 1000/frequency; 
Observable o = Observable.from(dataItems) 
.zipWith(
    Observable.timer(delay, delay, TimeUnit.MILLISECONDS) 
     .onBackpressureDrop(), 
    (s, t) -> s) 
.map(mGson::toJson) 
// other ops as necessary 
.subscribeOn(Schedulers.io()) 
.publish(); 

o.connect(); 

o.subscribe(...); 
Смежные вопросы