2015-03-10 4 views
16

Я хотел бы знать, как игнорировать исключения и продолжать бесконечный поток (в моем случае поток местоположений)?Как игнорировать ошибку и продолжать бесконечный поток?

Я извлекаю текущую позицию пользователя (используя Android-ReactiveLocation), а затем отправляю их в свой API (используя Retrofit).

В моем случае, когда исключение происходит во время сетевого вызова (например, таймаут), вызывается метод onError, и поток останавливается сам. Как этого избежать?

активность:

private RestService mRestService; 
private Subscription mSubscription; 
private LocationRequest mLocationRequest = LocationRequest.create() 
      .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY) 
      .setInterval(100); 
... 
private void start() { 
    mRestService = ...; 
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this); 
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest) 
      .buffer(50) 
      .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception 
      .subscribeOn(Schedulers.newThread()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .subscribe(); 
} 

RestService:

public interface RestService { 
    @POST("/.../") 
    Observable<Response> postLocations(@Body List<Location> locations); 
} 
+0

см. Аналогичный ответ с исключениями событий щелчка: http://stackoverflow.com/questions/26154236/how-to-subscribe-to-click-events-so-exceptions-dont-unsubscribe/26229584#26229584 –

ответ

7

mRestService.postLocations(locations) испустить один элемент, затем завершить. Если произошла ошибка, тогда она испускает ошибку, которая заполняет поток.

Как вы называете этот метод в flatMap, ошибка продолжается до вашего «основного» потока, а затем ваш поток останавливается.

Что вы можете сделать, это превратить вашу ошибку в другой элемент (как описано здесь: https://stackoverflow.com/a/28971140/476690), но не в вашем основном потоке (как я полагаю, вы уже пробовали), а на mRestService.postLocations(locations).

Таким образом, этот вызов выдаст ошибку, которая будет преобразована в элемент/другой наблюдаемый, а затем завершена. (без звонка onError).

С точки зрения потребителя, mRestService.postLocations(locations) будет излучать один элемент, а затем завершить, как если бы все получилось.

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest) 
     .buffer(50) 
     .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can throw exception 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()) 
     .subscribe(); 
34

Вы можете использовать один из error handling operators.

  • onErrorResumeNext( ) - инструктирует наблюдаемом излучать последовательность элементов, если он обнаруживает ошибку
  • onErrorReturn( ) - инструктирует наблюдаемом излучать определенный элемент, когда он сталкивается с ошибкой
  • onExceptionResumeNext( ) - инструктирует Наблюдаемые продолжать испуская элементы после того, как он сталкивается с исключением (но не с другим разнообразием бросков)
  • retry( ) - если источник Наблюдаемый испускает ошибку, повторно подпишитесь на нее в надежде, что она будет завершена без ошибок
  • retryWhen( ) - если источник Наблюдаемый излучает ошибку, передать эту ошибку в другую Observable, чтобы определить, нужно ли переоформить подписку на источник

Especialy retry и onExceptionResumeNext выглядят многообещающими в вашем случае.

+1

Когда я добавьте 'onExceptionResumeNext (Observable.empty())' после 'flatMap (местоположения -> mRestService.postLocations (местоположения))' 'onCompleted' вызывается и поток заканчивается. – Ziem

+15

Не добавляйте его после 'flatMap', но внутри' flatMap'. –

+0

Спасибо! onErrorResumeNext() - очень полезная конструкция для резервных копий. – Levor

1

Попробуйте вызвать службу отдыха в вызове Observable.defer. Таким образом, для каждого звонка вы получите возможность использовать свой собственный 'onErrorResumeNext', и ошибки не вызовут ваш основной поток.

reactiveLocationProvider.getUpdatedLocation(mLocationRequest) 
    .buffer(50) 
    .flatMap(locations -> 
    Observable.defer(() -> mRestService.postLocations(locations)) 
     .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>) 
) 
........ 

Это решение изначально из этой темы ->RxJava Observable and Subscriber for skipping exception?, но я думаю, что он будет работать в вашем случае тоже.

6

Просто вставив данные связи от @ ответ MikeN в упаковывают он потеряется:

import rx.Observable.Operator; 
import rx.functions.Action1; 

public final class OperatorSuppressError<T> implements Operator<T, T> { 
    final Action1<Throwable> onError; 

    public OperatorSuppressError(Action1<Throwable> onError) { 
     this.onError = onError; 
    } 

    @Override 
    public Subscriber<? super T> call(final Subscriber<? super T> t1) { 
     return new Subscriber<T>(t1) { 

      @Override 
      public void onNext(T t) { 
       t1.onNext(t); 
      } 

      @Override 
      public void onError(Throwable e) { 
       onError.call(e); 
      } 

      @Override 
      public void onCompleted() { 
       t1.onCompleted(); 
      } 

     }; 
    } 
} 

и использовать его близко к наблюдаемому источника, потому что другие операторы могут охотно отписать до этого.

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe(); 

Заметим, однако, что это подавляет доставку ошибки из источника. Если какой-либо из onNext в цепочке после того, как он выдает исключение, это , вероятно, источник будет отписано.

+0

Это работает, но после того, как ошибка подавлена, мой наблюдаемый источник, похоже, перестает работать. Есть ли способ перезапустить его? – Matthias

0

Небольшое изменение решения (@MikeN) для того, чтобы конечные потоки для завершения:

import rx.Observable.Operator; 
import rx.functions.Action1; 

public final class OperatorSuppressError<T> implements Operator<T, T> { 
    final Action1<Throwable> onError; 

    public OperatorSuppressError(Action1<Throwable> onError) { 
     this.onError = onError; 
    } 

    @Override 
    public Subscriber<? super T> call(final Subscriber<? super T> t1) { 
     return new Subscriber<T>(t1) { 

      @Override 
      public void onNext(T t) { 
       t1.onNext(t); 
      } 

      @Override 
      public void onError(Throwable e) { 
       onError.call(e); 
       //this will allow finite streams to complete 
       t1.onCompleted(); 
      } 

      @Override 
      public void onCompleted() { 
       t1.onCompleted(); 
      } 

     }; 
    } 
} 
1

Добавить мое решение этой проблемы:

privider 
    .compose(ignoreErrorsTransformer) 
    .subscribe() 

private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer = 
     new Observable.Transformer<ResultType, ResultType>() { 
      @Override 
      public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) { 
       return resultTypeObservable 
         .materialize() 
         .filter(new Func1<Notification<ResultType>, Boolean>() { 
          @Override 
          public Boolean call(Notification<ResultType> resultTypeNotification) { 
           return !resultTypeNotification.isOnError(); 
          } 
         }) 
         .dematerialize(); 

      } 
     }; 
+0

Что делает материализованный оператор? –

+0

Я не работаю, концы потока –

1

Если вы просто хотите, чтобы игнорировать погрешность внутри flatMapбез возврата элемента сделать это:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty()) 
); 
Смежные вопросы