2016-03-31 2 views
9

Я использую Firebase в своем приложении вместе с RxJava. Firebase может уведомлять ваше приложение всякий раз, когда что-то меняется в базовых данных (добавление, удаление, изменения, ...). Я пытаюсь объединить функцию Firebase с RxJava.Комбинирование слушателя данных реального времени Firebase с RxJava

Данные, которые я слушаю для называется Leisure и Observable излучает LeisureUpdate, который содержит Leisure и тип обновления (добавлять, удалять, перемещать, изменилось).

Вот мой метод, который позволяет подписаться на эти события.

private Observable<LeisureUpdate> leisureUpdatesObservable; 
private ChildEventListener leisureUpdatesListener; 
private int leisureUpdatesSubscriptionsCount; 

@NonNull 
public Observable<LeisureUpdate> subscribeToLeisuresUpdates() { 
    if (leisureUpdatesObservable == null) { 
     leisureUpdatesObservable = Observable.create(new Observable.OnSubscribe<LeisureUpdate>() { 

      @Override 
      public void call(final Subscriber<? super LeisureUpdate> subscriber) { 
       leisureUpdatesListener = firebase.child(FirebaseStructure.LEISURES).addChildEventListener(new ChildEventListener() { 
        @Override 
        public void onChildAdded(DataSnapshot dataSnapshot, String s) { 
         final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); 
         subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.ADDED)); 
        } 

        @Override 
        public void onChildChanged(DataSnapshot dataSnapshot, String s) { 
         final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); 
         subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.CHANGED)); 
        } 

        @Override 
        public void onChildRemoved(DataSnapshot dataSnapshot) { 
         final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); 
         subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.REMOVED)); 
        } 

        @Override 
        public void onChildMoved(DataSnapshot dataSnapshot, String s) { 
         final Leisure leisure = convertMapToLeisure((Map<String, Object>) dataSnapshot.getValue()); 
         subscriber.onNext(new LeisureUpdate(leisure, LeisureUpdate.MOVED)); 
        } 

        @Override 
        public void onCancelled(FirebaseError firebaseError) { 
         subscriber.onError(new Error(firebaseError.getMessage())); 
        } 
       }); 
      } 
     }); 
    } 
    leisureUpdatesSubscriptionsCount++; 
    return leisureUpdatesObservable; 
} 

Во-первых, я хотел бы использовать Observable.fromCallable() метод для того, чтобы создать Observable, но я предполагаю, что это невозможно, так как Firebase использует обратные вызовы, верно?

Я держу один экземпляр Observable, чтобы всегда иметь один Observable, где могут подписаться несколько Subscriber.

Проблема возникает, когда все отписываются, и мне нужно прекратить слушать события в Firebase. Я так и не нашел, чтобы сделать Observable понять, есть ли еще какие-либо подписки. Поэтому я продолжаю подсчитывать, сколько звонков я получил до subscribeToLeisuresUpdates(), с leisureUpdatesSubscriptionsCount.

Тогда каждый раз, когда кто-то хочет, чтобы отказаться от подписки он должен позвонить

@Override 
public void unsubscribeFromLeisuresUpdates() { 
    if (leisureUpdatesObservable == null) { 
     return; 
    } 
    leisureUpdatesSubscriptionsCount--; 
    if (leisureUpdatesSubscriptionsCount == 0) { 
     firebase.child(FirebaseStructure.LEISURES).removeEventListener(leisureUpdatesListener); 
     leisureUpdatesObservable = null; 
    } 
} 

Это единственный способ, которым я нашел, чтобы сделать Observable выделяет элементы, когда есть абонент, но я чувствую, что должно быть проще способ, особенно понимая, когда абонентов больше не слушают наблюдаемые.

Любой, кто столкнулся с подобной проблемой или имеет другой подход?

ответ

2

Поместите это в свой Observable.create() в конец.

subscriber.add(Subscriptions.create(new Action0() { 
        @Override public void call() { 
         ref.removeEventListener(leisureUpdatesListener); 
        } 
       })); 
6

Кроме того, вы можете использовать мою библиотеку rxFirebase

+0

Я довольно новичок в RX и хотел дать Firestore - новую базу данных реального времени для Firebase - попробовать. Тем не менее, я сталкиваюсь с теми же асинхронными проблемами, которые ваша библиотека помогла мне решить с помощью Firebase RTDB - благодаря тонне! Будет ли rx-обертка на Firestore похожа на то, что вы здесь сделали? –

5

Вы можете использовать Observable.fromEmitter, что-то вдоль этих линий

return Observable.fromEmitter(new Action1<Emitter<LeisureUpdate>>() { 
     @Override 
     public void call(final Emitter<LeisureUpdate> leisureUpdateEmitter) { 
      final ValueEventListener listener = new ValueEventListener() { 
       @Override 
       public void onDataChange(DataSnapshot dataSnapshot) { 
        // process update 
        LeisureUpdate leisureUpdate = ... 
        leisureUpdateEmitter.onNext(leisureUpdate); 
       } 

       @Override 
       public void onCancelled(DatabaseError databaseError) { 
        leisureUpdateEmitter.onError(new Throwable(databaseError.getMessage())); 
        mDatabaseReference.removeEventListener(this); 
       } 
      }; 
      mDatabaseReference.addValueEventListener(listener); 
      leisureUpdateEmitter.setCancellation(new Cancellable() { 
       @Override 
       public void cancel() throws Exception { 
        mDatabaseReference.removeEventListener(listener); 
       } 
      }); 
     } 
    }, Emitter.BackpressureMode.BUFFER); 
+0

Что такое эквивалент в Rx Java 2.0? – Mike6679

+0

Вам нужно будет использовать Flowable.create, см. Https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0. – Francesc

2

Я предлагаю вам проверить как ссылку (или просто использовать ее) в одной из следующих библиотек:

RxJava: https://github.com/nmoskalenko/RxFirebase

RxJava 2,0: https://github.com/FrangSierra/Rx2Firebase

Один из них работает с RxJava, а другой с новым РЦ RxJava 2,0. Если вас это интересует, вы можете увидеть различия между обоими here.

0

Вот пример кода для использования RxJava2 с CompletionListener Firebase в:

Completable.create(new CompletableOnSubscribe() { 
     @Override 
     public void subscribe(final CompletableEmitter e) throws Exception { 
      String orderKey = FirebaseDatabase.getInstance().getReference().child("orders").push().getKey(); 
      FirebaseDatabase.getInstance().getReference().child("orders").child(orderKey).setValue(order, 
        new DatabaseReference.CompletionListener() { 
         @Override 
         public void onComplete(DatabaseError databaseError, DatabaseReference databaseReference) { 
          if (e.isDisposed()) { 
           return; 
          } 
          if (databaseError == null) { 
           e.onComplete(); 
          } else { 
           e.onError(new Throwable(databaseError.getMessage())); 
          } 
         } 
        }); 

     } 
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()); 
0

Еще одна удивительная библиотека, которая поможет вам обернуть все firebase логики базы данных в реальном времени под шаблоны гх.

https://github.com/Link184/Respiration

Здесь вы можете создать свой firebase хранилище и расширить его из GeneralRepository, например:

@RespirationRepository(dataSnapshotType = Leisure.class) 
public class LeisureRepository extends GeneralRepository<Leisure>{ 
    protected LeisureRepository(Configuration<Leisure> repositoryConfig) { 
     super(repositoryConfig); 
    } 

    // observable will never emit any events if there are no more subscribers 
    public void killRepository() { 
     if (!behaviorSubject.hasObservers()) { 
      //protected fields 
      behaviorSubject.onComplete(); 
      databaseReference.removeEventListener(valueListener); 
     } 
    } 
} 

Вы можете "убить" ваш репозиторий таким образом:

// LeisureRepositoryBuilder is a generated class by annotation processor, will appear after a successful gradle build 
LeisureRepositoryBuilder.getInstance().killRepository(); 

Но Я думаю, что для вашей ситуации будет лучше расширить com.link184.respiration.repository.ListRepository, чтобы избежать отображения данных из java.util.Map в Leisur e model через LeisureUpdate

+1

Вы также должны объяснить, как использовать эту библиотеку. – Taslim

+1

Хотя эта ссылка может ответить на вопрос, лучше включить здесь основные части ответа и предоставить ссылку для справки. Ответные ссылки могут стать недействительными, если связанная страница изменится. - [Из обзора] (/ review/low-quality-posts/18966092) –

Смежные вопросы