Я реализую простые аналитические функции данных с помощью RXJava, где абонент темы асинхронно обрабатывает данные, опубликованные в теме, депонируя выход в Redis.Обработка RxJava-Aynchronous Stream
Когда сообщение получено, компонент Spring публикует его в Observable. Чтобы избежать блокировки представления, я использовал RxJava Async, чтобы сделать это асинхронно.
@Override
public void onMessage(final TransactionalMessage message) {
Async.start(new Func0<Void>() {
@Override
public Void call() {
analyser.process(message);
return null;
}
});
}
У меня есть два путаницы при реализации других обрабатывающих деталей; 1) создание асинхронного наблюдения с буферизацией 2) вычисление различных логик параллельно на основе типа сообщения в списке сообщений.
После долгих экспериментов я нашел два способа создания Async Observable и не уверен, какой из них является правильным и лучшим подходом.
Путь один,
private static final class Analyzer {
private Subscriber<? super TransactionalMessage> subscriber;
public Analyzer() {
OnSubscribe<TransactionalMessage> f = subscriber -> this.subscriber = subscriber;
Observable.create(f).observeOn(Schedulers.computation())
.buffer(5, TimeUnit.SECONDS, 5, Schedulers.io())
.skipWhile((list) -> list == null || list.isEmpty())
.subscribe(t -> compute(t));
}
public void process(TransactionalMessage message) {
subscriber.onNext(message);
}
}
Way два
private static final class Analyser {
private PublishSubject<TransactionalMessage> subject;
public Analyser() {
subject = PublishSubject.create();
Observable<List<TransactionalMessage>> observable = subject
.buffer(5, TimeUnit.SECONDS, 5, Schedulers.io())
.observeOn(Schedulers.computation());
observable.subscribe(new Observer<List<TransactionalMessage>>() {
@Override
public void onCompleted() {
log.debug("[Analyser] onCompleted(), completed!");
}
@Override
public void onError(Throwable e) {
log.error("[Analyser] onError(), exception, ", e);
}
@Override
public void onNext(List<TransactionalMessage> t) {
compute(t);
}
});
}
public void process(TransactionalMessage message) {
subject.onNext(message);
}
}
TransactionalMessage поставляется в различных типов, поэтому я хочу, чтобы выполнять различные вычисления, основанные на типах. Один из подходов, который я пробовал, - это фильтровать список на основе каждого типа и обрабатывать его отдельно, но это выглядит так плохо, и я думаю, что он не работает параллельно. Какой способ обрабатывать их параллельно?
protected void compute(List<TransactionalMessage> messages) {
Observable<TransactionalMessage> observable = Observable
.from(messages);
Observable<String> observable2 = observable
.filter(new Func1<TransactionalMessage, Boolean>() {
@Override
public Boolean call(TransactionalMessage t) {
return t.getMsgType()
.equals(OttMessageType.click.name());
}
}).flatMap(
new Func1<TransactionalMessage, Observable<String>>() {
@Override
public Observable<String> call(
TransactionalMessage t) {
return Observable.just(
t.getMsgType() + t.getAppId());
}
});
Observable<String> observable3 = observable
.filter(new Func1<TransactionalMessage, Boolean>() {
@Override
public Boolean call(TransactionalMessage t) {
return t.getMsgType()
.equals(OttMessageType.image.name());
}
}).flatMap(
new Func1<TransactionalMessage, Observable<String>>() {
@Override
public Observable<String> call(
TransactionalMessage t) {
return Observable.just(
t.getMsgType() + t.getAppId());
}
});
// I sense some code smell in filtering on type and processing it.
Observable.merge(observable2, observable3)
.subscribe(new Action1<String>() {
@Override
public void call(String t) {
// save it to redis
System.out.println(t);
}
});
}