Использование Rx Java, это требование может быть решено несколькими способами при использовании DirectoryStream от JDK.
следующие комбинации даст вам желаемый эффект, я бы объяснить их в следующей последовательности:
подход 1. Рекурсивный подход с использованием операторов flatMap() и defer()
Подход 2. Рекурсивный подход с использованием flatMap() и fromCallable операторы
Примечание: Если заменить использование flatMap() с concatMap(), дерево каталогов навигации обязательно произойдет в глубинной первой категории (DFS). С функцией flatMap() эффект DFS не гарантируется.
Подход 1: Использование flatMap() и отложить()
private Observable<Path> recursiveFileSystemNavigation_Using_Defer(Path dir) {
return Observable.<Path>defer(() -> {
//
// try-resource block
//
try(DirectoryStream<Path> children = Files.newDirectoryStream(dir))
{
//This intermediate storage is required because DirectoryStream can't be navigated more than once.
List<Path> subfolders = Observable.<Path>fromIterable(children)
.toList()
.blockingGet();
return Observable.<Path>fromIterable(subfolders)
/* Line X */ .flatMap(p -> !isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_Using_Defer(p), Runtime.getRuntime().availableProcessors());
// /* Line Y */ .concatMap(p -> !isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_Using_Defer(p));
} catch (IOException e) {
/*
This catch block is required even though DirectoryStream is Closeable
resource. Reason is that .close() call on a DirectoryStream throws a
checked exception.
*/
return Observable.<Path>empty();
}
});
}
Этот подход является нахождение детей данного каталога, а затем испускать детей наблюдаемыми. Если дочерний файл является файлом, он будет немедленно доступен для абонента else flatMap() в Строка X вызовет метод, рекурсивно передающий каждый подкаталог в качестве аргумента. Для каждого такого subdir, flatmap будет внутренне подписываться на своих детей в одно и то же время. Это похоже на цепную реакцию, которую нужно контролировать.
Поэтому использование Runtime.getRuntime(). AvailableProcessors() устанавливает уровень максимального параллелизма для flatmap() и защищает его от подписки на все вложенные папки в то же самое время. Не устанавливая уровень параллелизма, представьте, что произойдет, когда в папке будет 1000 детей.
Использование defer() предотвращает преждевременное создание DirectoryStream и гарантирует, что это произойдет только тогда, когда будет создана настоящая подписка для поиска ее подпапок.
Наконец метод возвращает Наблюдаемые < Path> так, что клиент может подписаться и сделать что-то полезное с результатами, как показано ниже:
//
// Using the defer() based approach
//
recursiveDirNavigation.recursiveFileSystemNavigation_Using_Defer(startingDir)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.subscribe(p -> System.out.println(p.toUri()));
Неудобство использования отложить() является то, что ему не справляется с проверенными исключениями, если функция аргумента бросает проверенное исключение. Поэтому, хотя DirectoryStream (который реализует Closeable) был создан в блоке ресурсов try, нам все равно пришлось поймать IOException, потому что автоматическое закрытие DirectoryStream вызывает это исключенное исключение.
При использовании стиля на основе Rx использование блоков catch() для обработки ошибок звучит немного странно, потому что даже ошибки отправляются как события в реактивном программировании. Итак, почему бы нам не использовать оператор, который подвергает такие ошибки, как события.
Лучшей альтернативой назван fromCallable() был добавлен в Rx Java 2.x. Второй подход показывает его использование.
Подход 2. Использование flatMap() и fromCallable операторы
Этот подход использует fromCallable() оператор, который принимает Callable в качестве аргумента. Поскольку нам нужен рекурсивный подход, ожидаемым результатом этого вызываемого является наблюдение за дочерними элементами данной папки. Поскольку мы хотим, чтобы подписчик получал результаты, когда они доступны, нам необходимо вернуть Observable из этого метода. Поскольку результатом внутреннего вызываемого является список наблюдаемых дочерних элементов, то чистый эффект является наблюдаемым наблюдением.
private Observable<Observable<Path>> recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(Path dir) {
/*
* fromCallable() takes a Callable argument. In this case the callbale's return value itself is
* a list of sub-paths therefore the overall return value of this method is Observable<Observable<Path>>
*
* While subscribing the final results, we'd flatten this return value.
*
* Benefit of using fromCallable() is that it elegantly catches the checked exceptions thrown
* during the callable's call and exposes that via onError() operator chain if you need.
*
* Defer() operator does not give that flexibility and you have to explicitly catch and handle appropriately.
*/
return Observable.<Observable<Path>> fromCallable(() -> traverse(dir))
.onErrorReturnItem(Observable.<Path>empty());
}
private Observable<Path> traverse(Path dir) throws IOException {
//
// try-resource block
//
try(DirectoryStream<Path> children = Files.newDirectoryStream(dir))
{
//This intermediate storage is required because DirectoryStream can't be navigated more than once.
List<Path> subfolders = Observable.<Path>fromIterable(children)
.toList()
.blockingGet();
return Observable.<Path>fromIterable(subfolders)
/* Line X */ .flatMap(p -> (!isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(p).blockingSingle())
,Runtime.getRuntime().availableProcessors());
// /* Line Y */ .concatMap(p -> (!isFolder(p) ? Observable.<Path> just(p) : recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(p).blockingSingle()));
}
}
Абонент будет затем необходимо выравнивать поток результатов, как показано ниже:
//
// Using the fromCallable() based approach
//
recursiveDirNavigation.recursiveFileSystemNavigation_WithoutExplicitCatchBlock_UsingFromCallable(startingDir)
.subscribeOn(Schedulers.io())
.flatMap(p -> p)
.observeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.subscribe(filePath -> System.out.println(filePath.toUri()));
В траверсе() метод, почему линия Х с помощью блокировки Получить
потому что рекурсивной функция возвращает Наблюдаемый < Observable>, но планшет на этой линии требует подписки на Observable.
линии Y в обоих подходах используется concatMap()
Поскольку concatMap() можно удобно использовать, если мы не хотим, параллелизм при innner подписок сделанных flatmap().
В обоих подходах, реализация метода isFolder выглядит, как показано ниже:
private boolean isFolder(Path p){
if(p.toFile().isFile()){
return false;
}
return true;
}
Maven координаты для Java RX 2,0
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.0.3</version>
</dependency>
Импорт в Java файл
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
, что вы имеете в виду «Нет каталогов не должны быть перечислены» – gaurav5430
возможно дубликат [Список всех файлов из каталога (http://stackoverflow.com/questions/2534632/list-all-files-from-a-directory-recursive-with-java) –
@BrianRoach Как это дубликат? Я попросил решить проблему с nio.file.DirectoryStream. –