Я реализовал простой RxEventBus, который начинает излучать события, даже если подписчиков нет. Я хочу кэшировать последнее испущенное событие, так что, если абонент первого/следующего подписчика подписывается, он получает только один (последний) элемент.Кэш RxJava последний элемент для будущих подписчиков
Я создал тестовый класс, который описывает мою проблему:
public class RxBus {
ApplicationsRxEventBus applicationsRxEventBus;
public RxBus() {
applicationsRxEventBus = new ApplicationsRxEventBus();
}
public static void main(String[] args) {
RxBus rxBus = new RxBus();
rxBus.start();
}
private void start() {
ExecutorService executorService = Executors.newScheduledThreadPool(2);
Runnable runnable0 =() -> {
while (true) {
long currentTime = System.currentTimeMillis();
System.out.println("emiting: " + currentTime);
applicationsRxEventBus.emit(new ApplicationsEvent(currentTime));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable runnable1 =() -> applicationsRxEventBus
.getBus()
.subscribe(new Subscriber<ApplicationsEvent>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(ApplicationsEvent applicationsEvent) {
System.out.println("runnable 1: " + applicationsEvent.number);
}
});
Runnable runnable2 =() -> applicationsRxEventBus
.getBus()
.subscribe(new Subscriber<ApplicationsEvent>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onNext(ApplicationsEvent applicationsEvent) {
System.out.println("runnable 2: " + applicationsEvent.number);
}
});
executorService.execute(runnable0);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(runnable1);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.execute(runnable2);
}
private class ApplicationsRxEventBus {
private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
private final Observable<ApplicationsEvent> mBusObservable;
public ApplicationsRxEventBus() {
mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
mBusObservable = mRxBus.cache();
}
public void emit(ApplicationsEvent event) {
mRxBus.onNext(event);
}
public Observable<ApplicationsEvent> getBus() {
return mBusObservable;
}
}
private class ApplicationsEvent {
long number;
public ApplicationsEvent(long number) {
this.number = number;
}
}
}
runnable0 излучает события, даже если нет подписчиков. runnable1 подписывается через 3 секунды и получает последний элемент (и это нормально). Но runnable2 подписывается через 3 секунды после runnable1 и получает все элементы, полученные runnable1. Мне нужен только последний элемент, который нужно получить для runnable2. Я попытался события кэша в RxBus:
private class ApplicationsRxEventBus {
private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus;
private final Observable<ApplicationsEvent> mBusObservable;
private ApplicationsEvent event;
public ApplicationsRxEventBus() {
mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create());
mBusObservable = mRxBus;
}
public void emit(ApplicationsEvent event) {
this.event = event;
mRxBus.onNext(event);
}
public Observable<ApplicationsEvent> getBus() {
return mBusObservable.doOnSubscribe(() -> emit(event));
}
}
Но проблема в том, что когда runnable2 подписывается, runnable1 получает событие дважды:
emiting: 1447183225122
runnable 1: 1447183225122
runnable 1: 1447183225122
runnable 2: 1447183225122
emiting: 1447183225627
runnable 1: 1447183225627
runnable 2: 1447183225627
Я уверен, что есть оператор RxJava для этого. Как достичь этого?
Это решило мою проблему. Я также читал о «ReplaySubject» с «createWithSize (1)», кажется, что оба дают одинаковые результаты. – Bresiu