2015-11-20 3 views
1

У нас есть данные, поступающие в плоский файл. напримерВесенняя партия: агрегирование записей и количество записей

EmpCode,Salary,EmpName,... 
100,1000,...,... 
200,2000,...,... 
200,2000,...,... 
100,1000,...,... 
300,3000,...,... 
400,4000,...,... 

Мы хотели бы, чтобы агрегировать зарплату на основе EmpCode и записи в базу данных, как

Emp_Code Emp_Salary Updated_Time Updated_User 
100   2000   ...   ... 
200   4000   ...   ... 
300   3000   ...   ... 
400   4000   ...   ... 

я написал классы согласно Spring Batch следующим

ItemReader - to read the employee data into a Employee object     

A образец EmployeeItemProcessor:

public class EmployeeProcessor implements ItemProcessor<Employee, Employee> { 

    @Override 
    public Employee process(Employee employee) throws Exception { 
     employee.setUpdatedTime(new Date()); 
     employee.setUpdatedUser("someuser"); 
     return employee; 
    } 

EmployeeItemWriter:

@Repository 
public class EmployeeItemWriter implements ItemWriter<Employee> { 
@Autowired 
private SessionFactory sf; 

@Override 
public void write(List<? extends Employee> employeeList) throws Exception { 
    List<Employee> aggEmployeeList = aggregateEmpData(employeeList); 
    //write to db using session factory 
} 

private List<Employee> aggregateEmpData(List<? extends Employee> employeeList){ 
    Map<String, Employee> map = new HashMap<String, Employee>(); 
    for(Employee e: employeeList){ 
     String empCode = e.getEmpCode(); 
     if(map.containsKey(empCode)){ 
      //get employee salary and add up 
     }else{ 
      map.put(empCode,Employee); 
     } 
    }  
    return new ArrayList<Employee>(map.values());   
} 
} 

XML конфигурации

... 
<batch:job id="employeeJob"> 
    <batch:step id="step1"> 
    <batch:tasklet> 
     <batch:chunk reader="employeeItemReader" 
      writer="employeeItemWriter" processor="employeeItemProcessor" 
      commit-interval="100"> 
     </batch:chunk> 
    </batch:tasklet> 
    </batch:step> 
    </batch:job> 
... 

Он работает и служит своей цели. Однако у меня есть пара вопросов.

1) Когда я смотрю на бревна, она показывает, как показано ниже (фиксации интервала = 100):

состояние = ЗАВЕРШЕНА, статус_завершения = ЗАВЕРШЕНА, readCount = 2652, filterCount = 0, writeCount = 2652 readSkipCount = 0, writeSkipCount = 0, commitSount = 27, rollbackCount = 0

Но после агрегации в базу данных было записано только 2515 записей. Количество записи составляет 2652. Это потому, что количество элементов, попадающих в ItemWriter, по-прежнему составляет 2652? Как это можно исправить?

2) Мы повторяем этот список дважды. После этого в ItemProcessor, а затем в ItemWriter для агрегации. Это может быть проблемой производительности, если число записей выше. Есть ли лучший способ достичь этого?

+0

Пошлите, пожалуйста, ItemReader – HaMi

+0

Привет, ItemReader похож на любой другой класс ItemReader. Абсолютной логики нет. – amdg

ответ

0

Почему агрегация в ItemWriter? Я бы сделал это в ItemProcessor. Это позволило бы считать количество записей точным и отделить этот компонент от акта фактической записи. Если вы дадите некоторое представление о своей конфигурации, мы можем подробнее рассказать о них.

+0

Привет, Майкл, поскольку мне удалось выполнить агрегацию в процессоре, я попытался использовать HibernateItemWriter. Это работает. Однако в соответствии с нашим процессом мы удаляем все предыдущие данные перед загрузкой. Таким образом, saveOrUpdate здесь не требуется. Я написал своего автора, чтобы «сохранить» данные с помощью Hibernate. Мне интересно, можем ли мы сконфигурировать HibernateItemWriter только для сохранения? – amdg

+0

Не могли бы вы задать этот вопрос как отдельный вопрос, чтобы другие могли найти ответ? –

2

Если каждая строка входного файла является объектом-сотрудником, то ваш ReadCount будет содержать количество строк во входном файле. WriteCount будет суммировать размер всех списков, переданных записи элемента. Итак, возможно, ваша функция aggregateEmpData удаляет или агрегирует некоторые записи в один, и, следовательно, ваш счетчик db не совпадает с WriteCount. Если вы хотите убедиться, что WriteCount - это точно количество записей в db, вы должны сделать свой агрегат в процессоре.

+0

Да, абсолютно верно. Функция агрегации объединяет записи. Следовательно, счет меньше. Я упомянул один из вопросов в переполнении стека и попытался выполнить агрегацию в ItemProcessor, но карта инициализируется при каждом вызове элемента. Можете ли вы рассказать мне, как добиться этого в itemprocessor? – amdg

1

Мне это удалось. Я сделал это следующим образом.

public class EmployeeProcessor implements ItemProcessor<Employee, Employee> { 
    Map<String, Employee> map; 
    @Override 
    public Employee process(Employee employee) throws Exception { 
     employee.setUpdatedTime(new Date()); 
     employee.setUpdatedUser("someuser"); 
     String empCode = employee.getEmpCode(); 
     if(map.containsKey(empCode)){ 
      //get employee salary and add up 
      return null; 
     } 
     map.put(empCode,employee); 
     return employee; 
    } 

    @BeforeStep 
    public void beforeStep(StepExecution stepExecution) { 
     map = new HashMap<String, Employee>(); 
    } 

Счетчик записи отображается правильно.

Смежные вопросы