2012-07-04 2 views
1

У меня есть приложение SpringBatch, где мы пытаемся выполнить параллельную обработку. В пакете он считывает из таблицы и обновляет другую таблицу с ответом. Если в таблице ввода 100 записей, выходная таблица также должна иметь 100 записей.SimpleAsyncTaskExecutor vs SyncTaskExecutor в SpringBatch

Теперь у меня есть 13600 записей в таблице ввода. Когда я пытался с SyncTaskExecutor, работал только один поток, а выходная таблица получила 13600 записей. Когда я попытался с SimpleAsyncTaskExecutor, в выходной таблице было всего 900 записей.

декларация Работа ниже:

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:task="http://www.springframework.org/schema/task" 
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd 
    http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd 
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd"> 

<import resource="applicationContext.xml" /> 


<bean id="itemReader" 
    class="org.springframework.batch.item.database.JdbcCursorItemReader"> 
    <property name="dataSource" ref="dataSource" /> 
    <property name="sql" value="select REP_QMUT_KEY, DLN_DLNRNR, DLN_AFVDAT, F_IND_MEMO_DVB, MUT_MUTDAT_UM, MUT_VERWDAT_UM, MUT_SRT_MUT_UM from REP_QMUT" /> 
    <property name="rowMapper"> 
     <bean class="com.aegon.quinto.service.mapper.MutationInputRowMapper" /> 
    </property> 
</bean> 
<bean id="simpleStep" 
    class="org.springframework.batch.core.step.item.SimpleStepFactoryBean"> 
    <property name="transactionManager" ref="transactionManager" /> 
    <property name="jobRepository" ref="jobRepository" /> 
    <property name="itemReader" ref="itemReader" /> 
    <property name="itemWriter" ref="itemWriter" /> 
    <property name="commitInterval" value="10" /> 
    <property name="startLimit" value="1" /> 
</bean> 

<bean id="itemWriter" class="org.springframework.batch.item.database.JdbcBatchItemWriter"> 
    <property name="dataSource" ref="dataSource" /> 
    <property name="itemSqlParameterSourceProvider"> 
     <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" /> 
    </property> 
    <property name="sql" value="INSERT INTO MUT_TRIAL(DLNRNR, AFVDAT, MEMO_MUTATION, MEMO_PARTICIPANT, MUTATION_DATE, PROCESSING_DATE, RUN_NR, SRT_MUT, REP_QMUT_CORTICON_KEY) VALUES (:dlnrnr,:afvDat,:memo,:participantMemo,:mutationDate,:processDate,:runNr,:mutationType,:mutationKey)" /> 
    </bean> 

<bean id="simpleChunkListner" class="com.aegon.quinto.service.listener.SimpleChunkListener" /> 

<bean id="taskExecutor" class="org.springframework.core.task.SyncTaskExecutor" /> 
<bean id="itemProcessor" class="com.aegon.quinto.service.processor.SimpleItemProcessor" /> 

<!-- job id="simpleJob" xmlns="http://www.springframework.org/schema/batch"> 
    <step id="simpleStep"> 
     <tasklet> 
      <chunk reader="itemReader" writer="itemWriter" 
       commit-interval="50"> 
      </chunk> 
     </tasklet> 
    </step> 

</job--> 

<job id="simpleJob" xmlns="http://www.springframework.org/schema/batch"> 
    <step id="simpleStep"> 
     <tasklet task-executor="taskExecutor" throttle-limit="25"> 
      <chunk reader="itemReader" processor="itemProcessor" writer="itemWriter" 
       commit-interval="50"> 
      </chunk> 
     </tasklet> 
    </step> 

</job> 


<!-- For running the BatchLauncher --> 
<bean id="batchLauncher" class="com.aegon.quinto.service.BatchLauncher"> 
    <property name="jobLauncher" ref="jobLauncher" /> 
    <property name="jobRepository" ref="jobRepository" /> 
    <property name="job" ref="simpleJob" /> 
</bean> 
</beans> 

Я пытаюсь выполнить шаг в несколько потоков

Mapper:

import java.sql.ResultSet; 

импорт java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

импорт com.aegon.quinto.model.MutationInput;

общественного класс MutationInputRowMapper реализует RowMapper {

public Object mapRow(ResultSet rs, int rowNum) throws SQLException { 
    // TODO Auto-generated method stub 
    MutationInput mutationInput = new MutationInput(); 
    mutationInput.setMutationKey(rs.getInt("REP_QMUT_KEY")); 
    mutationInput.setDlnrnr(rs.getString("DLN_DLNRNR")); 
    mutationInput.setMemo(rs.getString("F_IND_MEMO_MVM")); 
    mutationInput.setParticipantMemo(rs.getString("F_IND_MEMO_DVB")); 
    mutationInput.setProcessDate(rs.getInt("MUT_VERWDAT_UM")); 
    mutationInput.setRunNr(new Integer("2")); 
    mutationInput.setMutationType(rs.getString("MUT_SRT_MUT_UM")); 

    return mutationInput; 
} 

}

Моего общее требование следующим образом: я буду читать данные из входной таблицы, проверять данные с внешней службой и обновлением проверки ответ в выходной таблице. В таблице ввода данные будут в плоской структуре. Т.е. для ученика могут быть результаты экзамена для нескольких экзаменов. Мне нужно получить все результаты экзамена для этого участника перед тем, как перейти на внешнюю службу. Связь с внешней службой будет узким местом из-за латентности сети. Следовательно, требуется многопоточность. Если есть примеры реализации/любого руководства, пожалуйста, покажите мне путь. P.S: Я новичок в SpringBatch.

+0

Добавьте свою конфигурацию вашего экзешника. –

+0

Конфигурация TaskExecutor находится в первой строке кода – Rajkumar

+0

@Rajkumar. Когда вы говорите, что при использовании SimpleAsyncTaskExecutor было написано только 900 записей, был ли статус РАБОТА ЗАВЕРШЕН? – gsndev

ответ

1

Похоже, что один или несколько ваших компонентов не являются потокобезопасными.

+0

Я использую 'JdbcCursorItemReader' и' JdbcBatchItemWriter' для чтения и записи. Должен ли я использовать некоторых других читателей? – Rajkumar

+0

есть больше компонентов, itemProcessor и используемый объект данных между считывателем, процессором и писателем, pls публикуют их все :-) –

+0

Спасибо Michael. Добавлен весь xml выше. – Rajkumar

Смежные вопросы