2017-01-24 5 views
5

У меня возникли проблемы с поиском пример того, как сделать пользовательский оператор с RxJava 2. Я рассмотрел несколько подходов:Создание пользовательских операторов в RxJava2?

  1. Использование Observable.create, а затем flatMap ИНГ на него из источника наблюдаемой. Я могу заставить это работать, но это не совсем правильно. Я в конечном итоге создаю статическую функцию, которая предоставляет источник Observable, а затем flatMap на источнике. В OnSubscribe я создаю экземпляр объекта, через который передаю эмиттер, который обрабатывает и управляет Observable/Emitter (поскольку это не тривиально, и я хочу, чтобы все было как можно инкапсулировано).
  2. Создание ObservableOperator и предоставление его Observable.lift. Я не могу найти никаких примеров этого для RxJava 2. Мне пришлось отлаживать мой собственный пример, чтобы убедиться, что мое понимание восходящего и нисходящего потока было правильным. Поскольку я не могу найти никаких примеров или документации по этому поводу для RxJava 2, я немного волнуюсь, что могу случайно сделать то, что я не должен.
  3. Создайте мой собственный Observable тип. Это похоже на то, как работают основные операторы, многие из которых распространяются на AbstractObservableWithUpstream. Однако здесь много чего происходит, и, похоже, легко пропустить что-то или сделать что-то, чего я не должен. Я не уверен, должен ли я использовать такой подход или нет. Я прошел через ментальный процесс, и кажется, что он может получить волосатый довольно быстро.

Я собираюсь продолжить работу с опцией № 2, но подумал, что стоит спросить, что поддерживаемый метод для этого был в RxJava2, а также выяснить, есть ли какая-либо документация или примеры для этого.

+0

Для # 2, я подозреваю, что это тот же самый механизм, который используется для создания всех нативных операторов Rx»например 'buffer',' window' и т. д. Итак, вы можете пойти в github и искать исходный код для них, чтобы увидеть, как они реализованы. – Luciano

+0

@ Luciano # 3 - это то, как производятся все операторы rx. Как я уже говорил выше, глядя на исходный код, он быстро становится волосатым. Многие вспомогательные методы называются повсюду.'RxAssembly',' DisposableHelper', и они используются точным образом. Это определенно можно было бы сделать, но я бы просто копировал формат. Я хотел бы убедиться, что я понимаю, что происходит в моем коде. – spierce7

ответ

1

Операторы записи не рекомендуются для начинающих, и многие существующие шаблоны потоков могут быть достигнуты с помощью существующих операторов.

Вы просматривали wiki-страницу RxJava около writing operators for 2.x? Я предлагаю прочитать его сверху донизу.

  1. использованием create() возможно, но большинство людей используют его, чтобы излучать элементы List с по-каждого цикла, а не признание того, что Flowable.fromIterable делает это.
  2. Мы сохранили эту точку расширения, хотя операторы RxJava 2 не используют сами lift(). Если вы хотите избежать некоторых шаблонов с опцией 3., то вы можете попробовать this route.
  3. Так реализованы операторы RxJava 2. AbstractObservableWithUpstream - небольшое удобство и не обязательно для external implementors.
1

Это может вам помочь. Я реализую оператор RxJava2 для обработки APiError. Я использовал лифт оператора.

См. Пример.

public final class ApiClient implements ApiClientInterface { 
    ... 
     @NonNull 
     @Override 
     public Observable<ActivateResponse> activate(String email, EmailData emailLinkData) { 
      return myApiService.activate(email, emailData) 
        .lift(getApiErrorTransformer()) 
        .subscribeOn(Schedulers.io()); 
     } 

     private <T>ApiErrorOperator<T> getApiErrorTransformer() { 
      return new ApiErrorOperator<>(gson, networkService); 
     } 

    } 

И тогда вы можете найти пользовательский оператор

public final class ApiErrorOperator<T> implements ObservableOperator<T, T> { 
     private static final String TAG = "ApiErrorOperator"; 
     private final Gson gson; 
     private final NetworkService networkService; 

     public ApiErrorOperator(@NonNull Gson gson, @NonNull NetworkService networkService) { 
      this.gson = gson; 
      this.networkService = networkService; 
     } 

     @Override 
     public Observer<? super T> apply(Observer<? super T> observer) throws Exception { 
      return new Observer<T>() { 
       @Override 
       public void onSubscribe(Disposable d) { 
        observer.onSubscribe(d); 
       } 

       @Override 
       public void onNext(T value) { 
        observer.onNext(value); 
       } 

       @Override 
       public void onError(Throwable e) { 
        Log.e(TAG, "onError", e); 

       if (e instanceof HttpException) { 
         try { 
          HttpException error = (HttpException) e; 
          Response response = error.response(); 
          String errorBody = response.errorBody().string(); 

          ErrorResponse errorResponse = gson.fromJson(errorBody.trim(), ErrorResponse.class); 
          ApiException exception = new ApiException(errorResponse, response); 

          observer.onError(exception); 
         } catch (IOException exception) { 
          observer.onError(exception); 
         } 

        } else if (!networkService.isNetworkAvailable()) { 
         observer.onError(new NetworkException(ErrorResponse.builder() 
           .setErrorCode("") 
           .setDescription("No Network Connection Error") 
           .build())); 
        } else { 
         observer.onError(e); 
        } 
       } 

       @Override 
       public void onComplete() { 
        observer.onComplete(); 
       } 
      }; 
     } 
    } 
Смежные вопросы