2015-03-16 2 views
9

У меня возникла ситуация, когда я создаю Observable, содержащий результаты из базы данных. Затем я применяю к ним ряд фильтров. Затем у меня есть подписчик, который регистрирует результаты. Это может быть случай, когда никакие элементы не пробиваются сквозь фильтры. Моя бизнес-логика утверждает, что это не ошибка. Однако, когда это происходит, мой onError вызывается и содержит следующее исключение: java.util.NoSuchElementException: Sequence contains no elementsПравильное обращение с пустым Наблюдаемое в RxJava

Является ли принятая практика просто определять этот тип исключения и игнорировать его? Или есть лучший способ справиться с этим?

Версия 1.0.0.

Вот простой тестовый пример, который раскрывает то, что я вижу. Похоже, что это связано с тем, что все события были отфильтрованы до достижения карты и уменьшения.

@Test 
public void test() 
{ 

    Integer values[] = new Integer[]{1, 2, 3, 4, 5}; 

    Observable.from(values).filter(new Func1<Integer, Boolean>() 
    { 
     @Override 
     public Boolean call(Integer integer) 
     { 
      if (integer < 0) 
       return true; 
      else 
       return false; 
     } 
    }).map(new Func1<Integer, String>() 
    { 
     @Override 
     public String call(Integer integer) 
     { 
      return String.valueOf(integer); 
     } 
    }).reduce(new Func2<String, String, String>() 
    { 
     @Override 
     public String call(String s, String s2) 
     { 
      return s + "," + s2; 
     } 
    }) 

      .subscribe(new Action1<String>() 
      { 
       @Override 
       public void call(String s) 
       { 
        System.out.println(s); 
       } 
      }); 
} 

Потому что я использую безопасный подписчик, он сначала бросает OnErrorNotImplementedException который оборачивает следующее исключение:

java.util.NoSuchElementException: Sequence contains no elements 
    at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82) 
    at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140) 
    at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73) 
    at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45) 
    at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59) 
    at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121) 
    at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43) 
    at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42) 
    at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79) 
    at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147) 
    at rx.Subscriber.setProducer(Subscriber.java:139) 
    at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139) 
    at rx.Subscriber.setProducer(Subscriber.java:133) 
    at rx.Subscriber.setProducer(Subscriber.java:133) 
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47) 
    at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable$1.call(Observable.java:144) 
    at rx.Observable$1.call(Observable.java:136) 
    at rx.Observable.subscribe(Observable.java:7284) 

На основании ответа от @davem ниже, я создал новый тестовый пример:

@Test 
public void testFromBlockingAndSingle() 
{ 

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>() 
    { 
     @Override 
     public Boolean call(Integer integer) 
     { 
      if (integer < 0) 
       return true; 
      else 
       return false; 
     } 
    }).map(new Func1<Integer, String>() 
    { 
     @Override 
     public String call(Integer integer) 
     { 
      return String.valueOf(integer); 
     } 
    }).reduce(new Func2<String, String, String>() 
    { 
     @Override 
     public String call(String s, String s2) 
     { 
      return s + "," + s2; 
     } 
    }).toList().toBlocking().single(); 

    System.out.println("Test: " + results + " Size: " + results.size()); 

} 

И это результаты тестирования в следующем поведении:

Когда вход:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

Затем результаты (как ожидалось) являются:

Test: [-2,-1] Size: 1 

И когда вход:

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5}; 

Тогда результат заключается в следующем трассировки стека :

java.util.NoSuchElementException: Sequence contains no elements 
at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82) 
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:140) 
at rx.internal.operators.TakeLastQueueProducer.emit(TakeLastQueueProducer.java:73) 
at rx.internal.operators.TakeLastQueueProducer.startEmitting(TakeLastQueueProducer.java:45) 
at rx.internal.operators.OperatorTakeLast$1.onCompleted(OperatorTakeLast.java:59) 
at rx.internal.operators.OperatorScan$2.onCompleted(OperatorScan.java:121) 
at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43) 
at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42) 
at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:79) 
at rx.internal.operators.OperatorScan$2$1.request(OperatorScan.java:147) 
at rx.Subscriber.setProducer(Subscriber.java:139) 
at rx.internal.operators.OperatorScan$2.setProducer(OperatorScan.java:139) 
at rx.Subscriber.setProducer(Subscriber.java:133) 
at rx.Subscriber.setProducer(Subscriber.java:133) 
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:47) 
at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:33) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable$1.call(Observable.java:144) 
at rx.Observable$1.call(Observable.java:136) 
at rx.Observable.subscribe(Observable.java:7284) 
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:441) 
at rx.observables.BlockingObservable.single(BlockingObservable.java:340) 
at EmptyTest2.test(EmptyTest2.java:19) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) 
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) 
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) 
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) 
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) 
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) 
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) 
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) 
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) 
at org.junit.runners.ParentRunner.run(ParentRunner.java:309) 
at org.junit.runner.JUnitCore.run(JUnitCore.java:160) 
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74) 
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211) 
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134) 

Так оно и есть что проблема, безусловно, связана с использованием функции уменьшения. Смотрите следующий тест, который обрабатывает обе ситуации:

@Test 
public void testNoReduce() 
{ 

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

    List<String> results = Observable.from(values).filter(new Func1<Integer, Boolean>() 
    { 
     @Override 
     public Boolean call(Integer integer) 
     { 
      if (integer < 0) 
       return true; 
      else 
       return false; 
     } 
    }).map(new Func1<Integer, String>() 
    { 
     @Override 
     public String call(Integer integer) 
     { 
      return String.valueOf(integer); 
     } 
    }).toList().toBlocking().first(); 

    Iterator<String> itr = results.iterator(); 
    StringBuilder b = new StringBuilder(); 

    while (itr.hasNext()) 
    { 
     b.append(itr.next()); 

     if (itr.hasNext()) 
      b.append(","); 
    } 

    System.out.println("Test NoReduce: " + b); 

} 

со следующими входными:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

я получаю следующие результаты, которые ожидаются:

Test NoReduce: -2,-1 

И со следующим входом :

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5}; 

я получаю следующий результат, который, как ожидается:

Test NoReduce: 

Итак, если я не полностью недоразумение что-то, единственный способ действительно обрабатывать нулевой элемент Observable, что результаты из фильтра при последующей карте и уменьшить это реализовать логику уменьшения за пределами цепи Observable. Вы все согласны с этим заявлением?


Окончательное решение

Вот мое окончательное решение после реализации того, что предложил и Томаш Дворжак и Дэвид Motten. Я думаю, что это решение разумно.

@Test 
public void testWithToList() 
{ 

    Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

    Observable.from(values).filter(new Func1<Integer, Boolean>() 
    { 
     @Override 
     public Boolean call(Integer integer) 
     { 
      if (integer < 0) 
       return true; 
      else 
       return false; 
     } 
    }).toList().map(new Func1<List<Integer>, String>() 
    { 
     @Override 
     public String call(List<Integer> integers) 
     { 
      Iterator<Integer> intItr = integers.iterator(); 
      StringBuilder b = new StringBuilder(); 

      while (intItr.hasNext()) 
      { 
       b.append(intItr.next()); 

       if (intItr.hasNext()) 
       { 
        b.append(","); 
       } 
      } 

      return b.toString(); 
     } 
    }).subscribe(new Action1<String>() 
    { 
     @Override 
     public void call(String s) 
     { 
      System.out.println("With a toList: " + s); 
     } 
    }); 

} 

Вот как ведет себя этот тест при использовании следующих входов.

Когда данный поток, который будет иметь некоторые значения проходят через фильтры:

Integer values[] = new Integer[]{-2, -1, 0, 1, 2, 3, 4, 5}; 

Результат:

With a toList: -2,-1 

Когда данный поток, который не будет иметь каких-либо значений проходят через фильтры :

Integer values[] = new Integer[]{0, 1, 2, 3, 4, 5}; 

результат:

With a toList: <empty string> 
+0

Пожалуйста, пост код, поэтому мы не должны выводить его из трассировки стека. –

+0

Вы правы, я не смог добавить тестовый пример. Я добавил простой. – babernathy

ответ

13

Теперь, после вашего обновления, ошибка очевидна. Reduce в RxJava потерпит неудачу с IllegalArgumentException, если наблюдаемое уменьшение уменьшается, как указано в спецификации (http://reactivex.io/documentation/operators/reduce.html).

В функциональном программировании обычно есть два общих оператора, которые объединяют коллекцию в одно значение, fold и reduce. В принятой терминологии fold принимает начальное значение аккумулятора и функцию, которая берет текущий накопитель и значение из коллекции и производит другое значение аккумулятора. Пример псевдокода:

[1, 2, 3, 4].fold(0, (accumulator, value) => accumulator + value)

будет начинаться с 0, и в конечном счете добавить 1, 2, 3, 4 к работающему аккумулятору, наконец, получая 10, сумму значений.

Уменьшение очень похоже, только он не принимает начальное значение аккумулятора явно, он использует первое значение в качестве начального аккумулятора и затем накапливает все остальные значения. Это имеет смысл, если вы, например, ища минимальное или максимальное значение.

[1, 2, 3, 4].reduce((accumulator, value) => min(accumulator, value))

Глядя на складку и уменьшить другой путь, вы, вероятно, использовать fold всякий раз, когда агрегированный значение будет иметь смысл даже на пустой коллекции (как, в sum, 0 имеет смысл), и reduce иначе (minimum марки нет смысла в пустой коллекции, а reduce не сможет работать с такой коллекцией, в вашем случае, выбросив исключение).

Вы выполняете аналогичную агрегацию, вставляя коллекцию строк с запятой для создания одной строки. Это немного сложнее.Это, вероятно, имеет смысл в пустой коллекции (вы, вероятно, ожидаете пустую строку), но, с другой стороны, если вы начнете с пустого аккумулятора, у вас будет еще одна запятая в результате, чем вы ожидаете. Правильное решение для этого - проверить, сначала ли пустая коллекция, и либо вернуть строку возврата для пустой коллекции, либо сделать reduce в непустой коллекции. Вы, вероятно, заметите, что часто вам не нужна пустая строка в пустом случае, но что-то вроде «collection is empty» может быть более уместным, тем самым гарантируя, что это решение является чистым.

Btw, я использую слово коллекции здесь вместо наблюдаемой свободно, только для образовательных целей. Кроме того, в RxJava оба fold и reduce называются одинаковыми, reduce, только две версии этого метода: один принимает только один параметр, другие два параметра.

Что касается вашего последнего вопроса: вам не нужно оставлять цепь Observable. Просто используйте toList(), как предлагает Дэвид Мотт.

.filter(...) 
.toList() 
.map(listOfValues => listOfValues.intersperse(", ")) 

где intersperse могут быть реализованы в терминах reduce, если уже не библиотечную функцию (это довольно часто).

collection.intersperse(separator) = 
    if (collection.isEmpty()) 
     "" 
    else 
     collection.reduce(accumulator, element => accumulator + separator + element) 
+0

Благодарим вас за подробное объяснение. Это очень помогло. Я обновил вопрос своим окончательным решением. – babernathy

9

Причина, по которой это происходит, заключается в том, что вы используете toBlocking().single() на пустой поток. Если вы ожидаете 0 или 1 значения из потока, вы можете сделать toList().toBlocking().single(), затем проверьте значения в списке (который может быть пустым, но не будет вызывать исключение, которое вы получаете).

+0

Хорошая работа, выводя это из стека и абсолютно никакого кода :) +1 для вас, -1 для вопроса. Только, не хотите ли вы проверить результат 'toBlocking.toList' вместо' toList.toBlocking.single'? –

+0

Спасибо. 'toBlocking.toList' не является частью API 1.0.8, хотя может использовать' toBlocking.toIterable'. 'toBlocking.singleOrDefault' - это еще одна возможность, но без подробностей версия' toList' покрывает ее. –

+0

Вижу, я не читал документы и думал, что оригинальная 'toList' была ошибкой, я даже не подозревал о ее существовании. Я бы, вероятно, использовал '.take (1) .toBlocking.singleOrDefault', если бы после первого элемента был сохранен ресурс, но, как вы говорите, кто знает, что подходит для этого конкретного случая. –

4

Вы можете использовать сократить с начальным значением версии .reduce(0, (x,y) -> x+y) она должна работать как раз оператор объяснил Томаш Дворжак

Смежные вопросы