2015-11-10 4 views
2

Я реализовал простой RxEventBus, который начинает излучать события, даже если подписчиков нет. Я хочу кэшировать последнее испущенное событие, так что, если абонент первого/следующего подписчика подписывается, он получает только один (последний) элемент.Кэш RxJava последний элемент для будущих подписчиков

Я создал тестовый класс, который описывает мою проблему:

public class RxBus { 

ApplicationsRxEventBus applicationsRxEventBus; 

public RxBus() { 
    applicationsRxEventBus = new ApplicationsRxEventBus(); 
} 

public static void main(String[] args) { 
    RxBus rxBus = new RxBus(); 
    rxBus.start(); 
} 

private void start() { 
    ExecutorService executorService = Executors.newScheduledThreadPool(2); 

    Runnable runnable0 =() -> { 
     while (true) { 
      long currentTime = System.currentTimeMillis(); 
      System.out.println("emiting: " + currentTime); 
      applicationsRxEventBus.emit(new ApplicationsEvent(currentTime)); 
      try { 
       Thread.sleep(500); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }; 

    Runnable runnable1 =() -> applicationsRxEventBus 
      .getBus() 
      .subscribe(new Subscriber<ApplicationsEvent>() { 
       @Override 
       public void onCompleted() { 

       } 

       @Override 
       public void onError(Throwable throwable) { 

       } 

       @Override 
       public void onNext(ApplicationsEvent applicationsEvent) { 
        System.out.println("runnable 1: " + applicationsEvent.number); 
       } 
      }); 

    Runnable runnable2 =() -> applicationsRxEventBus 
      .getBus() 
      .subscribe(new Subscriber<ApplicationsEvent>() { 
       @Override 
       public void onCompleted() { 

       } 

       @Override 
       public void onError(Throwable throwable) { 

       } 

       @Override 
       public void onNext(ApplicationsEvent applicationsEvent) { 
        System.out.println("runnable 2: " + applicationsEvent.number); 
       } 
      }); 


    executorService.execute(runnable0); 
    try { 
     Thread.sleep(3000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    executorService.execute(runnable1); 
    try { 
     Thread.sleep(3000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    executorService.execute(runnable2); 
} 

private class ApplicationsRxEventBus { 
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus; 
    private final Observable<ApplicationsEvent> mBusObservable; 

    public ApplicationsRxEventBus() { 
     mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create()); 
     mBusObservable = mRxBus.cache(); 
    } 

    public void emit(ApplicationsEvent event) { 
     mRxBus.onNext(event); 
    } 

    public Observable<ApplicationsEvent> getBus() { 
     return mBusObservable; 
    } 
} 
private class ApplicationsEvent { 
    long number; 

    public ApplicationsEvent(long number) { 
     this.number = number; 
    } 
} 
} 

runnable0 излучает события, даже если нет подписчиков. runnable1 подписывается через 3 секунды и получает последний элемент (и это нормально). Но runnable2 подписывается через 3 секунды после runnable1 и получает все элементы, полученные runnable1. Мне нужен только последний элемент, который нужно получить для runnable2. Я попытался события кэша в RxBus:

private class ApplicationsRxEventBus { 
    private final Subject<ApplicationsEvent, ApplicationsEvent> mRxBus; 
    private final Observable<ApplicationsEvent> mBusObservable; 

    private ApplicationsEvent event; 

    public ApplicationsRxEventBus() { 
     mRxBus = new SerializedSubject<>(BehaviorSubject.<ApplicationsEvent>create()); 
     mBusObservable = mRxBus; 
    } 

    public void emit(ApplicationsEvent event) { 
     this.event = event; 
     mRxBus.onNext(event); 
    } 

    public Observable<ApplicationsEvent> getBus() { 
     return mBusObservable.doOnSubscribe(() -> emit(event)); 
    } 
} 

Но проблема в том, что когда runnable2 подписывается, runnable1 получает событие дважды:

emiting: 1447183225122 
runnable 1: 1447183225122 
runnable 1: 1447183225122 
runnable 2: 1447183225122 
emiting: 1447183225627 
runnable 1: 1447183225627 
runnable 2: 1447183225627 

Я уверен, что есть оператор RxJava для этого. Как достичь этого?

ответ

2

Ваш ApplicationsRxEventBus выполняет дополнительную работу путем повторного использования сохраненного события всякий раз, когда он подписывается в дополнение ко всем кешированным событиям.

Вам нужен только один BehaviorSubject + toSerialized, так как он будет удерживаться на самом последнем событии и переиздавать его подписчикам самостоятельно.

+0

Это решило мою проблему. Я также читал о «ReplaySubject» с «createWithSize (1)», кажется, что оба дают одинаковые результаты. – Bresiu

1

Вы используете неправильный интерфейс. Когда вы подписываетесь на cold Наблюдаемый, вы получаете все его события. Вам нужно превратить его в hot Наблюдаемый первым. Это делается путем создания ConnectableObservable из вашего наблюдаемого с использованием метода publish. Затем ваши наблюдатели звонят connect, чтобы начать принимать события.

Вы также можете прочитать в разделе Hot and Cold observables учебника.

+0

Что делать, если Observable не знает о количестве абонентов и времени их подписки. И ситуация, когда Observable emit event, когда нет подписчиков, и после этого подписчик подписывается. Он должен принять последнее событие. Я бы хотел, чтобы RxBus был разделен. Я читал о горячих и холодных наблюдениях. Я нашел ReplaySubject. @akarnokd BehaviourSubject просто делает трюк. – Bresiu

+0

Я все равно сделаю автобус ConnectableObservable. К этому вы можете подключить ReplaySubject с размером 1 для кэширования последнего события, как вы сказали. – Thomas

Смежные вопросы