У меня есть пользовательский считыватель для чтения данных из файла CSV.Spring Batch для чтения нескольких файлов с таким же расширением
package org.kp.oppr.remediation.batch.csv;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.remediation.batch.csv.FlatFileItemReaderNewLine;
import org.remediation.batch.model.RawItem;
import org.remediation.batch.model.RawItemLineMapper;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.validation.BindException;
public class RawItemCsvReader extends MultiResourceItemReader<RawItem>
implements StepExecutionListener, LineCallbackHandler,
FieldSetMapper<RawItem> {
static final Logger LOGGER = LogManager.getLogger(RawItemCsvReader.class);
final private String COLUMN_NAMES_KEY = "COLUMNS_NAMES_KEY";
private StepExecution stepExecution;
private DefaultLineMapper<RawItem> lineMapper;
private String[] columnNames;
private Resource[] resources;
// = DelimitedLineTokenizer.DELIMITER_COMMA;
private char quoteCharacter = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER;
private String delimiter;
public RawItemCsvReader() {
setLinesToSkip(0);
setSkippedLinesCallback(this);
}
@Override
public void afterPropertiesSet() {
// not in constructor to ensure we invoke the override
final DefaultLineMapper<RawItem> lineMapper = new RawItemLineMapper();
setLineMapper(lineMapper);
}
/**
* Satisfies {@link LineCallbackHandler} contract and and Acts as the
* {@code skippedLinesCallback}.
*
* @param line
*/
@Override
public void handleLine(String line) {
getLineMapper().setLineTokenizer(getTokenizer());
getLineMapper().setFieldSetMapper(this);
}
private LineTokenizer getTokenizer() {
// this.columnNames = line.split(delimiter);
DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
lineTokenizer.setQuoteCharacter(quoteCharacter);
lineTokenizer.setDelimiter(delimiter);
lineTokenizer.setStrict(true);
lineTokenizer.setNames(columnNames);
addColumnNames();
return lineTokenizer;
}
private void addColumnNames() {
stepExecution.getExecutionContext().put(COLUMN_NAMES_KEY, columnNames);
}
@Override
public void setResources(Resource[] resources) {
this.resources = resources;
super.setResources(resources);
}
/**
* Provides acces to an otherwise hidden field in parent class. We need this
* because we have to reconfigure the {@link LineMapper} based on file
* contents.
*
* @param lineMapper
*/
@Override
public void setLineMapper(LineMapper<RawItem> lineMapper) {
if (!(lineMapper instanceof DefaultLineMapper)) {
throw new IllegalArgumentException(
"Must specify a DefaultLineMapper");
}
this.lineMapper = (DefaultLineMapper) lineMapper;
super.setLineMapper(lineMapper);
}
private DefaultLineMapper getLineMapper() {
return this.lineMapper;
}
/**
* Satisfies {@link FieldSetMapper} contract.
*
* @param fs
* @return
* @throws BindException
*/
@Override
public RawItem mapFieldSet(FieldSet fs) throws BindException {
if (fs == null) {
return null;
}
Map<String, String> record = new LinkedHashMap<String, String>();
for (String columnName : this.columnNames) {
record.put(columnName,
StringUtils.trimToNull(fs.readString(columnName)));
}
RawItem item = new RawItem();
item.setResource(resources);
item.setRecord(record);
return item;
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
@Override
public void beforeStep(StepExecution stepExecution) {
//LOGGER.info("Start Raw Read Step for " + itemResource.getFilename());
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
LOGGER.info("End Raw Read Step for lines read: " + stepExecution.getReadCount()
+ " lines skipped: " + stepExecution.getReadSkipCount());
/*
LOGGER.info("End Raw Read Step for " + itemResource.getFilename()
+ " lines read: " + stepExecution.getReadCount()
+ " lines skipped: " + stepExecution.getReadSkipCount());
*/
return ExitStatus.COMPLETED;
}
public void setDelimiter(String delimiter) {
this.delimiter = delimiter;
}
public void setQuoteCharacter(char quoteCharacter) {
this.quoteCharacter = quoteCharacter;
}
public String[] getColumnNames() {
return columnNames;
}
public void setColumnNames(String[] columnNames) {
this.columnNames = columnNames;
}
public String getDelimiter() {
return delimiter;
}
}
Я хочу использовать MultiResourceItemReader вместе с этим классом, чтобы прочитать несколько файлов с тем же расширением. Я использую Spring MultiResourceItemReader для выполнения задания. Мне нужно знать, как настроить частный делегат ResourceAwareItemReaderItemStream; Экземпляр этого класса
package org.kp.oppr.remediation.batch.csv;
import java.util.Arrays;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
import org.springframework.batch.item.util.ExecutionContextUserSupport;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
public class MultiResourceItemReader <T> implements ItemReader<T>, ItemStream, InitializingBean,ResourceAwareItemReaderItemStream<T> {
static final Logger LOGGER = LogManager
.getLogger(MultipleFlatFileItemReaderNewLine.class);
private final ExecutionContextUserSupport executionContextUserSupport = new ExecutionContextUserSupport();
private ResourceAwareItemReaderItemStream<? extends T> delegate;
private Resource[] resources;
private MultiResourceIndex index = new MultiResourceIndex();
private boolean saveState = true;
// signals there are no resources to read -> just return null on first read
private boolean noInput;
private LineMapper<T> lineMapper;
private int linesToSkip = 0;
private LineCallbackHandler skippedLinesCallback;
private Comparator<Resource> comparator = new Comparator<Resource>() {
/**
* Compares resource filenames.
*/
public int compare(Resource r1, Resource r2) {
return r1.getFilename().compareTo(r2.getFilename());
}
};
public MultiResourceItemReader() {
executionContextUserSupport.setName(ClassUtils.getShortName(MultiResourceItemReader.class));
}
/**
* @param skippedLinesCallback
* will be called for each one of the initial skipped lines
* before any items are read.
*/
public void setSkippedLinesCallback(LineCallbackHandler skippedLinesCallback) {
this.skippedLinesCallback = skippedLinesCallback;
}
/**
* Public setter for the number of lines to skip at the start of a file. Can
* be used if the file contains a header without useful (column name)
* information, and without a comment delimiter at the beginning of the
* lines.
*
* @param linesToSkip
* the number of lines to skip
*/
public void setLinesToSkip(int linesToSkip) {
this.linesToSkip = linesToSkip;
}
/**
* Setter for line mapper. This property is required to be set.
*
* @param lineMapper
* maps line to item
*/
public void setLineMapper(LineMapper<T> lineMapper) {
this.lineMapper = lineMapper;
}
/**
* Reads the next item, jumping to next resource if necessary.
*/
public T read() throws Exception, UnexpectedInputException, ParseException {
if (noInput) {
return null;
}
T item;
item = readNextItem();
index.incrementItemCount();
return item;
}
/**
* Use the delegate to read the next item, jump to next resource if current
* one is exhausted. Items are appended to the buffer.
* @return next item from input
*/
private T readNextItem() throws Exception {
T item = delegate.read();
while (item == null) {
index.incrementResourceCount();
if (index.currentResource >= resources.length) {
return null;
}
delegate.close();
delegate.setResource(resources[index.currentResource]);
delegate.open(new ExecutionContext());
item = delegate.read();
}
return item;
}
/**
* Close the {@link #setDelegate(ResourceAwareItemReaderItemStream)} reader
* and reset instance variable values.
*/
public void close() throws ItemStreamException {
index = new MultiResourceIndex();
delegate.close();
noInput = false;
}
/**
* Figure out which resource to start with in case of restart, open the
* delegate and restore delegate's position in the resource.
*/
public void open(ExecutionContext executionContext) throws ItemStreamException {
Assert.notNull(resources, "Resources must be set");
noInput = false;
if (resources.length == 0) {
LOGGER.warn("No resources to read");
noInput = true;
return;
}
Arrays.sort(resources, comparator);
for(int i =0; i < resources.length; i++)
{
LOGGER.info("Resources after Sorting" + resources[i]);
}
index.open(executionContext);
delegate.setResource(resources[index.currentResource]);
delegate.open(new ExecutionContext());
try {
for (int i = 0; i < index.currentItem; i++) {
delegate.read();
}
}
catch (Exception e) {
throw new ItemStreamException("Could not restore position on restart", e);
}
}
/**
* Store the current resource index and position in the resource.
*/
public void update(ExecutionContext executionContext) throws ItemStreamException {
if (saveState) {
index.update(executionContext);
}
}
/**
* @param delegate reads items from single {@link Resource}.
*/
public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) {
this.delegate = delegate;
}
/**
* Set the boolean indicating whether or not state should be saved in the
* provided {@link ExecutionContext} during the {@link ItemStream} call to
* update.
*
* @param saveState
*/
public void setSaveState(boolean saveState) {
this.saveState = saveState;
}
/**
* @param comparator used to order the injected resources, by default
* compares {@link Resource#getFilename()} values.
*/
public void setComparator(Comparator<Resource> comparator) {
this.comparator = comparator;
}
/**
* @param resources input resources
*/
public void setResources(Resource[] resources) {
this.resources = resources;
}
/**
* Facilitates keeping track of the position within multi-resource input.
*/
private class MultiResourceIndex {
private static final String RESOURCE_KEY = "resourceIndex";
private static final String ITEM_KEY = "itemIndex";
private int currentResource = 0;
private int markedResource = 0;
private int currentItem = 0;
private int markedItem = 0;
public void incrementItemCount() {
currentItem++;
}
public void incrementResourceCount() {
currentResource++;
currentItem = 0;
}
public void mark() {
markedResource = currentResource;
markedItem = currentItem;
}
public void reset() {
currentResource = markedResource;
currentItem = markedItem;
}
public void open(ExecutionContext ctx) {
if (ctx.containsKey(executionContextUserSupport.getKey(RESOURCE_KEY))) {
currentResource = ctx.getInt(executionContextUserSupport.getKey(RESOURCE_KEY));
}
if (ctx.containsKey(executionContextUserSupport.getKey(ITEM_KEY))) {
currentItem = ctx.getInt(executionContextUserSupport.getKey(ITEM_KEY));
}
}
public void update(ExecutionContext ctx) {
ctx.putInt(executionContextUserSupport.getKey(RESOURCE_KEY), index.currentResource);
ctx.putInt(executionContextUserSupport.getKey(ITEM_KEY), index.currentItem);
}
}
@Override
public void afterPropertiesSet() throws Exception {
// TODO Auto-generated method stub
}
@Override
public void setResource(Resource resource) {
// TODO Auto-generated method stub
}
}
Configuration Files для Весна:
<batch:step id="readFromCSVFileAndUploadToDB" next="stepMovePdwFile">
<batch:tasklet transaction-manager="transactionManager">
<batch:chunk reader="multiResourceReader" writer="rawItemDatabaseWriter"
commit-interval="500" skip-policy="pdwUploadSkipPolicy" />
</batch:tasklet>
</batch:step>
<bean id="multiResourceReader"
class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step">
<property name="resource" value="file:#{jobParameters[filePath]}/*.dat" />
<property name="delegate" ref="rawItemCsvReader"></property>
</bean>
<bean id="rawItemCsvReader" class="org.kp.oppr.remediation.batch.csv.RawItemCsvReader"
scope="step">
<property name="resources" value="file:#{jobParameters[filePath]}/*.dat" />
<property name="columnNames" value="${columnNames}" />
<property name="delimiter" value="${delimiter}" />
</bean>