Я изучаю реактивное программирование и RxJava. Это весело, но я застрял на проблеме, для которой я не могу найти ответ. Мой основной вопрос: что такое реактивно-подходящий способ прекратить бесконечно бегущий наблюдаемый? Я также приветствую критику и реактивную передовую практику в отношении моего кода.RxJava - Завершение бесконечных потоков
В качестве упражнения я пишу утилиту хвоста журнала. Поток строк в файле журнала представлен Observable<String>
. Чтобы получить BufferedReader
, чтобы продолжить чтение текста, который добавлен в файл, я игнорирую обычную проверку завершения reader.readLine() == null
и вместо этого интерпретирую ее как означающую, что мой поток должен спать и ждать больше текста журнала.
Но пока я могу завершить Обозреватель, используя takeUntil
, мне нужно найти чистый способ прекратить работу бесконечно работающего наблюдателя. Я могу написать свой собственный метод terminateWatcher
, но это разрушает инкапсуляцию Observable/Observer - и я хотел бы оставаться максимально строгим по отношению к реактивной парадигме.
Вот Observable<String>
код:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
Вот код наблюдателя, который печатает новые линии, как они приходят:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
Мои два вопроса:
- Что является реактивно-последовательным способом прекращения бесконечно бегущего потока?
- Какие еще ошибки в моем коде заставляют вас плакать? :)
Таким образом, планировщик может определить, как часто следует записывать файл журнала; тогда, когда планировщик запускает FileWatcher, FileWatcher вытаскивает строки до 'readLine() == null'. Затем Observer может отказаться от подписки, как обычно, или использовать 'takeUntil()' для автоматической отмены подписки. Мне нужно будет изучить эти методы, чтобы увидеть, работает ли это, но мне нравится идея. – MonkeyWithDarts