2015-06-30 3 views
0

Spring Batch: чтение нескольких файлов с одинаковым расширением. У меня есть пользовательский считыватель (reader) для чтения данных из CSV-файла:

package org.kp.oppr.remediation.batch.csv; 

import java.util.Arrays; 
import java.util.LinkedHashMap; 
import java.util.Map; 

import org.apache.commons.lang.StringUtils; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
import org.remediation.batch.csv.FlatFileItemReaderNewLine; 
import org.remediation.batch.model.RawItem; 
import org.remediation.batch.model.RawItemLineMapper; 
import org.springframework.batch.core.ExitStatus; 
import org.springframework.batch.core.StepExecution; 
import org.springframework.batch.core.StepExecutionListener; 
import org.springframework.batch.core.annotation.BeforeStep; 
import org.springframework.batch.item.file.LineCallbackHandler; 
import org.springframework.batch.item.file.LineMapper; 
import org.springframework.batch.item.file.mapping.DefaultLineMapper; 
import org.springframework.batch.item.file.mapping.FieldSetMapper; 
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; 
import org.springframework.batch.item.file.transform.FieldSet; 
import org.springframework.batch.item.file.transform.LineTokenizer; 
import org.springframework.core.io.Resource; 
import org.springframework.util.Assert; 
import org.springframework.validation.BindException; 

public class RawItemCsvReader extends MultiResourceItemReader<RawItem> 
     implements StepExecutionListener, LineCallbackHandler, 
     FieldSetMapper<RawItem> { 

    static final Logger LOGGER = LogManager.getLogger(RawItemCsvReader.class); 
    final private String COLUMN_NAMES_KEY = "COLUMNS_NAMES_KEY"; 
    private StepExecution stepExecution; 
    private DefaultLineMapper<RawItem> lineMapper; 
    private String[] columnNames; 
    private Resource[] resources; 
// = DelimitedLineTokenizer.DELIMITER_COMMA; 
    private char quoteCharacter = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER; 
    private String delimiter; 


    public RawItemCsvReader() { 
     setLinesToSkip(0); 
     setSkippedLinesCallback(this); 
    } 

    @Override 
    public void afterPropertiesSet() { 
     // not in constructor to ensure we invoke the override 
     final DefaultLineMapper<RawItem> lineMapper = new RawItemLineMapper(); 
     setLineMapper(lineMapper); 
    } 

    /** 
    * Satisfies {@link LineCallbackHandler} contract and and Acts as the 
    * {@code skippedLinesCallback}. 
    * 
    * @param line 
    */ 
    @Override 
    public void handleLine(String line) { 
     getLineMapper().setLineTokenizer(getTokenizer()); 
     getLineMapper().setFieldSetMapper(this); 
    } 

    private LineTokenizer getTokenizer() { 

     // this.columnNames = line.split(delimiter); 
     DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer(); 
     lineTokenizer.setQuoteCharacter(quoteCharacter); 
     lineTokenizer.setDelimiter(delimiter); 
     lineTokenizer.setStrict(true); 
     lineTokenizer.setNames(columnNames); 
     addColumnNames(); 
     return lineTokenizer; 
    } 

    private void addColumnNames() { 
     stepExecution.getExecutionContext().put(COLUMN_NAMES_KEY, columnNames); 
    } 


    @Override 
    public void setResources(Resource[] resources) { 

     this.resources = resources; 
     super.setResources(resources); 

    } 



    /** 
    * Provides acces to an otherwise hidden field in parent class. We need this 
    * because we have to reconfigure the {@link LineMapper} based on file 
    * contents. 
    * 
    * @param lineMapper 
    */ 
    @Override 
    public void setLineMapper(LineMapper<RawItem> lineMapper) { 
     if (!(lineMapper instanceof DefaultLineMapper)) { 
      throw new IllegalArgumentException(
        "Must specify a DefaultLineMapper"); 
     } 
     this.lineMapper = (DefaultLineMapper) lineMapper; 

     super.setLineMapper(lineMapper); 
    } 

    private DefaultLineMapper getLineMapper() { 
     return this.lineMapper; 
    } 

    /** 
    * Satisfies {@link FieldSetMapper} contract. 
    * 
    * @param fs 
    * @return 
    * @throws BindException 
    */ 
    @Override 
    public RawItem mapFieldSet(FieldSet fs) throws BindException { 
     if (fs == null) { 
      return null; 
     } 
     Map<String, String> record = new LinkedHashMap<String, String>(); 
     for (String columnName : this.columnNames) { 
      record.put(columnName, 
        StringUtils.trimToNull(fs.readString(columnName))); 
     } 
     RawItem item = new RawItem(); 
     item.setResource(resources); 
     item.setRecord(record); 
     return item; 
    } 

    @BeforeStep 
    public void saveStepExecution(StepExecution stepExecution) { 
     this.stepExecution = stepExecution; 
    } 

    @Override 
    public void beforeStep(StepExecution stepExecution) { 
     //LOGGER.info("Start Raw Read Step for " + itemResource.getFilename()); 

    } 

    @Override 
    public ExitStatus afterStep(StepExecution stepExecution) { 
     LOGGER.info("End Raw Read Step for lines read: " + stepExecution.getReadCount() 
       + " lines skipped: " + stepExecution.getReadSkipCount()); 

     /* 
     LOGGER.info("End Raw Read Step for " + itemResource.getFilename() 
       + " lines read: " + stepExecution.getReadCount() 
       + " lines skipped: " + stepExecution.getReadSkipCount()); 
       */ 
     return ExitStatus.COMPLETED; 
    } 

    public void setDelimiter(String delimiter) { 
     this.delimiter = delimiter; 
    } 

    public void setQuoteCharacter(char quoteCharacter) { 
     this.quoteCharacter = quoteCharacter; 
    } 

    public String[] getColumnNames() { 
     return columnNames; 
    } 

    public void setColumnNames(String[] columnNames) { 
     this.columnNames = columnNames; 
    } 

    public String getDelimiter() { 
     return delimiter; 
    } 

} 

Я хочу использовать MultiResourceItemReader вместе с этим классом, чтобы прочитать несколько файлов с одинаковым расширением. Для выполнения задания я использую MultiResourceItemReader из Spring. Мне нужно знать, как настроить поле `private ResourceAwareItemReaderItemStream delegate` экземпляра этого класса:

package org.kp.oppr.remediation.batch.csv; 

import java.util.Arrays; 
import java.util.Comparator; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import org.apache.logging.log4j.LogManager; 
import org.apache.logging.log4j.Logger; 
import org.springframework.batch.item.ExecutionContext; 
import org.springframework.batch.item.ItemReader; 
import org.springframework.batch.item.ItemStream; 
import org.springframework.batch.item.ItemStreamException; 
import org.springframework.batch.item.ParseException; 
import org.springframework.batch.item.UnexpectedInputException; 
import org.springframework.batch.item.file.LineCallbackHandler; 
import org.springframework.batch.item.file.LineMapper; 
import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream; 
import org.springframework.batch.item.util.ExecutionContextUserSupport; 
import org.springframework.beans.factory.InitializingBean; 
import org.springframework.core.io.Resource; 
import org.springframework.util.Assert; 
import org.springframework.util.ClassUtils; 

/**
 * Reads items from several {@link Resource}s in sequence by pointing a single
 * delegate reader at each resource in turn.
 * <p>
 * The position (resource index and item index within that resource) is kept in
 * a {@link MultiResourceIndex} and, when {@code saveState} is enabled, stored
 * in the {@link ExecutionContext} so that reading can resume on restart.
 *
 * @param <T> type of the items produced
 */
public class MultiResourceItemReader <T> implements ItemReader<T>, ItemStream, InitializingBean,ResourceAwareItemReaderItemStream<T> {

    static final Logger LOGGER = LogManager
            .getLogger(MultiResourceItemReader.class);

    private final ExecutionContextUserSupport executionContextUserSupport = new ExecutionContextUserSupport();

    private ResourceAwareItemReaderItemStream<? extends T> delegate;

    private Resource[] resources;

    private MultiResourceIndex index = new MultiResourceIndex();

    private boolean saveState = true;

    // signals there is nothing to read -> just return null on first read
    private boolean noInput;

    // The three properties below are stored by their setters but are not
    // consulted anywhere in this class.
    private LineMapper<T> lineMapper;

    private int linesToSkip = 0;

    private LineCallbackHandler skippedLinesCallback;

    private Comparator<Resource> comparator = new Comparator<Resource>() {

        /**
         * Compares resource filenames.
         */
        @Override
        public int compare(Resource r1, Resource r2) {
            return r1.getFilename().compareTo(r2.getFilename());
        }

    };

    public MultiResourceItemReader() {
        executionContextUserSupport.setName(ClassUtils.getShortName(MultiResourceItemReader.class));
    }

    /**
     * @param skippedLinesCallback
     *            will be called for each one of the initial skipped lines
     *            before any items are read.
     */
    public void setSkippedLinesCallback(LineCallbackHandler skippedLinesCallback) {
        this.skippedLinesCallback = skippedLinesCallback;
    }

    /**
     * Public setter for the number of lines to skip at the start of a file. Can
     * be used if the file contains a header without useful (column name)
     * information, and without a comment delimiter at the beginning of the
     * lines.
     *
     * @param linesToSkip
     *            the number of lines to skip
     */
    public void setLinesToSkip(int linesToSkip) {
        this.linesToSkip = linesToSkip;
    }

    /**
     * Setter for line mapper.
     *
     * @param lineMapper
     *            maps line to item
     */
    public void setLineMapper(LineMapper<T> lineMapper) {
        this.lineMapper = lineMapper;
    }

    /**
     * Reads the next item, jumping to next resource if necessary.
     *
     * @return the next item, or {@code null} when all input is exhausted
     */
    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException {

        if (noInput) {
            return null;
        }

        T item = readNextItem();
        index.incrementItemCount();

        return item;
    }

    /**
     * Use the delegate to read the next item, jump to next resource if current
     * one is exhausted.
     *
     * @return next item from input, or {@code null} once the last resource
     *         is exhausted
     */
    private T readNextItem() throws Exception {

        T item = delegate.read();

        while (item == null) {

            index.incrementResourceCount();

            if (index.currentResource >= resources.length) {
                return null;
            }

            // Re-target the single delegate at the next resource.
            delegate.close();
            delegate.setResource(resources[index.currentResource]);
            delegate.open(new ExecutionContext());

            item = delegate.read();
        }

        return item;
    }

    /**
     * Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader
     * (when one is configured) and reset instance variable values.
     */
    @Override
    public void close() throws ItemStreamException {
        index = new MultiResourceIndex();
        if (delegate != null) {
            delegate.close();
        }
        noInput = false;
    }

    /**
     * Figure out which resource to start with in case of restart, open the
     * delegate and restore delegate's position in the resource.
     *
     * @throws IllegalArgumentException if no resources array was set
     * @throws IllegalStateException if there is input to read but no delegate
     * @throws ItemStreamException if the saved position cannot be restored
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {

        Assert.notNull(resources, "Resources must be set");

        noInput = false;
        if (resources.length == 0) {
            LOGGER.warn("No resources to read");
            noInput = true;
            return;
        }

        // Fail with a clear message instead of a bare NullPointerException.
        Assert.state(delegate != null, "Delegate must be set");

        Arrays.sort(resources, comparator);
        for (Resource resource : resources) {
            LOGGER.info("Resources after Sorting" + resource);
        }

        index.open(executionContext);

        // A saved index past the last resource means everything was already
        // consumed; treat it as "no input" rather than indexing out of bounds.
        if (index.currentResource >= resources.length) {
            noInput = true;
            return;
        }

        delegate.setResource(resources[index.currentResource]);

        delegate.open(new ExecutionContext());

        try {
            // Replay reads to reach the saved position inside the resource.
            for (int i = 0; i < index.currentItem; i++) {
                delegate.read();
            }
        }
        catch (Exception e) {
            throw new ItemStreamException("Could not restore position on restart", e);
        }
    }

    /**
     * Store the current resource index and position in the resource.
     */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        if (saveState) {
            index.update(executionContext);
        }
    }

    /**
     * @param delegate reads items from single {@link Resource}.
     */
    public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) {
        this.delegate = delegate;
    }

    /**
     * Set the boolean indicating whether or not state should be saved in the
     * provided {@link ExecutionContext} during the {@link ItemStream} call to
     * update.
     *
     * @param saveState {@code true} to persist the position on update
     */
    public void setSaveState(boolean saveState) {
        this.saveState = saveState;
    }

    /**
     * @param comparator used to order the injected resources, by default
     * compares {@link Resource#getFilename()} values.
     */
    public void setComparator(Comparator<Resource> comparator) {
        this.comparator = comparator;
    }

    /**
     * @param resources input resources; the array is sorted in place when the
     *            reader is opened
     */
    public void setResources(Resource[] resources) {
        this.resources = resources;
    }

    /**
     * Facilitates keeping track of the position within multi-resource input.
     */
    private class MultiResourceIndex {

        private static final String RESOURCE_KEY = "resourceIndex";

        private static final String ITEM_KEY = "itemIndex";

        private int currentResource = 0;

        private int markedResource = 0;

        private int currentItem = 0;

        private int markedItem = 0;

        public void incrementItemCount() {
            currentItem++;
        }

        /** Moves to the next resource and restarts the item counter. */
        public void incrementResourceCount() {
            currentResource++;
            currentItem = 0;
        }

        public void mark() {
            markedResource = currentResource;
            markedItem = currentItem;
        }

        public void reset() {
            currentResource = markedResource;
            currentItem = markedItem;
        }

        /** Restores the position from the context when the keys are present. */
        public void open(ExecutionContext ctx) {
            String resourceKey = executionContextUserSupport.getKey(RESOURCE_KEY);
            if (ctx.containsKey(resourceKey)) {
                currentResource = ctx.getInt(resourceKey);
            }

            String itemKey = executionContextUserSupport.getKey(ITEM_KEY);
            if (ctx.containsKey(itemKey)) {
                currentItem = ctx.getInt(itemKey);
            }
        }

        /** Writes this index's own position into the context. */
        public void update(ExecutionContext ctx) {
            ctx.putInt(executionContextUserSupport.getKey(RESOURCE_KEY), currentResource);
            ctx.putInt(executionContextUserSupport.getKey(ITEM_KEY), currentItem);
        }
    }

    /**
     * Intentionally empty: no validation happens here; required properties
     * are checked when the reader is opened.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // nothing to initialize
    }

    /**
     * No-op: the single resource passed here is ignored; this reader takes its
     * input from {@link #setResources(Resource[])}.
     *
     * @param resource ignored
     */
    @Override
    public void setResource(Resource resource) {
        // deliberately ignored, see Javadoc
    }

}

Файлы конфигурации Spring:

<!-- Chunk-oriented step: items come from multiResourceReader and are written
     by rawItemDatabaseWriter, committing every 500 items. -->
<batch:step id="readFromCSVFileAndUploadToDB" next="stepMovePdwFile">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk
            reader="multiResourceReader"
            writer="rawItemDatabaseWriter"
            commit-interval="500"
            skip-policy="pdwUploadSkipPolicy" />
    </batch:tasklet>
</batch:step>

<!-- Spring Batch's MultiResourceItemReader exposes the plural "resources"
     property (setResources(Resource[])); the pattern expands to every
     matching .dat file. Reading of each file is handed to the delegate. -->
<bean id="multiResourceReader"
     class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step">
     <property name="resources" value="file:#{jobParameters[filePath]}/*.dat" />
     <property name="delegate" ref="rawItemCsvReader" />
    </bean>


    <!-- Custom CSV reader; step-scoped so the jobParameters expression can be
         late-bound. Column names and delimiter come from property placeholders. -->
    <bean id="rawItemCsvReader"
          class="org.kp.oppr.remediation.batch.csv.RawItemCsvReader"
          scope="step">
        <property name="resources" value="file:#{jobParameters[filePath]}/*.dat" />
        <property name="columnNames" value="${columnNames}" />
        <property name="delimiter" value="${delimiter}" />
    </bean>

ответ

0

Используйте в качестве делегата стандартный FlatFileItemReader (правильно настроенный в XML) вместо вашего RawItemCsvReader.
Это решит вашу проблему, потому что FlatFileItemReader является наследником AbstractItemStreamItemReader и реализует ResourceAwareItemReaderItemStream — именно этот тип ожидает свойство delegate.
Помните: Spring Batch (SB) основан на делегировании; писать собственный класс-читатель, как ваш, требуется редко.

Смежные вопросы