2013-12-18 4 views
9

Я изучаю реактивное программирование и RxJava. Это весело, но я застрял на проблеме, для которой я не могу найти ответ. Мой основной вопрос: что такое реактивно-подходящий способ прекратить бесконечно бегущий наблюдаемый? Я также приветствую критику и реактивную передовую практику в отношении моего кода.RxJava - Завершение бесконечных потоков

В качестве упражнения я пишу утилиту хвоста журнала. Поток строк в файле журнала представлен Observable<String>. Чтобы получить BufferedReader, чтобы продолжить чтение текста, который добавлен в файл, я игнорирую обычную проверку завершения reader.readLine() == null и вместо этого интерпретирую ее как означающую, что мой поток должен спать и ждать больше текста журнала.

Но пока я могу завершить Обозреватель, используя takeUntil, мне нужно найти чистый способ прекратить работу бесконечно работающего наблюдателя. Я могу написать свой собственный метод terminateWatcher, но это разрушает инкапсуляцию Observable/Observer - и я хотел бы оставаться максимально строгим по отношению к реактивной парадигме.

Вот Observable<String> код:

public class FileWatcher implements OnSubscribeFunc<String> { 
    private Path path = . . .; 

    @Override 
    // The <? super String> generic is pointless but required by the compiler 
    public Subscription onSubscribe(Observer<? super String> observer) { 
     try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) { 
      String newLine = ""; 
      while (!Thread.interrupted()) { // How do I terminate this reactively? 
       if ((newLine = reader.readLine()) != null) 
        observer.onNext(newLine); 
       else 
        try { 
         // Wait for more text 
         Thread.sleep(250); 
        } catch (InterruptedException e) { 
         Thread.currentThread().interrupt(); 
        } 
      } 
      observer.onCompleted(); 
     } catch (Exception e) { 
      observer.onError(e); 
     } 

     return null; // Not sure what Subscription I should return 
    } 
} 

Вот код наблюдателя, который печатает новые линии, как они приходят:

public static void main(String... args) { 
    . . . 
    Observable<String> lines = Observable.create(createWatcher(file)); 
    lines = lines.takeWhile(new Func1<String, Boolean>() { 
     @Override 
     public Boolean call(String line) { 
      // Predicate for which to continue processing 
      return !line.contains("shutdown"); 
     } 
    }).subscribeOn(Schedulers.threadPoolForIO()) 
      .observeOn(Schedulers.currentThread()); 
    // Seems like I should use subscribeOn() and observeOn(), but they 
    // make my tailer terminate without reading any text. 

    Subscription subscription = lines.subscribe(new Action1<String>() { 
     @Override 
     public void call(String line) { 
      System.out.printf("%20s\t%s\n", file, line); 
     } 
    }); 
} 

Мои два вопроса:

  1. Что является реактивно-последовательным способом прекращения бесконечно бегущего потока?
  2. Какие еще ошибки в моем коде заставляют вас плакать? :)

ответ

6

Поскольку вы инициируя файл, наблюдая, когда подписка запрашивается, то имеет смысл прекратить его действие, когда подписка заканчивается, то есть, когда Subscription закрыт. Это обращается к одному из свободных концов в ваших комментариях кода: вы возвращаете Subscription, который сообщает FileWatcher прекратить просмотр файла. Затем замените условие цикла таким образом, чтобы он проверял, была ли отменена подписка, вместо проверки того, был ли прерван текущий поток.

Проблема в том, что это не очень полезно, если ваш метод onSubscribe() никогда не возвращается. Возможно, вашему FileWatcher необходимо указать планировщик, который будет указан, с чтением, происходящим в этом планировщике.

+0

Таким образом, планировщик может определить, как часто следует записывать файл журнала; тогда, когда планировщик запускает FileWatcher, FileWatcher вытаскивает строки до 'readLine() == null'. Затем Observer может отказаться от подписки, как обычно, или использовать 'takeUntil()' для автоматической отмены подписки. Мне нужно будет изучить эти методы, чтобы увидеть, работает ли это, но мне нравится идея. – MonkeyWithDarts

Смежные вопросы