2014-12-03 3 views
0

Я пишу Spring Batch с идеей масштабирования, когда это необходимо. Мой ApplicationContext выглядит следующим образомSpring Batch Multiple Threads

@Configuration 
@EnableBatchProcessing 
@EnableTransactionManagement 
@ComponentScan(basePackages = "in.springbatch") 
@PropertySource(value = {"classpath:springbatch.properties"}) 

public class ApplicationConfig { 

@Autowired 
Environment environment; 

@Autowired 
private JobBuilderFactory jobs; 

@Autowired 
private StepBuilderFactory stepBuilderFactory; 

@Bean 
public Job job() throws Exception { 

    return jobs.get("spring_batch") 
      .flow(step()).end() 
      .build(); 
} 

@Bean(name = "dataSource", destroyMethod = "close") 
public DataSource dataSource() { 

    BasicDataSource basicDataSource = new BasicDataSource(); 



    return basicDataSource; 
} 

@Bean 
public JobRepository jobRepository() throws Exception { 
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); 
    jobRepositoryFactoryBean.setTransactionManager(transactionManager()); 
    jobRepositoryFactoryBean.setDataSource(dataSource()); 
    return jobRepositoryFactoryBean.getObject(); 
} 

@Bean(name = "batchstep") 
public Step step() throws Exception { 

    return stepBuilderFactory.get("batchstep").allowStartIfComplete(true). 
    transactionManager(transactionManager()). 
      chunk(2).reader(batchReader()).processor(processor()).writer(writer()).build(); 

    } 



@Bean 
ItemReader batchReader() throws Exception { 
    System.out.println(Thread.currentThread().getName()+"reader"); 
    HibernateCursorItemReader<Source> hibernateCursorItemReader = new HibernateCursorItemReader<>(); 
    hibernateCursorItemReader.setQueryString("from Source"); 
    hibernateCursorItemReader.setFetchSize(2); 
    hibernateCursorItemReader.setSessionFactory(sessionFactory().getObject()); 


    hibernateCursorItemReader.close(); 
    return hibernateCursorItemReader; 
} 

@Bean 
public ItemProcessor processor() { 
    return new BatchProcessor(); 
} 

@Bean 
public ItemWriter writer() { 
    return new BatchWriter(); 
} 

public TaskExecutor taskExecutor(){ 

    SimpleAsyncTaskExecutor asyncTaskExecutor=new SimpleAsyncTaskExecutor("spring_batch"); 
    asyncTaskExecutor.setConcurrencyLimit(5); 
    return asyncTaskExecutor; 


} 
@Bean 
public LocalSessionFactoryBean sessionFactory() { 
    LocalSessionFactoryBean sessionFactory = new LocalSessionFactoryBean(); 
    sessionFactory.setDataSource(dataSource()); 
    sessionFactory.setPackagesToScan(new String[]{"in.springbatch.entity"}); 
    sessionFactory.setHibernateProperties(hibernateProperties()); 

    return sessionFactory; 
} 

@Bean 
public PersistenceExceptionTranslationPostProcessor exceptionTranslation() { 
    return new PersistenceExceptionTranslationPostProcessor(); 
} 

@Bean 
@Autowired 
public HibernateTransactionManager transactionManager() { 
    HibernateTransactionManager txManager = new HibernateTransactionManager(); 
    txManager.setSessionFactory(sessionFactory().getObject()); 

    return txManager; 
} 

Properties hibernateProperties() { 
    return new Properties() { 
     { 
      setProperty("hibernate.hbm2ddl.auto",  environment.getProperty("hibernate.hbm2ddl.auto")); 
      setProperty("hibernate.dialect", environment.getProperty("hibernate.dialect")); 
      setProperty("hibernate.globally_quoted_identifiers", "false"); 

     } 
    }; 
} 

}

  1. С выше конфигурации я могу читать из БД, обрабатывать данные и записывать в БД.
  2. Я использую размер блока как 2 и считываю 2 записи из курсора, используя . Читатель HibernateCusrsorItem и мой запрос на чтение из базы данных основаны на дату, чтобы выбрать текущие записи даты.
  3. До сих пор я мог достичь желаемого поведения, а также перезапускать способность с заданием только собирать записи, которые не обрабатывались из-за отказа в предыдущем запуске.

Теперь мое требование состоит в том, чтобы пакетное использование нескольких потоков для обработки данных и записи в БД.

Мой процессор и писатель выглядит следующим образом

@Component 
public class BatchProcessor implements ItemProcessor<Source,DestinationDto>{ 

@Override 
public DestinationDto process(Source source) throws Exception { 

     System.out.println(Thread.currentThread().getName()+":"+source); 
     DestinationDto destination=new DestinationDto(); 
     destination.setName(source.getName()); 
     destination.setValue(source.getValue()); 
     destination.setSourceId(source.getSourceId().toString()); 

    return destination; 
} 
@Component 
public class BatchWriter implements ItemWriter<DestinationDto>{ 

@Autowired 
IBatchDao batchDao; 

@Override 
public void write(List<? extends DestinationDto> list) throws Exception { 
    System.out.println(Thread.currentThread().getName()+":"+list); 
    batchDao.saveToDestination((List<DestinationDto>)list); 
} 

Я обновил мой шаг и добавил ThreadPoolTaskExecutor следующим

@Bean(name = "batchstep") 
public Step step() throws Exception { 

    return stepBuilderFactory.get("batchstep").allowStartIfComplete(true). 
    transactionManager(transactionManager()).chunk(1).reader(batchReader()). 
    processor(processor()).writer(writer()).taskExecutor(taskExecutor()).build(); 

    } 

После этого мой процессор вызывался несколькими потоками, но с теми же данными источника , Есть ли что-нибудь дополнительное, что мне нужно сделать?

ответ

0

Это большой вопрос

  1. Ваш лучший выбор на получение хороших ответов будет выглядеть через Scaling и параллельной обработки главы в Spring Batch документации (Here)

  2. Там может быть некоторыми многопоточными образцами в примерах весенних партий (Here)

  3. Простым способом потоковой работы с весенними партиями является создание будущего процессора - вы ставите ll ваша логика обработки в будущем объекте, и ваш класс весеннего процессора добавляет объекты в будущее. Вы создаете класс, а затем ждите в будущем, прежде чем выполнять процесс записи. Извините, у меня нет образца, чтобы указать вам тоже на это, но если у вас есть конкретные вопросы, я могу попробовать и ответить!

+0

Спасибо, что нашли время, чтобы прочитать длительный вопрос. Я думаю, что сначала я должен потратить некоторое время, пытаясь использовать Spring Batch из коробки, чтобы добиться многопоточности. – Amardeep

+1

Реально говорящий для вас вариант case 3 может быть проще. Функция Spring Batch из коробки позволяет добиться многопоточной работы, когда вы можете многопоточно использовать весь шаг, т. Е. Ваш читатель может читать информацию в кусках. –

+0

Хорошо, я могу попробовать вариант 3. Но, как вы говорите, «Spring Batch out of функция ящика, чтобы добиться многопоточной обработки, когда вы можете многопоточно перебирать весь шаг, то есть читатель может читать информацию в кусках ». Я читаю данные из HibernateCursorReader, который предоставляет мне возможность считывать данные в кусках. Правильно ли это, тогда вы можете указать мне, как я могу многопотопить мой шаг? – Amardeep