2016-09-11 3 views
2

Я читаю корневой каталог в FileReadingMessageSource из Spring Integration для извлечения текущих файлов. Сценарий заключается в том, что в корневом каталоге могут существовать несколько подкаталогов. WatchServiceDirectoryScanner из SI 4.3.1 используется для сбора любых файлов, созданных в любом новом подкаталоге.WatchServiceDirectoryScanner не вытягивает новые файлы, созданные после первого опроса

@Bean 
public MessageSource<File> fileReadingMessageSource() { 

    CompositeFileListFilter<File> filters = new CompositeFileListFilter<>(); 
    filters.addFilter(new SimplePatternFileListFilter("pattern*")); 
    //filters.addFilter(new LastModifiedFileListFilter()); 

    FileReadingMessageSource fileSource = new FileReadingMessageSource(); 

    String filePath = "root-directory"; 

    fileSource.setDirectory(new File(filePath)); 
    fileSource.setFilter(filters); 
    fileSource.setUseWatchService(true); 
    fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE,FileReadingMessageSource.WatchEventType.MODIFY,FileReadingMessageSource.WatchEventType.DELETE); 

    return fileSource; 
} 

@Bean 
public IntegrationFlow readDirectoryFlow() { 

    return IntegrationFlows.from(
      fileReadingMessageSource(), 
      e -> e.poller(Pollers.cron("*/5 * * * * *"))) 
      .channel(fileInputChannel()) 
      .handle(tailerRestart) 
      .handle(System.out::println) 
      .get(); 
} 

На первом опросе, все файлы, соответствующие шаблону доступны через Message Resource, но если какие-либо новые файлы, созданные позже в любом новом подкаталоге затем Message Resource не в состоянии выбрать новые базы соответствующих файлов.

я вижу следующее DEBUG сообщение в журнале

DEBUG SourcePollingChannelAdapter - не получил сообщение в ходе опроса, возвращая «ложь»

Что может быть не хватает?

ответ

1

Я только написал несколько тест-случай очень близко к коду:

@Bean 
    public MessageSource<File> fileReadingMessageSource() { 
     CompositeFileListFilter<File> filters = new CompositeFileListFilter<>(); 
     filters.addFilter(new SimplePatternFileListFilter("*.watch")); 

     FileReadingMessageSource fileSource = new FileReadingMessageSource(); 
     fileSource.setDirectory(tmpDir.getRoot()); 
     fileSource.setFilter(filters); 
     fileSource.setUseWatchService(true); 
     fileSource.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE, 
       FileReadingMessageSource.WatchEventType.MODIFY, 
       FileReadingMessageSource.WatchEventType.DELETE); 
     return fileSource; 
    } 

    @Bean 
    public IntegrationFlow readDirectoryFlow() { 
     return IntegrationFlows 
       .from(fileReadingMessageSource(), 
         e -> e.poller(p -> p.cron("*/1 * * * * *"))) 
       .handle(System.out::println) 
       .get(); 
    } 

тест-код выглядит как:

@ClassRule 
public static final TemporaryFolder tmpDir = new TemporaryFolder(); 

@Test 
public void testWatchServiceMessageSource() throws Exception { 
    File newFolder1 = tmpDir.newFolder(); 
    FileOutputStream file = new FileOutputStream(new File(newFolder1, "foo.watch")); 
    file.write(("foo").getBytes()); 
    file.flush(); 
    file.close(); 

    File newFolder2 = tmpDir.newFolder(); 
    file = new FileOutputStream(new File(newFolder2, "bar.watch")); 
    file.write(("bar").getBytes()); 
    file.flush(); 
    file.close(); 

    file = new FileOutputStream(new File(tmpDir.getRoot(), "root.watch")); 
    file.write(("root").getBytes()); 
    file.flush(); 
    file.close(); 

    Thread.sleep(10000); 
} 

И у меня есть этот журнал:

GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit7776799219532481336\foo.watch, headers={id=50d44197-e0af-708a-6b61-2a2cfeec68da, timestamp=1473686655061}] 
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\junit813088196038861528\bar.watch, headers={id=8d80c853-19b6-f667-7950-d6de49d509ab, timestamp=1473686656062}] 
GenericMessage [payload=C:\Users\abilan\AppData\Local\Temp\junit7602962373770028652\root.watch, headers={id=e585203b-41dc-cadb-6a36-4c9009a34701, timestamp=1473686657063}] 

каждую секунду.

Не знаете, где ваша проблема ...

Вам не нужно .channel(fileInputChannel()). Он будет создан автоматически между ednpoints.

с конфиге:

.handle(tailerRestart) 
.handle(System.out::println) 

вы должны быть уверены, что tailerRestart возвращается что-то. Хотя, в соответствии с нашей другой дискуссии, это не делает:

@ServiceActivator 
public void restartTailer(File input) throws Exception { 
    tailFileProducer.stop(); 
    tailFileProducer.setFile(input); 
    tailFileProducer.start(); 
} 

UPDATE

После некоторого частного расследования мы выяснили, что проблема с FileReadingMessageSource.start() называют несколько раз по инфраструктуре Spring Облако поток , вызывая повторное создание внутреннего объекта WatchService.

FileReadingMessageSource.start() должен быть закреплен идемпотентным: https://jira.spring.io/browse/INT-4108

The Spring Облако поток был зафиксирован в версии 1.1: https://github.com/spring-cloud/spring-cloud-stream/issues/525.

Для устранения этой проблемы, как гарантировать, что FileReadingMessageSource.start() вызывается только один раз:

FileReadingMessageSource fileSource = new FileReadingMessageSource() { 

    private final AtomicBoolean running = new AtomicBoolean(); 

    @Override 
    public void start() { 
     if (!this.running.getAndSet(true)) { 
     super.start(); 
     } 
    } 

    @Override 
    public void stop() { 
     if (this.running.getAndSet(false)) { 
     super.stop(); 
     } 
    } 

}; 
+0

спасибо за тестируя код. Я изменил активацию restartTailer-активатора для возврата файла. Проблема, которую я чувствую, заключается в том, что WatchServiceDirectoryScanner заполняет очередь toBeReceived только один раз в FileReadingMessageSource. Если файлы/подкаталоги добавлены позже, невозможно просмотреть сообщение для этих файлов в новом подкаталоге.У нас есть бесконечный опрос в WatchServiceDirectoryScanner для просмотра всех событий из Java 7 WatchService? –

+1

Пожалуйста, найдите «ОБНОВЛЕНИЕ» в моем ответе. –

Смежные вопросы