2013-07-23 4 views
0

Я использую многопоточность для обработки списка строк партиями, однако я получаю эту ошибку, когда задача Runnable выполняет итерацию по списку для обработки каждой строки.java.util.ConcurrentModificationException: Неожиданное изменение списка при многопоточности?

Например, код примерно следующим образом этой структуры:

public class RunnableTask implements Runnable { 

private List<String> batch; 

    RunnableTask(List<String> batch){ 
     this.batch = batch; 
    } 


    @Override 
    public void run() { 

     for(String record : batch){ 

      entry = record.split(","); 
      m = regex.matcher(entry[7]); 

      if (m.find() && m.group(3) != null){ 
       currentKey = m.group(3).trim(); 
       currentValue = Integer.parseInt(entry[4]); 

       if (resultMap.get(currentKey) == null){ 
        resultMap.put(currentKey, currentValue); 
       } else { 
        resultMap.put(currentKey, resultMap.get(currentKey) + currentValue); 
      } 
     } 

    } 
} 
}  

В случае, если поток, который проходит эту партию для обработки никогда не изменяет «партии» и никаких изменений в партию сделана внутри для цикла. Я понимаю, что это исключение ConcurrentModificationException связано с изменением List во время итерации, но насколько я могу судить, этого не происходит. Есть что-то, что мне не хватает?

Любая помощь приветствуется,

Thankyou!

UPDATE1: Кажется, что переменные-экземпляры не являются потокобезопасными. Я попытался использовать CopyOnWriteArrayList вместо ArrayList, но получил непоследовательные результаты - предположив, что полная итерация не завершится до того, как список каким-то образом изменен, а не каждый элемент обрабатывается.

UPDATE2: Блокировка на петле с синхронизированным и/или реентронным замком по-прежнему дает то же исключение.

Мне нужен способ передачи списков запускаемым задачам и перебора этих списков без новых потоков, вызывающих проблемы параллелизма с этим списком.

+0

Учитывая, что вы не показываете код внутри цикла foreach, никто не может сказать. Кроме того, ваш список изменен _outside_ из runnable? – fge

+1

Ставлю пиво, которое либо вызывающий, либо ваш цикл изменяет партию. – user949300

+0

Обработка внутри цикла использует только запись и не ссылается на пакет. – ddnm

ответ

0

Попробуйте это в вашем коде:

public void run() { 
    for(String record : new ArrayList(batch)){ 
     //do processing with record 
    } 
} 

Существует своего рода проблема с всеми вашими темами обработки списка, но трудно сказать, с кодом (это список изменен во время процесса?) вы предоставляете

2

Я понимаю, что это исключение ConcurrentModificationException происходит из-за изменения списка во время итерации, но, насколько я могу сказать, что не происходит

Хорошо, подумайте, что происходит, когда вы создаете новый поток, передавая ссылку на экземпляр RunnableTask, инициализированный другим списком в качестве параметра конструктора? Вы просто изменили ссылку на список, чтобы указать на другой список. И подумайте, что происходит, когда в то же время другая нить внутри метода run() меняет list в любой точке. Это будет в некоторый момент времени, бросьте ConcurrentModificationException.

Переменные экземпляра не являются Thread-Safe.

+0

Я предполагал, что это потокобезопасность, если каждый новый поток является отдельным экземпляром. Могли бы вы предложить модификацию, которая была бы потокобезопасной? – ddnm

+0

Мне что-то не хватает. Его RunnableTask имеет ссылку на Список. Что не меняется после строительства. Нет ссылки на список, есть несколько отдельных ссылок. Насколько я понимаю его код. Примечание: было бы лучше, если бы пакет был объявлен окончательным. – user949300

+0

@howlerdingdong. Вы можете посмотреть [java.util.concurrent.CopyOnWriteArrayList] (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CopyOnWriteArrayList.html) –

0

Проблема связана с несколькими потоками одновременно с изменением источника List. Я бы предложил вам разделить исходный список на новый подсписок (в соответствии с размером) и передать этот список в потоки.

Сообщите вашим источником List 100 элементов. и вы используете 5 параллельных потоков.

int index = 0; 
List<TObject> tempList = new ArrayList<>(); 
for(TObject obj:srcList){ 
    if(i==(srcList.size()/numberOfthread)){ 
     RunnableTask task = new RunnableTask(tempList); 
     tempList = new ArrayList<>(); 
    }else 
     tempList.add(obj); 
} 

В этом случае ваш первоначальный список не будет изменен.

+0

Каким образом изменение размера партии влияет на параллелизм каким-либо образом? – ddnm

+0

Нет, вы пропустили эту точку. Это создает новый совершенно новый список. Объект –

0

вам необходимо заблокировать список перед доступом к его элементам. потому что List не является потокобезопасным. Попробуйте это

public void run() { 
    synchronizd(batch){ 
     for(String record : batch){//do processing with record} 
    } 
    } 
+0

Использование синхронизированного в цикле for по-прежнему дает мне исключение, потому что изменение в пакете происходит, когда новые потоковые экземпляры изменяют пакет на уровне класса. – ddnm

0

Да, вы получаете ConcurrentModificationException, потому что ваш список изменяется во время итерации. Если производительность не является критичной проблемой, я предлагаю использовать синхронизацию. общественного класса RunnableTask реализует Runnable {

private List<String> batch = new ArrayList<String>(); 

RunnableTask(List<String> batch){ 
    this.batch = batch; 
} 


public void run() { 
    synchronized (batch) { 
     for(String record : batch){//do processing with record} 
     } 
    } 

} 
} 

или даже лучше использовать ReentrantLock.

+0

Производительность - это проблема, к сожалению, я обрабатываю партии в сотнях тысяч. – ddnm

+0

Затем используйте ReentrantLock. Он не блокируется при чтении (что вы, по-видимому, делаете в процессе обработки), он выполняет изменчивое чтение. Он будет блокироваться только для записи. –

+0

Оба реентрактора и синхронизация на самом деле все еще дают мне одно и то же исключение, учитывая, что модификация происходит не из цикла for, а из повторной инициализации партии с каждым новым потоком. – ddnm

0

Ваши последующие действия указывают, что вы пытаетесь повторно использовать один и тот же список несколько раз. Ваш вызывающий должен создать новый список для каждого Runnable.

0

Очевидно, что кто-то еще меняет содержание списка, что не соответствует приведенному вами коду. (Если вы уверены, что ConcurrentModificationException жалуется на batch список, но не resultMap, и вы на самом деле показываете весь код в RunnableTask)

Попробуйте искать в вашем коде, места, которые обновляют содержимое списка, проверьте, возможно ли это одновременно с вашим RunnableTask.

Просто синхронизация в RunnableTask не поможет, вам нужно синхронизировать весь доступ к списку, что, очевидно, происходит где-то в другом месте.

Если производительность является проблемой для вас, так что вы не можете синхронизировать на batch списка (которые запрещают мультипликатор RunnableTask выполнять одновременно), рассмотреть возможность использования ReaderWriterLock: RunnableTask получает читать замок, в то время как логика обновления списка получить блокировку записи ,

Смежные вопросы