2015-11-05 3 views
2

Я изучаю RxScala и пришел к этому очень синтетическому фрагменту. Я пытаюсь обрабатывать исключения в блоке OnError:Наблюдаемая обработка исключений

def doLongOperation():String = { 
    (1 to 10).foreach { 
    _ => 
     Thread.sleep(100) 
     print(".") 
    } 
    println() 
    if (System.currentTimeMillis() % 2 == 0) { 
    throw new RuntimeException("Something went wrong during long operation") 
    } 
    s"OK" 
} 

def main(args: Array[String]) { 
    println("Changing status: -> doing task1") 
    Observable.just(
    doLongOperation() 
).subscribe(
    str => println("Changing status: doing task1 -> doing task2"), 
    throwable => println(s"Failed: ${throwable.getMessage}"),  //never get here 
    () => println("Completed part") 
) 
} 

В случае исключения я ожидал что-то вроде:

Failed: Something went wrong during long operation 

Но то, что я получаю:

.........Exception in thread "main" java.lang.RuntimeException: Something went wrong during long operation 
at stats.STest$.doLongOperation(STest.scala:20) 
at stats.STest$.main(STest.scala:49) 
at stats.STest.main(STest.scala) 

Что же я отсутствует? Должен ли я «вручную» вызывать onrror на наблюдателе? Цените любую помощь.

ответ

1

Проблема заключается в неправильной интерпретации только что(). Он принимает существующее значение за время сборки последовательности, а не метод, который выполняется, когда подписчик подписывается. Другими словами, ваш код делает это:

var tempValue = doLongOperation(); 

Observable.just(tempValue).subscribe(...) 

и выбрасывает путь до того, как Наблюдение даже создано.

(К сожалению, я не знаю, Scala или RxScala достаточно, чтобы извинить мои примеры Java 8.)

Я не знаю, как далеко позади RxScala RxJava, но RxJava 1.0.15 имеет новый фабричный метод , fromCallable, что позволяет перенести одно значение:

Observable.fromCallable(() -> doLongOperation()).subscribe(...) 

альтернативой является обернуть оригинал в defer так, когда doLongOperation броски, он получает направляется к абоненту:

Observable.defer(() -> Observable.just(doLongOperation())).subscribe(...) 
+0

Спасибо большое! Теперь мне стало намного понятнее – Nyavro

1

Observable.just не обрабатывает случай исключения хорошо, не совсем уверен, является ли это ошибкой или ожидаемым поведением. Вы можете попробовать этот путь, хотя:

Observable.create[String](o => { 
    o.onNext(doLongOperation()) 
    o.onCompleted() 
    Subscription{} 
}).subscribe(
    str => println("Changing status: doing task1 -> doing task2"), 
    throwable => println(s"Failed: ${throwable.getMessage}"), here 
() => println("Completed part") 
) 
+0

Большое спасибо! Решение работает отлично! – Nyavro

Смежные вопросы