Я пытаюсь сделать реактивное приложение, которое слушает сетевой сокет в отдельном потоке по ценам и немного озадачивается тем, как именно построить Observable
. Большая часть интерфейсов, которые у меня есть, ограничена API, который я использую, и поэтому не могу изменить. Я дистиллировал то, что я пытаюсь сделать в качестве теста ниже, но я не вижу, как заполнить тело метода getPriceReactive()
, чтобы цены были распечатаны на консоли подписчиком (см. Комментарий в коде).Связывание обратного вызова API с наблюдаемым RxJava
public class PriceObservableTest {
// This interface is defined externally and used by the API
private interface ITickHandler {
void priceReceived(double price);
}
// Stores the price (currently just one double for illustration)
private class Tick {
double price = Double.NaN;
}
// Implementation of handler called by API when it receives a price
private class TickHandler implements ITickHandler {
private final Tick tick;
TickHandler() { this.tick = new Tick(); }
@Override public void priceReceived(double x) { tick.price = x; }
}
// This class emulates the API delivering prices from the socket
private class PriceSource {
private final Thread thread;
PriceSource(final ITickHandler handler) {
thread = new Thread(new Runnable() {
final Random r = new Random();
@Override public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(100);
handler.priceReceived(r.nextDouble() * 100);
} catch (InterruptedException e) {
break;
}
}
System.out.println("Price thread closed");
}
});
}
void subscribe() { thread.start(); }
void unsubscribe() { thread.interrupt(); }
}
@Test
public void simpleTest() throws Exception {
final ITickHandler handler = new TickHandler();
// Simulate some prices received periodically from a socket
PriceSource prices = new PriceSource(handler);
Observable<Tick> reactive = getPriceReactive(handler);
reactive.subscribe(new Subscriber<Tick>() {
@Override public void onCompleted() { }
@Override public void onError(Throwable e) { }
@Override public void onNext(Tick tick) {
System.out.println("Received price: " + tick.price);
}});
// Observe prices for 1 second. The subscriber should print them to console
prices.subscribe();
Thread.sleep(1000);
prices.unsubscribe();
}
// Returns an observable that reacts to price changes
private Observable<Tick> getPriceReactive(ITickHandler handler) {
return Observable.create(new Observable.OnSubscribe<Tick>() {
@Override public void call(Subscriber<? super Tick> subscriber) {
// How to call subscriber.onNext() whenever
// priceReceived() is called with a new price?
}
});
}
}
Как-то subscriber.onNext()
должен вызываться всякий раз, когда API вызовы priceReceived()
, но я не могу вполне понять, как достичь этого. Конечно, я мог бы хранить ссылку на подписчика в TickHandler
, но этот вид поражения имеет цель иметь Observable
, не так ли?
Интересное использование темы, но я немного подозрительно отношусь к ним, так как rx docs, похоже, предполагают, что они нужны только в редких случаях. На самом деле, выясняется, что уже был задан очень похожий вопрос, решение не включает темы: http://stackoverflow.com/questions/20552598/creating-observable-from-normal-java-events/ – ScarletPumpernickel
@ScarletPumpernickel Я бы не хотел, скажите, что вопрос или любой из ответов имеют отношение к вашему вопросу (если вы не хотите загрязнять свой код с помощью настраиваемых событий). Я также хотел бы сказать, что это правильный вариант использования для темы «Subject» – supertopi
. Я использую темы, которые являются превосходными, когда вы хотите управлять Наблюдаемым с помощью внешних событий, возможно, не полностью под вашим контролем. Observable.create() также работает, но он будет немного более громоздким, и вам нужно будет обрабатывать (или предотвращать) несколько подписчиков. –