2015-10-20 2 views
1

Использование Spring Integration Java DSL, я создал поток, где я обрабатываю файлы синхронно с FileSplitter. Я был в состоянии использовать setDeleteFiles флаг на AbstractFilePayloadTransformer удалить файл после преобразования каждой строки в File к Message для последующей обработки, например, так:Spring Integration Java DSL flow Splitter/Aggregator удаляет файл после обработки всех строк

@Bean 
protected IntegrationFlow s3ChannelFlow() { 
    // do not exhaust filesystem w/ files downloaded from S3 
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer(); 
    transformer.setDeleteFiles(true); 

    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading 
    // @formatter:off 
    return IntegrationFlows 
     .from(s3Channel()) 
     .channel(StatsUtil.createRunStatsChannel(runStatsRepository)) 
     .transform(transformer) 
     .split(new FileSplitter()) 
     .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport)) 
     .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow())) 
     .get(); 
    // @formatter:on 
} 

Это прекрасно работает, но медленно. Таким образом, я пытаюсь добавить ExecutorChannel после вышеуказанного .split, например, так:

.channel(c -> c.executor(Executors.newFixedThreadPool(10))) 

Но вышеупомянутый флаг удаления не позволить потоку успешно завершить удаление файла (ов), прежде чем они полностью прочитан.

Если я удалю флаг, у меня есть потенциал для исчерпания локальной файловой системы, где файлы были синхронизированы с S3.

Что я могу представить выше: a) обрабатывать каждый файл полностью и b) удалять файл из локальной файловой системы после его завершения? Другими словами, есть ли способ узнать точно, когда файл полностью обрабатывается (когда его строки обрабатываются асинхронно через потоки в пуле)?

Если вам интересно, вот мой осущ из FileToInputStreamTransformer:

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> { 

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB 

    @Override 
    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/ 
    protected InputStream transformFile(File payload) throws Exception { 
     return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE); 
    } 
} 

UPDATE

Так как же то, что по ходу потока знаю, что спросить?

Кстати, если я отслеживаю ваши советы правильно, когда я обновить .split с new FileSplitter(true, true) выше, я получаю

 
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker 
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44) 

ответ

0

С благодарностью Артем.

Мне удалось решить проблему, но, возможно, более тяжело.

Вводя ExecutorChannel вызвало довольно пульсацию корректировок реализации, в том числе: выключая setDeleteFiles флаг на AbtractFilePayloadTransformer, обновление JPA @Entity, RunStats и хранилище для таких, чтобы захватить статус обработки файлов, а также статус обработки для всей запустить. В совокупности обновления статуса обработки позволяют потоку знать, когда удалять файлы из локальной файловой системы (т. Е. Когда они полностью обрабатываются) и возвращать статус в конечной точке /stats/{run}, чтобы пользователь мог знать, когда запуск завершен.

Вот отрывки из моей реализации (если кто-то любопытно) ...

class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> { 

private static final int BUFFER_SIZE = 64 * 1024; // 64 kB 

@Override 
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/ 
protected InputStream transformFile(File payload) throws Exception { 
    return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE); 
} 
} 

public class RunStatsHandler extends AbstractMessageHandler { 

private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass())); 
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB 

private final RunStatsRepository runStatsRepository; 

public RunStatsHandler(RunStatsRepository runStatsRepository) { 
    this.runStatsRepository = runStatsRepository; 
} 

// Memory efficient routine, @see http://www.baeldung.com/java-read-lines-large-file 
@Override 
protected void handleMessageInternal(Message<?> message) throws Exception { 
    RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class); 
    String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class); 
    if (runStats != null) { 
     File compressedFile = (File) message.getPayload(); 
     String compressedFileName = compressedFile.getCanonicalPath(); 
     LongAdder lineCount = new LongAdder(); 
     // Streams and Scanner implement java.lang.AutoCloseable 
     InputStream fs = new FileInputStream(compressedFile); 
     InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE); 
     try (Scanner sc = new Scanner(gzfs, "UTF-8")) { 
      while (sc.hasNextLine()) { 
       sc.nextLine(); 
       lineCount.increment(); 
      } 
      // note that Scanner suppresses exceptions 
      if (sc.ioException() != null) { 
       log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, 
         "exception", sc.ioException().getMessage())); 
       throw sc.ioException(); 
      } 
      runStats.addFile(compressedFileName, token, lineCount.longValue()); 
      runStatsRepository.updateRunStats(runStats); 
      log.info("file.lineCount", 
        ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue())); 
     } 
    } 
} 

} 

Обновленный поток

@Bean 
protected IntegrationFlow s3ChannelFlow() { 
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading 
    // @formatter:off 
    return IntegrationFlows 
     .from(s3Channel()) 
     .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString())) 
     .channel(runStatsChannel()) 
     .channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize))) 
     .transform(new FileToInputStreamTransformer()) 
     .split(new FileSplitter()) 
     .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport)) 
     .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow())) 
     .get(); 
    // @formatter:on 
} 

@Bean 
public MessageChannel runStatsChannel() { 
    DirectChannel wiretapChannel = new DirectChannel(); 
    wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository)); 
    DirectChannel loggingChannel = new DirectChannel(); 
    loggingChannel.addInterceptor(new WireTap(wiretapChannel)); 
    return loggingChannel; 
} 

К сожалению, я не могу поделиться RunStats и Репо реализации.

0

FileSplitter имеет markersoption именно для этой цели:

Set true, чтобы испускать начало/конец сообщений маркера файла до и после данных файла. Маркеры - это сообщения с полезными нагрузками FileSplitter.FileMarkerSTART и END значениями в свойстве mark). Маркеры могут использоваться при последовательной обработке файлов в потоке ниже по потоку, где фильтруются некоторые строки. Они позволяют последующей обработке знать, когда файл был полностью обработан. Маркер END включает в себя количество строк. По умолчанию: false. Когда true, apply-sequence - false по умолчанию.

Вы можете использовать его в потоке ниже по течению, чтобы определить, можно ли уже удалить файл или нет.

+0

См. Мое обновление выше. Как я могу продолжить? –

+0

Рассмотрите возможность использования 'PayloadTypeRouter' или' publishSubscribeChannel' с 'aggregator' с одной стороны и' filter' перед вашим 'transform()' на другом. –

Смежные вопросы