Я делаю первые шаги с RX. Я прочитал несколько бит об этом, но я думал, что грязные руки станут лучшим способом. Поэтому я начал преобразовывать один из моих существующих кодов в код типа Rx.Создать горячий наблюдаемый
Цель: Я пытаюсь высмеять источник, который отправляет данные с определенной частотой (скажем, 60/с, видеокамера или что-то еще). У меня есть кадры, которые были записаны для имитации источника, в то время как источник недоступен. И мне нужен источник, чтобы начать отправку, даже если никто не слушает, потому что это то, что будет делать настоящий источник.
До Rx я пошел и сделал Runnable, который просто выполняет итерации над 15.000 элементами данных, отправляет элемент на мой сервер RabbitMQ и спит за 1/60 секунд, а затем отправляет следующий.
Теперь я хочу превратить эту логику в горячую наблюдаемую, просто для игры. До сих пор у меня есть это:
Observable.from(mDataItems)
.takeWhile(item -> mRunning)
.map(mGson::toJson)
.doOnNext(json -> {
try {
mChannel.basicPublish(EXCHANGE_NAME, "", null, json.getBytes());
} catch (IOException e) {
Logger.error(e, String.format("Could not publish to %s exchange", EXCHANGE_NAME));
}
try {
Thread.sleep(1/SENDING_FREQUENCY_IN_HZ);
} catch (InterruptedException e) {
Logger.error(e, String.format("Could not sleep for %d ms", (int) (1000/SENDING_FREQUENCY_IN_HZ)));
}
})
.doOnCompleted(() -> {
if (mRunning)
Logger.info("All data sent");
else
Logger.info("Interrupted while sending");
disconnect();
mRunning = false;
})
.subscribeOn(Schedulers.io())
.publish()
.connect();
И даже если он работает до сих пор, я не знаю, если это «хороший» способ создания горячей Наблюдаемые (или Наблюдаемые в целом по этому вопросу), который просто излучает Предметы. (Я также не знаю, использовать ли я предмет, а не Наблюдаемый, но это другой вопрос).