Я использую Firebase в своем приложении вместе с RxJava. Firebase может уведомлять ваше приложение всякий раз, когда что-то меняется в базовых данных (добавление, удаление, изменения, ...). Я пытаюсь объединить функцию Firebase с RxJava.Комбинирование слушателя данных реального времени Firebase с RxJava
Данные, которые я слушаю для называется Leisure
и Observable
излучает LeisureUpdate
, который содержит Leisure
и тип обновления (добавлять, удалять, перемещать, изменилось).
Вот мой метод, который позволяет подписаться на эти события.
private Observable<LeisureUpdate> leisureUpdatesObservable;
private ChildEventListener leisureUpdatesListener;
private int leisureUpdatesSubscriptionsCount;
@NonNull
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() {
@Override
public void call(final Subscriber<? super LeisureUpdate> subscriber) {
leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() {
@Override
public void onChildAdded(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED));
}
@Override
public void onChildChanged(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED));
}
@Override
public void onChildRemoved(DataSnapshot dataSnapshot) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED));
}
@Override
public void onChildMoved(DataSnapshot dataSnapshot, String s) {
final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue());
subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED));
}
@Override
public void onCancelled(FirebaseError firebaseError) {
subscriber.onError(new Error(firebaseError.getMessage()));
}
});
}
});
}
leisureUpdatesSubscriptionsCount++;
return leisureUpdatesObservable;
}
Во-первых, я хотел бы использовать Observable.fromCallable()
метод для того, чтобы создать Observable
, но я предполагаю, что это невозможно, так как Firebase использует обратные вызовы, верно?
Я держу один экземпляр Observable
, чтобы всегда иметь один Observable
, где могут подписаться несколько Subscriber
.
Проблема возникает, когда все отписываются, и мне нужно прекратить слушать события в Firebase. Я так и не нашел, чтобы сделать Observable
понять, есть ли еще какие-либо подписки. Поэтому я продолжаю подсчитывать, сколько звонков я получил до subscribeToLeisuresUpdates()
, с leisureUpdatesSubscriptionsCount
.
Тогда каждый раз, когда кто-то хочет, чтобы отказаться от подписки он должен позвонить
@Override
public void unsubscribeFromLeisuresUpdates() {
if (leisureUpdatesObservable == null) {
return;
}
leisureUpdatesSubscriptionsCount--;
if (leisureUpdatesSubscriptionsCount == 0) {
firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener);
leisureUpdatesObservable = null;
}
}
Это единственный способ, которым я нашел, чтобы сделать Observable
выделяет элементы, когда есть абонент, но я чувствую, что должно быть проще способ, особенно понимая, когда абонентов больше не слушают наблюдаемые.
Любой, кто столкнулся с подобной проблемой или имеет другой подход?
Я довольно новичок в RX и хотел дать Firestore - новую базу данных реального времени для Firebase - попробовать. Тем не менее, я сталкиваюсь с теми же асинхронными проблемами, которые ваша библиотека помогла мне решить с помощью Firebase RTDB - благодаря тонне! Будет ли rx-обертка на Firestore похожа на то, что вы здесь сделали? –