У меня есть большой файл, который может содержать записи от 100K до 500K. Я планирую использовать обработку, ориентированную на чанк, и моя мысльОбработка большого файла с использованием весенней партии
1) Разделить большой файл на меньший, в зависимости от количества, пусть говорят 10K в каждом файле.
2) Если есть 100K записей, то я буду получать 10 файлов каждый из которых содержит 10K reocrds
3) Я хотел бы разделить эти 10 файлов и хотел бы обрабатывать с использованием 5 нитей. Я думаю использовать пользовательские MultiResourcePartioner
4) 5 потоков должны обрабатывать все 10 файлов, созданных в процессе разделения.
5) Я не хочу создавать одинаковое количество потоков, равное количеству файлов, так как в этом случае я могу столкнуться с проблемами памяти. Я ищу то, что количество файлов, которые я хотел бы обрабатывать, используя только 5 потоков (я могу увеличить, исходя из моих требований).
Эксперт, можете ли вы сообщить мне, что это может быть достигнуто с использованием весенней партии? Если да не могли бы вы поделиться указателями или эталонных реализациями
Заранее спасибо
Рабочей работы-конфигурация XML
<description>Spring Batch File Chunk Processing</description>
<import resource="../config/batch-context.xml" />
<batch:job id="file-partition-batch" job-repository="jobRepository" restartable="false">
<batch:step id="master">
<batch:partition partitioner="partitioner" handler="partitionHandler" />
</batch:step>
</batch:job>
<batch:step id="slave">
<batch:tasklet>
<batch:chunk reader="reader" processor="compositeProcessor"
writer="compositeWriter" commit-interval="5">
</batch:chunk>
</batch:tasklet>
</batch:step>
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
<property name="taskExecutor" ref="taskExecutor"/>
<property name="step" ref="slave" />
<property name="gridSize" value="5" />
</bean>
<bean id="partitioner" class="com.poc.partitioner.FileMultiResourcePartitioner">
<property name="resources" value="file:/Users/anupghosh/Documents/Spring_Batch/FilePartitionBatch/*.txt" />
<property name="threadName" value="feed-processor" />
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="5" />
<property name="maxPoolSize" value="5" />
</bean>
<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
<property name="resource" value="#{stepExecutionContext['fileName']}" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value="|"/>
<property name="names" value="key,docName,docTypCD,itemType,itemNum,launchDate,status" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="com.poc.mapper.FileRowMapper" />
</property>
</bean>
</property>
</bean>
<bean id="validatingProcessor" class="org.springframework.batch.item.validator.ValidatingItemProcessor">
<constructor-arg ref="feedRowValidator" />
</bean>
<bean id="feedProcesor" class="com.poc.processor.FeedProcessor" />
<bean id="compositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
<property name="delegates">
<list>
<ref bean="validatingProcessor" />
<ref bean="feedProcesor" />
</list>
</property>
</bean>
<bean id="recordDecWriter" class="com.poc.writer.RecordDecWriter" />
<bean id="reconFlatFileCustomWriter" class="com.poc.writer.ReconFileWriter">
<property name="reconFlatFileWriter" ref="reconFlatFileWriter" />
</bean>
<bean id="reconFlatFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="file:/Users/anupghosh/Documents/Spring_Batch/recon-#{stepExecutionContext[threadName]}.txt" />
<property name="shouldDeleteIfExists" value="true" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="|" />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="validationError" />
</bean>
</property>
</bean>
</property>
</bean>
<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
<property name="delegates">
<list>
<ref bean="recordDecWriter" />
<ref bean="reconFlatFileCustomWriter" />
</list>
</property>
</bean>
<bean id="feedRowValidator" class="org.springframework.batch.item.validator.SpringValidator">
<property name="validator">
<bean class="com.poc.validator.FeedRowValidator"/>
</property>
</bean>
У вас действительно есть процесс, который выкладывается правильно. Каков ваш конкретный вопрос? –
Спасибо @MichaelMinella за ответ в короткий промежуток времени. Мой вопрос: если у меня есть 10 файлов после процесса split (может быть больше), как я буду обрабатывать эти файлы с помощью 5 потоков? Более конкретно, как я буду разделять эти файлы, чтобы 5 потоков обработали все эти 10 файлов. –
«MultiResourcePartitioner» - это правильный способ создания разделов. Оттуда, используя «TaskExecutionPartitionHandler», вы будете контролировать, сколько потоков используется через конфигурацию вашего «TaskExecutor». По умолчанию используется «SyncTaskExecutor», но мы ожидаем, что вы настроите что-то еще, как «ThreadPoolTaskExecutor». В этом 'TaskExecutor' вы можете настроить максимальные потоки и т. Д. –