Я ищу субъект (или нечто подобное), которые могут:Очередь как Subject в RxJava
- мог бы получить предметы и держать их в очереди или буфер, если нет подписчиков
- После того, как мы у подписчика все элементы потребляются и никогда не выработано снова
- Я могу подписаться/отписаться от/Темы
BehaviorSubject
почти будет делать работу, но она сохраняет последний наблюдаемый предмет.
UPDATE
На основании принятого ответа я разработал подобное решение для одного наблюдаемого элемента. Также добавлена часть отказа от подписки, чтобы избежать утечек памяти.
class LastEventObservable private constructor(
private val onSubscribe: OnSubscribe<Any>,
private val state: State
) : Observable<Any>(onSubscribe) {
fun emit(value: Any) {
if (state.subscriber.hasObservers()) {
state.subscriber.onNext(value)
} else {
state.lastItem = value
}
}
companion object {
fun create(): LastEventObservable {
val state = State()
val onSubscribe = OnSubscribe<Any> { subscriber ->
just(state.lastItem)
.filter { it != null }
.doOnNext { subscriber.onNext(it) }
.doOnCompleted { state.lastItem = null }
.subscribe()
val subscription = state.subscriber.subscribe(subscriber)
subscriber.add(Subscriptions.create { subscription.unsubscribe() })
}
return LastEventObservable(onSubscribe, state)
}
}
private class State {
var lastItem: Any? = null
val subscriber = PublishSubject.create<Any>()
}
}
Вы видели мой ответ? –
Уточните, что вы подразумеваете под словом «Как только у нас есть подписчик, все предметы будут потребляться и никогда не появляться снова» - если у вас есть что-то вроде: yourSource.take (1) .subscribe(), следует ли уничтожить все элементы из вашего источника? –
Нет, это должно потреблять только этот предмет. –