2013-07-23 2 views
2

В настоящее время я отправляю партии строк в CopyOnWrite ArrayLists с ExecutorService для обработки параллельно, где задача Runnable, обрабатывающая эти списки, должна перебирать список и обрабатывать каждую строку.Неверная итерация CopyOnWriteArrayList с многопоточным?

После того, как вы столкнулись с проблемами с параллелизмом с регулярными массивами ArrayLists, я попытался использовать CopyOnWriteArrayLists, потому что они потокобезопасны, однако мои результаты теперь несовместимы. То есть, я получаю разные результаты при каждом запуске программы, предполагая, что содержимое arraylist каким-то образом изменено, прежде чем каждый Runnable taks сможет полностью перебрать его.

public static class BatchRunnable implements Runnable { 

    private CopyOnWriteArrayList<String> batch; 

    BatchRunnable(CopyOnWriteArrayList<String> batch){ 
     this.batch = batch; 
    } 

    @Override 
    public void run(){ 
     //iterate over batch and work with String elements 
     //make no modifications to batch 
    } 
} 
  • с выполняемой задачей не делают никаких изменений в ArrayList, это только итерация по списку и использует строковые элементы списка, чтобы сделать обработку.

  • ТОЛЬКО место, в которое изменяется экземпляр CopyOnWriteArrayList, находится в его экземпляре с каждой новой задачей Runnable.

Когда я проходил в одиночных строках вместо пакетирования, я имел последовательные и правильные результаты, но когда я начал использовать партию в Струнных ArrayLists, я получил противоречивые результаты, предлагая что-то компрометирует параллелизм CopyOnWriteArrayList партии , несмотря на то, что он якобы является потокобезопасным.

Любая помощь приветствуется, спасибо!

EDIT: Вот где мои партии строятся:

 Runnable worker = null; 
     while((line = br.readLine()) != null) { 
      recordBatch.add(line); 
      if(recordBatch.size() == 100){ 
       worker = new BatchRunnable(recordBatch); 
       executor.execute(worker); 
       recordBatch.clear(); 
      } 

     }   
     executor.shutdown(); 
     executor.awaitTermination(60,TimeUnit.SECONDS); 
+0

Если вы были полностью построения партии, прежде чем итерацию над ними (то есть никакие другие потоки не изменяли вещи), то я бы ожидал, что вы будет хорошо с ArrayList в любом случае. Это говорит о том, что у вас есть что-то неожиданное изменение ваших партий. Вы должны изучить эту сторону вещей. –

+0

Я добавил, как я строю партии, может быть, есть основания полагать, что это вызывает проблемы? – ddnm

+0

Боковой комментарий: вместо этого вы должны использовать блокирующую очередь - это именно то, что они должны были делать ... – assylias

ответ

2

Посмотрите на ваш while петли:

while((line = br.readLine()) != null) { 
     recordBatch.add(line); 
     if(recordBatch.size() == 100){ 
      worker = new BatchRunnable(recordBatch); 
      executor.execute(worker); 
      recordBatch.clear(); 
     } 

    } 

Вы передаете ссылку на тот же list во всех BatchRunnable. Итак, как только вы меняете list в одном месте, он будет отражен во всей ссылке. Итак, как только вы очистите свой список, используя recordBatch.clear(), список пуст для всех ссылок, даже тот, который у вас есть в BatchRunnable. Вот почему вы получаете непоследовательный результат.

Вы должны пропускать copy вашего recordBatch списка в вашем BatchRunnable:

worker = new BatchRunnable(new ArrayList<String>(recordBatch)); 
+0

@howlerdingdong. Проверьте мое редактирование. Вам нужно передать копию своего списка. Измените свой первый оператор в 'if' на тот, который я опубликовал в конце. –

+0

Успех! Спасибо. Я не понимал, что передаю ссылку, которая может вызвать проблемы! – ddnm

0

Я предполагаю, что вы используете итератор, чтобы пройти через элементы. Итератор предоставляет моментальный снимок состояния списка при построении итератора. Во время прохождения итератора синхронизация не требуется.

Так что в вашем случае вы должны приобрести итератор в конструкторе или сделать копию CopyOnWriteArrayList.

0

Если вы работаете в вопросах параллельности с равниной ArrayList, это показывает, что он изменяется в то время как ваш BatchRunnable являются итерацию на нем , Замена ArrayList на CopyOnWriteArrayList только скроет проблемы с параллелизмом.

В вашем коде вы изменяете (clear() и add() этот список при создании BatchRunnable.Когда представлен первый runnable, он начинает обработку списка, но вы все равно продолжаете его изменять.

1

Вы очищаете партию после прохождения ее до BatchRunnable.

worker = new BatchRunnable(recordBatch); 
executor.execute(worker); 
recordBatch.clear(); // You clear all the list 

Так исполнитель будет обрабатывать все, что находится в списке, но если ясно() линия будет достигнута (и поскольку exeuctor работает в другом потоке, это может произойти до BatchRunnable отделки), чем список будет пустым (или содержать следующую партию!), а рабочая партия будет иметь непоследовательный список.

Когда вы передаете список работнику, вы передаете ссылку, а не копию! так или скопировать партию или создать новую для каждой партии:

worker = new BatchRunnable(recordBatch); 
executor.execute(worker); 
recordBatch = new CopyOnWriteArrayList<String>(); 
Смежные вопросы