Я пытаюсь реализовать механизм, в котором исполняемые файлы являются производителями и потребителями;Производители потребительских товаров; вторая партия не должна появляться до тех пор, пока не завершится предыдущая партия
Ситуация
это-мне нужно прочитать записи из БД в пакетном режиме, и обработать то же самое. Я пытаюсь использовать шаблон производителя. Я получаю пакет, я обрабатываю. Получите пакетный процесс. Это получает пакет, когда он видит, что очередь пуста. Один из потоков идет и получает вещи. Но проблема в том, что я не могу пометить записи, которые получаются для обработки, и это мое ограничение. Итак, если мы выберем следующую партию, прежде чем полностью выполнить предыдущий, я могу снова взять те же записи. Поэтому я должен быть в состоянии представить предыдущий, прежде чем потянуть другого. Я смущаюсь о том, что мне следует делать здесь. Я попробовал сохранить счетчик, а затем держал его до тех пор, пока этот счет не будет достигнут.
Каков наилучший способ справиться с этой ситуацией? Обработка записей из БД в кусках - самое большое ограничение, которое я имею здесь, это то, что я не могу отметить записи, которые были подобраны. Итак, я хочу, чтобы партии шли последовательно. Но пакет должен использовать многопоточность внутри.
public class DealStoreEnricher extends AsyncExecutionSupport {
private static final int BATCH_SIZE = 5000;
private static final Log log = LogFactory.getLog(DealStoreEnricher.class);
private final DealEnricher dealEnricher;
private int concurrency = 10;
private final BlockingQueue<QueryDealRecord> dealsToBeEnrichedQueue;
private final BlockingQueue<QueryDealRecord> dealsEnrichedQueue;
private DealStore dealStore;
private ExtractorProcess extractorProcess;
ExecutorService executor;
public DealStoreEnricher(DealEnricher dealEnricher, DealStore dealStore, ExtractorProcess extractorProcess) {
this.dealEnricher = dealEnricher;
this.dealStore = dealStore;
this.extractorProcess = extractorProcess;
dealsToBeEnrichedQueue = new LinkedBlockingQueue<QueryDealRecord>();
dealsEnrichedQueue = new LinkedBlockingQueue<QueryDealRecord>(BATCH_SIZE * 3);
}
public ExtractorProcess getExtractorProcess() {
return extractorProcess;
}
public DealEnricher getDealEnricher() {
return dealEnricher;
}
public int getConcurrency() {
return concurrency;
}
public void setConcurrency(int concurrency) {
this.concurrency = concurrency;
}
public DealStore getDealStore() {
return dealStore;
}
public DealStoreEnricher withConcurrency(int concurrency) {
setConcurrency(concurrency);
return this;
}
@Override
public void start() {
super.start();
executor = Executors.newFixedThreadPool(getConcurrency());
for (int i = 0; i < getConcurrency(); i++)
executor.submit(new Runnable() {
public void run() {
try {
QueryDealRecord record = null;
while ((record = get()) != null && !isCancelled()) {
try {
update(getDealEnricher().enrich(record));
processed.incrementAndGet();
} catch (Exception e) {
failures.incrementAndGet();
log.error("Failed to process deal: " + record.getTradeId(), e);
}
}
} catch (InterruptedException e) {
setCancelled();
}
}
});
executor.shutdown();
}
protected void update(QueryDealRecord enrichedRecord) {
dealsEnrichedQueue.add(enrichedRecord);
if (batchComplete()) {
List<QueryDealRecord> enrichedRecordsBatch = new ArrayList<QueryDealRecord>();
synchronized (this) {
dealsEnrichedQueue.drainTo(enrichedRecordsBatch);
}
if (!enrichedRecordsBatch.isEmpty())
updateTheDatabase(enrichedRecordsBatch);
}
}
private void updateTheDatabase(List<QueryDealRecord> enrichedRecordsBatch) {
getDealStore().insertEnrichedData(enrichedRecordsBatch, getExtractorProcess());
}
/**
* @return true if processed records have reached the batch size or there's
* nothing to be processed now.
*/
private boolean batchComplete() {
return dealsEnrichedQueue.size() >= BATCH_SIZE || dealsToBeEnrichedQueue.isEmpty();
}
/**
* Gets an item from the queue of things to be enriched
*
* @return {@linkplain QueryDealRecord} to be enriched
* @throws InterruptedException
*/
protected synchronized QueryDealRecord get() throws InterruptedException {
try {
if (!dealsToBeEnrichedQueue.isEmpty()) {
return dealsToBeEnrichedQueue.take();
} else {
List<QueryDealRecord> records = getNextBatchToBeProcessed();
if (!records.isEmpty()) {
dealsToBeEnrichedQueue.addAll(records);
return dealsToBeEnrichedQueue.take();
}
}
} catch (InterruptedException ie) {
throw new UnRecoverableException("Unable to retrieve QueryDealRecord", ie);
}
return null;
}
private List<QueryDealRecord> getNextBatchToBeProcessed() {
List<QueryDealRecord> recordsThatNeedEnriching = getDealStore().getTheRecordsThatNeedEnriching(getExtractorProcess());
return recordsThatNeedEnriching;
}
@Override
public void stop() {
super.stop();
if (executor != null)
executor.shutdownNow();
}
@Override
public boolean await() throws InterruptedException {
return executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) && !isCancelled() && complete();
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit) && !isCancelled() && complete();
}
private boolean complete() {
setCompleted();
return true;
}
}
Может быть, вы хотите [ 'Phaser'] (http://docs.oracle.com/javase /7/docs/api/java/util/concurrent/Phaser.html) для управления «текущей периодичностью» для всех потоков. – Holger
Если вы хотите иметь две задачи, где нужно подождать, пока другой будет выполнен, прежде чем вы сделаете другой, самое простое - это иметь один поток. Многопоточные работы лучше всего работают, когда у них есть независимые задачи для выполнения, т. Е. Никто не ждет другого. –