Я смотрю на следующем примере, взятом из GitHub, который четко работает, но я не понимаю, почему:rxJava - Как Observable может испускать из тела метода .create?
Observable<String> fsObs = CreateObservable.listFolder(
Paths.get("src", "com", "alex", "experiment"),
"*.java")
.flatMap(path -> CreateObservable.from(path));
// CreateObservable.listFolder
public static Observable<Path> listFolder(Path dir, String glob) {
return Observable.<Path>create(subscriber -> {
try {
DirectoryStream<Path> stream =
Files.newDirectoryStream(dir, glob);
subscriber.add(Subscriptions.create(() -> {
try {
stream.close();
} catch (IOException e) {
e.printStackTrace();
}
}));
Observable.<Path>from(stream).subscribe(subscriber);
} catch (DirectoryIteratorException ex) {
subscriber.onError(ex);
} catch (IOException ioe) {
subscriber.onError(ioe);
}
});
}
Обратите внимание, что Observable.<Path>from(stream).subscribe(subscriber)
создается. Как эта линия выдает сообщения в flatMap(path -> CreateObservable.from(path)) ?
(pt1) Эта строка 'return Observable. create' дает нам Наблюдаемый , который будет передан как второй параметр .flatmap http://reactivex.io/RxJava/javadoc/rx/Observable.html#flatMap%28rx.functions.Func1%29 Однако внутри lambda в методе 'create (-> {...})', мы также создаем еще один Observable из 'DirectoryStream' (который реализует интерфейс' Iterable'). Как выбросы изнутри Наблюдаемые в лямбда, выходят наружу в '.flatmap'? Все выбросы изнутри наблюдаемого в конечном итоге отправляются абоненту. –
newprint
(pt2) Возможно, у меня есть пробел в знаниях Java, но для меня внутри/снаружи Observable - полностью отключенные объекты. Было бы разумно, если бы оно было написано как «return Observable from (stream) .subscribe (подписчик);', однако '.subscribe (абонент)' return 'Subscription' –
newprint
@newprint обновил мой ответ. –