Я пишу Spring Batch с идеей масштабирования, когда это необходимо. Мой ApplicationContext выглядит следующим образомSpring Batch Multiple Threads
@Configuration
@EnableBatchProcessing
@EnableTransactionManagement
@ComponentScan(basePackages = "in.springbatch")
@PropertySource(value = {"classpath:springbatch.properties"})
public class ApplicationConfig {
@Autowired
Environment environment;
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job job() throws Exception {
return jobs.get("spring_batch")
.flow(step()).end()
.build();
}
@Bean(name = "dataSource", destroyMethod = "close")
public DataSource dataSource() {
BasicDataSource basicDataSource = new BasicDataSource();
return basicDataSource;
}
@Bean
public JobRepository jobRepository() throws Exception {
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setTransactionManager(transactionManager());
jobRepositoryFactoryBean.setDataSource(dataSource());
return jobRepositoryFactoryBean.getObject();
}
@Bean(name = "batchstep")
public Step step() throws Exception {
return stepBuilderFactory.get("batchstep").allowStartIfComplete(true).
transactionManager(transactionManager()).
chunk(2).reader(batchReader()).processor(processor()).writer(writer()).build();
}
@Bean
ItemReader batchReader() throws Exception {
System.out.println(Thread.currentThread().getName()+"reader");
HibernateCursorItemReader<Source> hibernateCursorItemReader = new HibernateCursorItemReader<>();
hibernateCursorItemReader.setQueryString("from Source");
hibernateCursorItemReader.setFetchSize(2);
hibernateCursorItemReader.setSessionFactory(sessionFactory().getObject());
hibernateCursorItemReader.close();
return hibernateCursorItemReader;
}
@Bean
public ItemProcessor processor() {
return new BatchProcessor();
}
@Bean
public ItemWriter writer() {
return new BatchWriter();
}
public TaskExecutor taskExecutor(){
SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch");
asyncTaskExecutor.setConcurrencyLimit(5);
return asyncTaskExecutor;
}
@Bean
public LocalSessionFactoryBean sessionFactory() {
LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean();
sessionFactory.setDataSource(dataSource());
sessionFactory.setPackagesToScan(new String[]{"in.springbatch.entity"});
sessionFactory.setHibernateProperties(hibernateProperties());
return sessionFactory;
}
@Bean
public PersistenceExceptionTranslationPostProcessor exceptionTranslation() {
return new PersistenceExceptionTranslationPostProcessor();
}
@Bean
@Autowired
public HibernateTransactionManager transactionManager() {
HibernateTransactionManager txManager = new HibernateTransactionManager();
txManager.setSessionFactory(sessionFactory().getObject());
return txManager;
}
Properties hibernateProperties() {
return new Properties() {
{
setProperty("hibernate.hbm2ddl.auto", environment.getProperty("hibernate.hbm2ddl.auto"));
setProperty("hibernate.dialect", environment.getProperty("hibernate.dialect"));
setProperty("hibernate.globally_quoted_identifiers", "false");
}
};
}
}
- С выше конфигурации я могу читать из БД, обрабатывать данные и записывать в БД.
- Я использую размер блока как 2 и считываю 2 записи из курсора, используя . Читатель HibernateCusrsorItem и мой запрос на чтение из базы данных основаны на дату, чтобы выбрать текущие записи даты.
- До сих пор я мог достичь желаемого поведения, а также перезапускать способность с заданием только собирать записи, которые не обрабатывались из-за отказа в предыдущем запуске.
Теперь мое требование состоит в том, чтобы пакетное использование нескольких потоков для обработки данных и записи в БД.
Мой процессор и писатель выглядит следующим образом
@Component
public class BatchProcessor implements ItemProcessor<Source,DestinationDto>{
@Override
public DestinationDto process(Source source) throws Exception {
System.out.println(Thread.currentThread().getName()+":"+source);
DestinationDto destination=new DestinationDto();
destination.setName(source.getName());
destination.setValue(source.getValue());
destination.setSourceId(source.getSourceId().toString());
return destination;
}
@Component
public class BatchWriter implements ItemWriter<DestinationDto>{
@Autowired
IBatchDao batchDao;
@Override
public void write(List<? extends DestinationDto> list) throws Exception {
System.out.println(Thread.currentThread().getName()+":"+list);
batchDao.saveToDestination((List<DestinationDto>)list);
}
Я обновил мой шаг и добавил ThreadPoolTaskExecutor следующим
@Bean(name = "batchstep")
public Step step() throws Exception {
return stepBuilderFactory.get("batchstep").allowStartIfComplete(true).
transactionManager(transactionManager()).chunk(1).reader(batchReader()).
processor(processor()).writer(writer()).taskExecutor(taskExecutor()).build();
}
После этого мой процессор вызывался несколькими потоками, но с теми же данными источника , Есть ли что-нибудь дополнительное, что мне нужно сделать?
Спасибо, что нашли время, чтобы прочитать длительный вопрос. Я думаю, что сначала я должен потратить некоторое время, пытаясь использовать Spring Batch из коробки, чтобы добиться многопоточности. – Amardeep
Реально говорящий для вас вариант case 3 может быть проще. Функция Spring Batch из коробки позволяет добиться многопоточной работы, когда вы можете многопоточно использовать весь шаг, т. Е. Ваш читатель может читать информацию в кусках. –
Хорошо, я могу попробовать вариант 3. Но, как вы говорите, «Spring Batch out of функция ящика, чтобы добиться многопоточной обработки, когда вы можете многопоточно перебирать весь шаг, то есть читатель может читать информацию в кусках ». Я читаю данные из HibernateCursorReader, который предоставляет мне возможность считывать данные в кусках. Правильно ли это, тогда вы можете указать мне, как я могу многопотопить мой шаг? – Amardeep