2014-12-12 1 views
0

Я пытаюсь реализовать механизм, в котором исполняемые файлы являются производителями и потребителями;Производители потребительских товаров; вторая партия не должна появляться до тех пор, пока не завершится предыдущая партия

Ситуация

это-

мне нужно прочитать записи из БД в пакетном режиме, и обработать то же самое. Я пытаюсь использовать шаблон производителя. Я получаю пакет, я обрабатываю. Получите пакетный процесс. Это получает пакет, когда он видит, что очередь пуста. Один из потоков идет и получает вещи. Но проблема в том, что я не могу пометить записи, которые получаются для обработки, и это мое ограничение. Итак, если мы выберем следующую партию, прежде чем полностью выполнить предыдущий, я могу снова взять те же записи. Поэтому я должен быть в состоянии представить предыдущий, прежде чем потянуть другого. Я смущаюсь о том, что мне следует делать здесь. Я попробовал сохранить счетчик, а затем держал его до тех пор, пока этот счет не будет достигнут.

Каков наилучший способ справиться с этой ситуацией? Обработка записей из БД в кусках - самое большое ограничение, которое я имею здесь, это то, что я не могу отметить записи, которые были подобраны. Итак, я хочу, чтобы партии шли последовательно. Но пакет должен использовать многопоточность внутри.

public class DealStoreEnricher extends AsyncExecutionSupport { 
private static final int BATCH_SIZE = 5000; 
private static final Log log = LogFactory.getLog(DealStoreEnricher.class); 
private final DealEnricher dealEnricher; 
private int concurrency = 10; 
private final BlockingQueue<QueryDealRecord> dealsToBeEnrichedQueue; 
private final BlockingQueue<QueryDealRecord> dealsEnrichedQueue; 
private DealStore dealStore; 
private ExtractorProcess extractorProcess; 
ExecutorService executor; 

public DealStoreEnricher(DealEnricher dealEnricher, DealStore dealStore, ExtractorProcess extractorProcess) { 
    this.dealEnricher = dealEnricher; 
    this.dealStore = dealStore; 
    this.extractorProcess = extractorProcess; 
    dealsToBeEnrichedQueue = new LinkedBlockingQueue<QueryDealRecord>(); 
    dealsEnrichedQueue = new LinkedBlockingQueue<QueryDealRecord>(BATCH_SIZE * 3); 
} 

public ExtractorProcess getExtractorProcess() { 
    return extractorProcess; 
} 

public DealEnricher getDealEnricher() { 
    return dealEnricher; 
} 

public int getConcurrency() { 
    return concurrency; 
} 

public void setConcurrency(int concurrency) { 
    this.concurrency = concurrency; 
} 

public DealStore getDealStore() { 
    return dealStore; 
} 

public DealStoreEnricher withConcurrency(int concurrency) { 
    setConcurrency(concurrency); 
    return this; 
} 

@Override 
public void start() { 
    super.start(); 
    executor = Executors.newFixedThreadPool(getConcurrency()); 
    for (int i = 0; i < getConcurrency(); i++) 
     executor.submit(new Runnable() { 
      public void run() { 
       try { 
        QueryDealRecord record = null; 
        while ((record = get()) != null && !isCancelled()) { 
         try { 
          update(getDealEnricher().enrich(record)); 
          processed.incrementAndGet(); 
         } catch (Exception e) { 
          failures.incrementAndGet(); 
          log.error("Failed to process deal: " + record.getTradeId(), e); 
         } 
        } 
       } catch (InterruptedException e) { 
        setCancelled(); 
       } 
      } 
     }); 

    executor.shutdown(); 
} 

protected void update(QueryDealRecord enrichedRecord) { 
    dealsEnrichedQueue.add(enrichedRecord); 
    if (batchComplete()) { 
     List<QueryDealRecord> enrichedRecordsBatch = new ArrayList<QueryDealRecord>(); 
     synchronized (this) { 
      dealsEnrichedQueue.drainTo(enrichedRecordsBatch); 
     } 
     if (!enrichedRecordsBatch.isEmpty()) 
      updateTheDatabase(enrichedRecordsBatch); 
    } 
} 

private void updateTheDatabase(List<QueryDealRecord> enrichedRecordsBatch) { 
    getDealStore().insertEnrichedData(enrichedRecordsBatch, getExtractorProcess()); 
} 

/** 
* @return true if processed records have reached the batch size or there's 
*   nothing to be processed now. 
*/ 
private boolean batchComplete() { 
    return dealsEnrichedQueue.size() >= BATCH_SIZE || dealsToBeEnrichedQueue.isEmpty(); 
} 

/** 
* Gets an item from the queue of things to be enriched 
* 
* @return {@linkplain QueryDealRecord} to be enriched 
* @throws InterruptedException 
*/ 
protected synchronized QueryDealRecord get() throws InterruptedException { 
    try { 
     if (!dealsToBeEnrichedQueue.isEmpty()) { 
      return dealsToBeEnrichedQueue.take(); 
     } else { 
      List<QueryDealRecord> records = getNextBatchToBeProcessed(); 
      if (!records.isEmpty()) { 
       dealsToBeEnrichedQueue.addAll(records); 
       return dealsToBeEnrichedQueue.take(); 
      } 
     } 
    } catch (InterruptedException ie) { 
     throw new UnRecoverableException("Unable to retrieve QueryDealRecord", ie); 
    } 
    return null; 
} 

private List<QueryDealRecord> getNextBatchToBeProcessed() { 


    List<QueryDealRecord> recordsThatNeedEnriching = getDealStore().getTheRecordsThatNeedEnriching(getExtractorProcess()); 

    return recordsThatNeedEnriching; 
} 

@Override 
public void stop() { 
    super.stop(); 
    if (executor != null) 
     executor.shutdownNow(); 
} 

@Override 
public boolean await() throws InterruptedException { 
    return executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) && !isCancelled() && complete(); 
} 

@Override 
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { 
    return executor.awaitTermination(timeout, unit) && !isCancelled() && complete(); 
} 

private boolean complete() { 
    setCompleted(); 
    return true; 
} 

}

+0

Может быть, вы хотите [ 'Phaser'] (http://docs.oracle.com/javase /7/docs/api/java/util/concurrent/Phaser.html) для управления «текущей периодичностью» для всех потоков. – Holger

+0

Если вы хотите иметь две задачи, где нужно подождать, пока другой будет выполнен, прежде чем вы сделаете другой, самое простое - это иметь один поток. Многопоточные работы лучше всего работают, когда у них есть независимые задачи для выполнения, т. Е. Никто не ждет другого. –

ответ

2

Вы уже используете BlockingQueue - он делает всю эту работу за вас.

Однако вы используете неправильный метод addAll(), чтобы добавить новые элементы в очередь. Этот метод генерирует исключение, если очередь не может принимать элементы. Скорее вы должны использовать put(), потому что это метод блокировки, соответствующий take(), который вы используете правильно.

Что касается Вашего заявления в ДОЛЖНОСТИ:

второй партия не должен прийти до предыдущей партии завершен

Вам не нужно беспокоиться о сроках поступающих против исходящих пакетов если вы правильно используете BlockingQueue.

1

Похоже, что Semaphore будет отлично работать для вас. Имейте производящую нить acquire семафор, в то время как потребляющая нить release семафор при завершении партии.

BlockingQueue blockingQueue = ...; 
Semapore semaphore = new Semaphore(1); 

Производить-Thread

Batch batch = db.getBatch(); 
semaphore.acquire(); // wait until previous batch completes 
blockingQueue.add(batch); 

Потребляя Thread

for(;;){ 
    Batch batch = blockingQueue.take(); 
    doBatchUpdate(batch); 
    semaphore.release(); // tell next batch to run 
} 
Смежные вопросы