2016-06-09 4 views
7

Я ищу субъект (или нечто подобное), которые могут:Очередь как Subject в RxJava

  1. мог бы получить предметы и держать их в очереди или буфер, если нет подписчиков
  2. После того, как мы у подписчика все элементы потребляются и никогда не выработано снова
  3. Я могу подписаться/отписаться от/Темы

BehaviorSubject почти будет делать работу, но она сохраняет последний наблюдаемый предмет.

UPDATE

На основании принятого ответа я разработал подобное решение для одного наблюдаемого элемента. Также добавлена ​​часть отказа от подписки, чтобы избежать утечек памяти.

class LastEventObservable private constructor(
     private val onSubscribe: OnSubscribe<Any>, 
     private val state: State 
) : Observable<Any>(onSubscribe) { 

    fun emit(value: Any) { 
     if (state.subscriber.hasObservers()) { 
      state.subscriber.onNext(value) 
     } else { 
      state.lastItem = value 
     } 
    } 

    companion object { 
     fun create(): LastEventObservable { 
      val state = State() 

      val onSubscribe = OnSubscribe<Any> { subscriber -> 
       just(state.lastItem) 
         .filter { it != null } 
         .doOnNext { subscriber.onNext(it) } 
         .doOnCompleted { state.lastItem = null } 
         .subscribe() 

       val subscription = state.subscriber.subscribe(subscriber) 

       subscriber.add(Subscriptions.create { subscription.unsubscribe() }) 
      } 

      return LastEventObservable(onSubscribe, state) 
     } 
    } 

    private class State { 
     var lastItem: Any? = null 
     val subscriber = PublishSubject.create<Any>() 
    } 
} 
+0

Вы видели мой ответ? –

+0

Уточните, что вы подразумеваете под словом «Как только у нас есть подписчик, все предметы будут потребляться и никогда не появляться снова» - если у вас есть что-то вроде: yourSource.take (1) .subscribe(), следует ли уничтожить все элементы из вашего источника? –

+0

Нет, это должно потреблять только этот предмет. –

ответ

7

Я достигаю ожидаемого результата, создавая настраиваемый Observable, который обертывает тему публикации и обрабатывает кеш эмиссии, если нет подключенных подписчиков. Проверьте это.

public class ExampleUnitTest { 
    @Test 
    public void testSample() throws Exception { 
     MyCustomObservable myCustomObservable = new MyCustomObservable(); 

     myCustomObservable.emit("1"); 
     myCustomObservable.emit("2"); 
     myCustomObservable.emit("3"); 

     Subscription subscription = myCustomObservable.subscribe(System.out::println); 

     myCustomObservable.emit("4"); 
     myCustomObservable.emit("5"); 

     subscription.unsubscribe(); 

     myCustomObservable.emit("6"); 
     myCustomObservable.emit("7"); 
     myCustomObservable.emit("8"); 

     myCustomObservable.subscribe(System.out::println); 
    } 
} 

class MyCustomObservable extends Observable<String> { 
    private static PublishSubject<String> publishSubject = PublishSubject.create(); 
    private static List<String> valuesCache = new ArrayList<>(); 

    protected MyCustomObservable() { 
     super(subscriber -> { 
      Observable.from(valuesCache) 
        .doOnNext(subscriber::onNext) 
        .doOnCompleted(valuesCache::clear) 
        .subscribe(); 

      publishSubject.subscribe(subscriber); 
     }); 
    } 

    public void emit(String value) { 
     if (publishSubject.hasObservers()) { 
      publishSubject.onNext(value); 
     } else { 
      valuesCache.add(value); 
     } 
    } 
} 

Надеюсь, что это поможет!

С наилучшими пожеланиями.

+0

Работал как шарм.Не слишком любил эти статические переменные, хотя :) –

+0

Меня тоже нет @MartynasJurkus. Как мы можем улучшить это решение? –

+0

См. Мой обновленный вопрос. Хотя мой код находится в Котлине. –

1

Если вы хотите только подождать одного абонента, используйте UnicastSubject, но обратите внимание, что если вы отмените подписку посередине, все последующие очереди будут потеряны.

Edit:

После того как мы подписчик все элементы потребляются и не излучается снова

Для нескольких абонентов, используйте ReplaySubject.

+0

Должна быть возможность иметь несколько подписчиков. –

2

У меня была аналогичная проблема, мои требования были:

  • Если поддерживать воспроизведение значений при отсутствии наблюдателя не подписался
  • Если допускается только один наблюдатель подписался на время
  • Если позволить другому Observer подписаться, когда первый наблюдатель расположен

Я реализовал его как RxRelay, но реализация для темы была бы аналогичной:

public final class CacheRelay<T> extends Relay<T> { 

    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>(); 
    private final PublishRelay<T> relay = PublishRelay.create(); 

    private CacheRelay() { 
    } 

    public static <T> CacheRelay<T> create() { 
     return new CacheRelay<>(); 
    } 

    @Override 
    public void accept(T value) { 
     if (relay.hasObservers()) { 
      relay.accept(value); 
     } else { 
      queue.add(value); 
     } 
    } 

    @Override 
    public boolean hasObservers() { 
     return relay.hasObservers(); 
    } 

    @Override 
    protected void subscribeActual(Observer<? super T> observer) { 
     if (hasObservers()) { 
      EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer); 
     } else { 
      for (T element; (element = queue.poll()) != null;) { 
       observer.onNext(element); 
      } 
      relay.subscribeActual(observer); 
     } 
    } 
} 

Посмотрите на этот Gist for more

Смежные вопросы