2012-03-28 3 views
0

Я новичок в многопоточном программировании и задаюсь вопросом. Как получить каждый поток для итерации по всем элементам в списке, добавляемом другим потоком?Итерация через список, изменяемый другим потоком

Вот простую программу для демонстрации. У меня есть один список целых чисел и 10 потоков, пронумерованных от 1 до 10, которые работают над ним. Каждый поток должен записывать все значения в списке в StringBuilder. После того, как поток записывает все значения в списке, он добавляет его номер в список, а затем завершает работу.

Я пытаюсь, чтобы каждый поток продолжал проверять список элементов до тех пор, пока список больше не будет изменен никаким другим потоком, но у меня возникнут проблемы с блокировкой на нем. В случае успеха, эта программа будет иметь выход, который может выглядеть как:

3: 1, 
8: 1,3,2,4,5,7, 
6: 1,3,2,4,5,7,8, 
9: 1,3,2,4,5,7,8,6, 
7: 1,3,2,4,5, 
10: 1,3,2,4,5,7,8,6,9, 
5: 1,3,2,4, 
4: 1,3,2, 
2: 1,3, 
1: 

Что случается иногда, но часто два или более нитей закончить до того, как замок установлен, поэтому итерация заканчивается преждевременно:

1: 
2: 1,5,4,8,7,3,10, 
10: 1,5,4,8,7,3, 
9: 1,5,4,8,7,3,10,2, 
3: 1,5,4,8,7, 
7: 1,5,4,8, 
5: 1, <<one of these threads didn't wait to stop iterating. 
4: 1, << 
8: 1,5,4, 
6: 1,5,4,8,7,3,10,2, 

У кого-нибудь есть идеи?

===========

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.locks.ReentrantLock; 

public class ListChecker implements Runnable{ 

static List<Integer> list = new ArrayList<Integer>(); 
static ReentrantLock lock = new ReentrantLock(); 

int id; 
StringBuilder result = new StringBuilder(); 

public ListChecker(int id){ 
    this.id = id; 
} 

@Override 
public void run() { 
    int i=0; 

    do{ 
     while (i < list.size()){    
      result.append(list.get(i++)).append(','); 
     } 
     if (!lock.isLocked()){ 
      break; 
     } 
    }while (true);     
    addElement(id); 
    System.out.println(id + ": " + result.toString()); 
} 

public void addElement(int element){ 
    try{ 
     lock.lock(); 
     list.add(element); 
    }finally{ 
     lock.unlock(); 
    } 
} 


public static void main(String[] args){ 
    for(int i=1; i<=10; i++){ 
     ListChecker checker = new ListChecker(i); 
     new Thread(checker).start(); 
    } 
} 
} 

Edit: Спасибо за помощь до сих пор. Я должен уточнить, что я хотел бы, чтобы каждый поток повторялся через список в одно и то же время. В моем случае есть много обработки, которая должна выполняться по каждому элементу списка по каждому потоку (вместо добавления к StringBuffer, я делаю много сравнений элемента-кандидата со списком финалистов). Таким образом, для того, чтобы каждый поток мог работать над одним и тем же списком одновременно, для многопоточности требуется улучшить мою производительность. Таким образом, я не думаю, что блокировка вокруг всей итерации, или включение всей итерации - это синхронизированный (список) блок, будет работать.


Редактировать 2: Я думаю, что получил. Хитрость заключалась не только в синхронизации в списке при добавлении в него элементов, но и при определении того, есть ли еще элементы. Это препятствует тому, чтобы поток 2 прекратил свою итерацию до того, как поток 1 завершит добавление в список. Он выглядит немного глупым, но это сохраняет код, который мне нужно запускать в нескольких потоках вне блока синхронизации, поэтому мой реальный случай должен получить увеличение производительности, в котором я нуждаюсь.

Спасибо всем, кто помог!

import java.util.ArrayList; 
import java.util.List; 

public class ListChecker2 implements Runnable{ 

    static List<Integer> list = new ArrayList<Integer>(); 

    int id; 
    StringBuilder result = new StringBuilder(); 

    public ListChecker2(int id){ 
     this.id = id; 
    } 

    @Override 
    public void run() { 
     int i = 0; 
     do{ 
      synchronized (list) {   
       if (i >= list.size()){ 
        list.add(id); 
        System.out.println(id + ": " + result.toString()); 
        return; 
       } 
      } 
      result.append(list.get(i++)).append(','); 
      System.out.println("running " + id); 

     }while(true); 
    } 


    public static void main(String[] args){ 
     for(int i=1; i<=30; i++){ 
      ListChecker2 checker = new ListChecker2(i); 
      new Thread(checker).start(); 
     } 
    } 
} 
+0

Опасность состоит в том, что один поток попадет в конец списка (1) и добавит новую запись (2), в то время как другой поток добавит свою запись между (1) и (2). После этого у вас будет поток, который не просмотрел все записи. Не могли бы вы использовать специально созданный список, который автоматически блокирует себя в вашем потоке, как только он будет итерации к последней записи? – OldCurmudgeon

ответ

1

Вы можете сделать это гораздо проще за счет синхронизации в списке.

public class ListChecker implements Runnable { 
    // Number of threads. 
    static final int THREADS = 10; 
    // The list. 
    static final List<Integer> list = new ArrayList<Integer>(THREADS); 

    // My ID 
    int id; 

    public ListChecker(int id) { 
    // Remember my ID. 
    this.id = id; 
    } 

    @Override 
    public void run() { 
    // My string. 
    StringBuilder result = new StringBuilder(); 
    // Wait for exclusive access to the list. 
    synchronized (list) { 
     // Build my string. 
     for (Integer i : list) { 
     result.append(i).append(","); 
     } 
     // Add me to the list. 
     list.add(id); 
    } 
    System.out.println(id + ": " + result.toString()); 
    } 

    public static void main(String[] args) { 
    for (int i = 1; i <= THREADS; i++) { 
     ListChecker checker = new ListChecker(i); 
     new Thread(checker).start(); 
    } 
    } 
} 

Это мой выход. Я немного боюсь, но это доказывает, что это работает.

1: 
2: 1, 
3: 1,2, 
4: 1,2,3, 
5: 1,2,3,4, 
6: 1,2,3,4,5, 
7: 1,2,3,4,5,6, 
8: 1,2,3,4,5,6,7, 
9: 1,2,3,4,5,6,7,8, 
10: 1,2,3,4,5,6,7,8,9, 

Добавлено

Если вам нужно, чтобы избежать блокировок всего списка (как ваша правка предполагает), вы можете попробовать специальный список, который запирает себя всякий раз, когда это доставляет последнюю запись. Вам тогда нужно специально разблокировать его, конечно.

К сожалению, эта техника не очень хорошо справляется с ситуацией с пустым списком. Возможно, вы можете придумать подходящее решение.

public class ListChecker implements Runnable { 
    // Number of threads. 
    static final int THREADS = 20; 
    // The list. 
    static final EndLockedList<Integer> list = new EndLockedList<Integer>(); 
    // My ID 
    int id; 

    public ListChecker(int id) { 
    // Remember my ID. 
    this.id = id; 
    } 

    @Override 
    public void run() { 
    // My string. 
    StringBuilder result = new StringBuilder(); 
    // Build my string. 
    for (int i = 0; i < list.size(); i++) { 
     result.append(i).append(","); 
    } 
    // Add me to the list. 
    list.add(id); 
    // Unlock the end lock. 
    list.unlock(); 
    System.out.println(id + ": " + result.toString()); 
    } 

    public static void main(String[] args) { 
    for (int i = 0; i < THREADS; i++) { 
     ListChecker checker = new ListChecker(i + 1); 
     new Thread(checker).start(); 
    } 
    } 

    private static class EndLockedList<T> extends ArrayList<T> { 
    // My lock. 
    private final Lock lock = new ReentrantLock(); 
    // Were we locked? 
    private volatile boolean locked = false; 

    @Override 
    public boolean add(T it) { 
     lock.lock(); 
     try { 
     return super.add(it); 
     } finally { 
     lock.unlock(); 
     } 
    } 

    // Special get that locks the list when the last entry is got. 
    @Override 
    public T get(int i) { 
     // Get it. 
     T it = super.get(i); 
     // If we are at the end. 
     if (i == size() - 1) { 
     // Speculative lock. 
     lock.lock(); 
     // Still at the end? 
     if (i < size() - 1) { 
      // Release the lock! 
      lock.unlock(); 
     } else { 
      // Remember we were locked. 
      locked = true; 
      // It is now locked for putting until specifically unlocked. 
     } 
     } 
     // That's the one. 
     return it; 
    } 

    // Unlock. 
    void unlock() { 
     if (locked) { 
     locked = false; 
     lock.unlock(); 
     } 
    } 
    } 
} 

Выход (обратите внимание на неправильном обращение с пустым списком):

2: 
8: 
5: 
1: 
4: 
7: 
6: 
9: 
3: 
10: 0,1,2,3,4,5,6,7,8, 
11: 0,1,2,3,4,5,6,7,8,9, 
12: 0,1,2,3,4,5,6,7,8,9,10, 
13: 0,1,2,3,4,5,6,7,8,9,10,11, 
14: 0,1,2,3,4,5,6,7,8,9,10,11,12, 
15: 0,1,2,3,4,5,6,7,8,9,10,11,12,13, 
16: 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14, 
17: 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15, 
18: 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16, 
19: 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17, 
20: 0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18, 
+0

Спасибо, O.C. См. Мое редактирование; Я думаю, это ограничивает итерацию одним потоком, который я бы хотел предотвратить. –

+0

Указывает на то, что ваши комментарии, наконец, сделали этот материал понятным, и идея EndLockedList была довольно умной. –

1

Вы выбрали жесткий футляр, чтобы ваши ноги были мокрыми. Чтение и запись объектов в то же время является сложным, никогда не рекомендуется, и часто специально предотвращено (т.е. не будет работать с использованием итератора и список)

НО

Если вы должны сделать это, вы должны синхронизироваться на ваш список, прежде чем добавлять элемент, и после добавления элемента вызовите list.notifyAll, который пробудит потоки, ожидающие большего количества результатов. Между тем, ваши другие потоки должны быть прочитаны до конца, а затем синхронизироваться в списке (вам нужно синхронизировать объект, чтобы вызвать wait, notify или notifyAll) и вызвать ожидание на нем.

BTW. Более простой способ сделать это - использовать шаблон слушателя/наблюдателя/наблюдаемого, хотя шаблон прослушивателя работает (как правило) с одним потоком. Возможно, вам понадобится Google для учебных пособий по этому вопросу.

0

вы можете использовать метод, как Collections.synchronizedList (список List)

это как вы его используете:

List list = Collections.synchronizedList(new ArrayList()); 
    ... 
    synchronized (list) { 
    Iterator i = list.iterator(); // Must be in synchronized block 
    while (i.hasNext()) 
     foo(i.next()); 
    } 
+0

Спасибо, Том! См. Мое редактирование; Я хотел бы, чтобы итерация выполнялась в нескольких потоках одновременно, но, по-моему, это изменение ограничивает его одним потоком за раз. –

0

Пожалуйста, обновите ваш метод запуска. и дайте мне знать, если он работает по ожиданию ура или нет?

public void run() { 
int i=0; 
    lock.lock(); 
    while (i < list.size()){    
     result.append(list.get(i++)).append(','); 
} 
addElement(id); 
lock.unlock();`` 
System.out.println(id + ": " + result.toString()); 
} 
+0

Это дает правильные результаты, но если я правильно его понимаю, он делает это, ограничивая итерацию одним потоком за раз. Вы можете видеть это, если вы печатаете идентификатор каждого потока в цикле while, особенно если вы увеличиваете количество потоков от 10 до 30 или около того. Я думаю, что решение Тома по размещению Итератора в синхронизированном (списке) блоке делает то же самое. В идеале несколько потоков могут одновременно итерировать один и тот же список, но затем приостанавливать всякий раз, когда список обновляется. Но спасибо за вашу помощь! –

+0

Да, поскольку список является общим, мы должны синхронизировать доступ, иначе результат будет неправильным –

Смежные вопросы