Использование Spring Integration Java DSL, я создал поток, где я обрабатываю файлы синхронно с FileSplitter
. Я был в состоянии использовать setDeleteFiles
флаг на AbstractFilePayloadTransformer
удалить файл после преобразования каждой строки в File
к Message
для последующей обработки, например, так:Spring Integration Java DSL flow Splitter/Aggregator удаляет файл после обработки всех строк
@Bean
protected IntegrationFlow s3ChannelFlow() {
// do not exhaust filesystem w/ files downloaded from S3
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.channel(StatsUtil.createRunStatsChannel(runStatsRepository))
.transform(transformer)
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
Это прекрасно работает, но медленно. Таким образом, я пытаюсь добавить ExecutorChannel
после вышеуказанного .split
, например, так:
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
Но вышеупомянутый флаг удаления не позволить потоку успешно завершить удаление файла (ов), прежде чем они полностью прочитан.
Если я удалю флаг, у меня есть потенциал для исчерпания локальной файловой системы, где файлы были синхронизированы с S3.
Что я могу представить выше: a) обрабатывать каждый файл полностью и b) удалять файл из локальной файловой системы после его завершения? Другими словами, есть ли способ узнать точно, когда файл полностью обрабатывается (когда его строки обрабатываются асинхронно через потоки в пуле)?
Если вам интересно, вот мой осущ из FileToInputStreamTransformer
:
public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
UPDATE
Так как же то, что по ходу потока знаю, что спросить?
Кстати, если я отслеживаю ваши советы правильно, когда я обновить .split
с new FileSplitter(true, true)
выше, я получаю
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
См. Мое обновление выше. Как я могу продолжить? –
Рассмотрите возможность использования 'PayloadTypeRouter' или' publishSubscribeChannel' с 'aggregator' с одной стороны и' filter' перед вашим 'transform()' на другом. –