2016-07-08 1 views
1

В этом примере метод возвращает Observable<Integer> чисел от 1 до 9:Observable.create (SyncOnSubscribe.createStateless (...)) бросает IllegalStateException: «onNext вызывается несколько раз!»

public class Provider { 
    public static Observable<Integer> test() { 
     return Observable.create(SyncOnSubscribe.createStateless(new Action1<Observer<? super Integer>>() { 
      @Override 
      public void call(Observer<? super Integer> observer) { 
       for (int i = 1; i < 9; i++) 
        // when i == 2, throws IllegalStateException: 
        // "onNext called multiple times!": 
        observer.onNext(i); 
      } 
     })); 
    } 
} 

Это один фильтр только 1 до 9 числа, кратные 3:

public class RxJavaUnitTest { 
    @Test 
    public void rxJavaTest(){ 
     List<Integer> multiplesOf3 = Provider.test().filter(new Func1<Integer, Boolean>() { 
      @Override 
      public Boolean call(Integer i) { 
       return i % 3 == 0; 
      } 
     }).toList().toBlocking().single(); 
    } 
} 

Но он бросает IllegalStateException: “onNext called multiple times!”. Как я могу предоставить больше значений для observer, если я не могу позвонить onNext более одного раза?

Возможно, SyncOnSubscribe.createStateless является неправильным методом здесь и заменен на что-то еще?

ответ

4

Почему не используется Observable.from, Observable.fromCallable или Observable.defer или подобные вариации вместо Observable.create?


Однако для ответа:

Как указано в SyncOnSubscribe.createStateless Javadoc:

... Эта перегрузка создает "состояние-менее" SyncOnSubscribe, которая не имеет явного государственного значения , Это следует использовать, когда следующая функция закрывается над состоянием.

Это Javadoc не совсем понятно, по этому вопросу, больше можно найти в SyncOnSubscribe.next Javadoc:

... Для того, чтобы испустить данные на выходе абонентского вызова observer.onNext(t). Чтобы подать сигнал об ошибке, вызовите observer.onError(throwable) или введите Exception. Чтобы сигнализировать о завершении вызова потока данных observer.onCompleted(). Реализации этого метода должны соответствовать следующим правилам.

  • Нельзя звонить observer.onNext(t) более одного раза за вызов.
  • Нельзя одновременно звонить observer.onNext(t).

Значение, возвращаемое вызовом этого метода, будет передано как аргумент состояния следующего вызова этого метода.

Это означает, что в действии обратного вызова

  • onNext должно быть немедленно следует либо onCompleted или onError
  • и должна быть только один onNext вызова за обратный вызов вызова

Это необходимо для обеспечения безопасности в параллельной среде (это почему это называется SyncOnSubscribe) и для противодавления.

можно учитывать для цикла внутри обратного вызова:

return Observable.create(SyncOnSubscribe.createStateless(
     new Action1<Observer<? super Integer>>() { 
      int counter = 0; 

      @Override 
      public void call(Observer<? super Integer> observer) { 
       if (counter < 9) { 
        observer.onNext(counter++); 
       } else { 
        observer.onCompleted(); 
       } 
      } 

     })); 

Обратите внимание на onCompleted вызов, в противном случае, ваша монада будет работать вечно.

Приведенный ниже список {0, 3, 6}. Но код является уродливым и нарушает договор SyncOnSubscribe.createStateless. SyncOnSubscribe.createStateless был бы полезен для создания без гражданства, как случайный. Вместо этого следует использовать SyncOnSubscribe.createStateful:

return Observable.create(SyncOnSubscribe.createStateful(
     () -> 0, 
     (counter, observer) -> { 
      if (counter < 9) { 
       observer.onNext(counter); 
      } else { 
       observer.onCompleted(); 
      } 
      return counter + 1; 
     } 
)); 

Однако цикл еще должен быть разложен, и это подоконник необходимо вызвать onCompleted.

+0

Хорошо, получилось. Однако я упростил свой вопрос ... Мне нужно решение для вложенных циклов 'for-each', например:' for (NetworkInterface intf: interfaces) Список addrs = Collections.list (intf.getInetAddresses()); for (InetAddress addr: addrs) {... observer.onNext (addr); ...} 'То, что я в основном пытаюсь сделать здесь, - это преобразовать метод getIPAddress из [этого ответа] (http://stackoverflow.com/questions/6064510/how-to-get-ip-address- of-the-device/13007325 # 13007325) на 'RxJava': мне нужна часть инициализатора. Итак, как я могу справиться с этим делом? – Tar

+0

@Tar Почему бы не использовать ['flatMapIterable'] (http://reactivex.io/RxJava/javadoc/rx/Observable.html#flatMapIterable (rx.functions.Func1))? Что-то вроде (псевдокод) 'Observable.from (интерфейсы) .flatMapIterable (intf :: getInetAddresses())' – Brice

Смежные вопросы