2013-09-25 2 views
12

Весной партии Мне нужно передать элементы, прочитанные ItemReader, на два разных процессора и записи. То, что я пытаюсь добиться в том, что ...Spring Batch: один читатель, несколько процессоров и писателей

 
         +---> ItemProcessor#1 ---> ItemWriter#1 
         | 
ItemReader ---> item ---+ 
         | 
         +---> ItemProcessor#2 ---> ItemWriter#2 

Это необходимо потому, что пункты, написанные ItemWriter # 1 должны быть обработаны в совершенно иначе по сравнению с теми, написанным ItemWriter # 2. Кроме того, ItemReader считывает элемент из базы данных, и запросы, которые он выполняет, настолько дороги, что выполнение одного и того же запроса дважды должно быть отброшено.

Любой намек на то, как достичь такой настройки? Или, по крайней мере, логически эквивалентная настройка?

ответ

-1

это решение, которое я придумал.

Итак, идея состоит в том, чтобы закодировать новый Writer, который «содержит» как ItemProcessor, так и ItemWriter. Чтобы дать вам представление, мы назвали его PreprocessoWriter, и это основной код.

private ItemWriter<O> writer; 
private ItemProcessor<I, O> processor; 

@Override 
public void write(List<? extends I> items) throws Exception { 
    List<O> toWrite = new ArrayList<O>(); 
    for (I item : items) { 
     toWrite.add(processor.process(item)); 
    } 
    writer.write(toWrite); 
} 

Есть много вещей, оставшихся в стороне. Например, управление элементом ItemStream. Но в нашем конкретном сценарии этого было достаточно.

Таким образом, вы можете просто комбинировать несколько препроцессоров с CompositeWriter.

+0

Смешивание процессора и писателя - плохая идея: эти компоненты создали отдельно, чтобы сделать процесс и написать две отдельные проблемы –

+0

Я согласен с вами в общей концепции. Однако, как вы, конечно же, читали исходный вопрос, проблема заключалась в том, чтобы читать элементы один раз и обрабатывать/записывать их двумя совершенно разными способами. Итак, можете ли вы поделиться тем, как решить конкретную проблему, о которой идет речь? – danidemi

+1

Проверьте мой ответ. Обработка выполняется с использованием составного процессора, принимающего один элемент в качестве входных данных, и пользовательский компонент (CompositeResultBean), содержащий результат многократной обработки. Запись с использованием делегирования: CompositeResultBean - это вход и вызов правильного делегированного автора для CompositeResultBean.result1 и CompositeResultBean.result2. Прочитайте один раз, обработайте/напишите с помощью отдельного ItemProcessor/ItemWriter, используя состав и делегирование. Все, что сделано без концепций смешивания –

2

Вы можете использовать CompositeItemProcessor и CompositeItemWriter

Это не будет выглядеть так же, как ваша схема, он будет последовательным, но это будет делать эту работу.

+0

Не думаю, что это достаточно. Я просто закончу «сложным» процессором, но в конечном итоге будет только один обработанный элемент, переданный только одному элементу ItemWriter! Вместо этого мне нужен ItemWriter # 1, чтобы написать элемент, который не тот, который будет написан ItemWriter # 2! Требование состоит в том, что два элемента ItemWriters действуют на разные предметы! Имейте в виду, что в весенней партии у вас есть шаг, который является тройным: один ItemReader, один ItemProcessor, один ItemWriter. – danidemi

+0

Thanx для обозначения этих компонентов. Они решили мою проблему – Ravikiran763

8

Это решение справедливо, если ваш товар должен быть обработан процессором # 1 и процессора # 2

Вы должны создать процессор # 0 с этой подписью:

class Processor0<Item, CompositeResultBean> 

где CompositeResultBean является боб определяется как

class CompositeResultBean { 
    Processor1ResultBean result1; 
    Processor2ResultBean result2; 
} 

в вашем Processor # 0 просто делегировать работу процессоров # 1 и # 2 и положить в результате CompositeResultBean

CompositeResultBean Processor0.process(Item item) { 
    final CompositeResultBean r = new CompositeResultBean(); 
    r.setResult1(processor1.process(item)); 
    r.setResult2(processor2.process(item)); 
    return r; 
} 

Ваш собственный писатель является CompositeItemWriter, что делегат писателя CompositeResultBean.result1 или CompositeResultBean.result2 (посмотреть на PropertyExtractingDelegatingItemWriter, возможно, может помочь)

+0

К сожалению, требования действительно в том, что один и тот же элемент должен обрабатываться процессорами BOTH. Чтобы дать вам еще один контекст, читатель читает одну запись из db, а затем процессор # 1 должен сопоставлять имена полей с другими именами полей, а писатель # 1 будет записывать сопоставленный элемент в другой db. Процессор # 2, начиная с того же исходного элемента, должен выполнить совершенно другую разработку, а затем написать элемент в старой системе. – danidemi

+0

Хорошо, у меня есть решение также для этой проблемы. просто дождитесь редактирования –

+0

@bellabax - Мне было бы интересно увидеть ваше решение. Возможно, у вас есть время обновить его здесь? – emeraldjava

0

Существует другое решение, если у вас есть разумное количество предметов (например, менее 1 Go): вы можете кэшировать результат вашего выбора в коллекцию, завернутую в компонент Spring.

Тогда вы можете просто прочитать коллекцию дважды без каких-либо затрат.

+0

Да, это действительно так. Но что произойдет, если первый писатель не сработает при написании статьи? Если я правильно понял вашу идею, я думаю, что второй автор не получит никакой возможности написать какой-либо предмет. Это верно ? – danidemi

+0

ммм нет, авторы не передают данные друг другу, они читают то же самое в коллекции памяти из одного весеннего одноэлементного компонента. – Tristan

2

Я следовал за предложением Luca использовать PropertyExtractingDelegatingItemWriter в качестве автора, и я смог работать с двумя разными объектами за один шаг.

Прежде всего то, что я сделал, чтобы определить DTO, который хранит два объекта/результаты от процессора

public class DatabaseEntry { 
    private AccessLogEntry accessLogEntry; 
    private BlockedIp blockedIp; 

    public AccessLogEntry getAccessLogEntry() { 
     return accessLogEntry; 
    } 

    public void setAccessLogEntry(AccessLogEntry accessLogEntry) { 
     this.accessLogEntry = accessLogEntry; 
    } 

    public BlockedIp getBlockedIp() { 
     return blockedIp; 
    } 

    public void setBlockedIp(BlockedIp blockedIp) { 
     this.blockedIp = blockedIp; 
    } 
} 

Тогда я прошел этот DTO к писателю, а PropertyExtractingDelegatingItemWriter класса, где я определяю два настроенных методы записать объекты в базу данных, см мой писатель код ниже:

@Configuration 
public class LogWriter extends LogAbstract { 
    @Autowired 
    private DataSource dataSource; 

    @Bean() 
    public PropertyExtractingDelegatingItemWriter<DatabaseEntry> itemWriterAccessLogEntry() { 
     PropertyExtractingDelegatingItemWriter<DatabaseEntry> propertyExtractingDelegatingItemWriter = new PropertyExtractingDelegatingItemWriter<DatabaseEntry>(); 
     propertyExtractingDelegatingItemWriter.setFieldsUsedAsTargetMethodArguments(new String[]{"accessLogEntry", "blockedIp"}); 
     propertyExtractingDelegatingItemWriter.setTargetObject(this); 
     propertyExtractingDelegatingItemWriter.setTargetMethod("saveTransaction"); 
     return propertyExtractingDelegatingItemWriter; 
    } 

    public void saveTransaction(AccessLogEntry accessLogEntry, BlockedIp blockedIp) throws SQLException { 
     writeAccessLogTable(accessLogEntry); 
     if (blockedIp != null) { 
      writeBlockedIp(blockedIp); 
     } 

    } 

    private void writeBlockedIp(BlockedIp entry) throws SQLException { 
     PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO blocked_ips (ip,threshold,startDate,endDate,comment) VALUES (?,?,?,?,?)"); 
     statement.setString(1, entry.getIp()); 
     statement.setInt(2, threshold); 
     statement.setTimestamp(3, Timestamp.valueOf(startDate)); 
     statement.setTimestamp(4, Timestamp.valueOf(endDate)); 
     statement.setString(5, entry.getComment()); 
     statement.execute(); 
    } 

    private void writeAccessLogTable(AccessLogEntry entry) throws SQLException { 
     PreparedStatement statement = dataSource.getConnection().prepareStatement("INSERT INTO log_entries (date,ip,request,status,userAgent) VALUES (?,?,?,?,?)"); 
     statement.setTimestamp(1, Timestamp.valueOf(entry.getDate())); 
     statement.setString(2, entry.getIp()); 
     statement.setString(3, entry.getRequest()); 
     statement.setString(4, entry.getStatus()); 
     statement.setString(5, entry.getUserAgent()); 
     statement.execute(); 
    } 
} 

при таком подходе вы можете получить хотел щёток поведение от одного читателя для обработки нескольких объектов и сохранять их в один шаг.

Смежные вопросы