2014-02-17 3 views
2

Вопросы:Как предотвратить, чтобы мои потребительские потоки дважды удаляли последний элемент?

  1. Почему я получаю NoSuchElementException при попытке удалить последний элемент ?
  2. Как это исправить?

У меня есть 3-х классов (смотри ниже), что добавить/удалить типа Integer к LinkedList. Все работает нормально до тех пор, пока не удалятся нитки до последнего элемента.

Кажется, что обе нити пытаются его удалить. Первый преуспевает, второй не может. Но я думал, что этот синхронный метод/синхронизированный атрибут + !sharedList.isEmpty() справится с этим.

Class Производитель: Этот класс должен созданных случайных чисел, положить их в sharedList, писать в консоль, что он просто добавил номер и остановиться, как только она будет прервана. Ожидается только 1 поток этого класса.

import java.util.LinkedList; 

    public class Producer extends Thread 
    { 

     private LinkedList sharedList; 
     private String name; 

     public Producer(String name, LinkedList sharedList) 
     { 
      this.name = name; 
      this.sharedList = sharedList; 
     } 

     public void run() 
     { 
      while(!this.isInterrupted()) 
      { 
       while(sharedList.size() < 100) 
       { 
        if(this.isInterrupted()) 
        { 
         break; 
        } else 
        { 
         addElementToList(); 
        } 
       } 
      } 
     } 

     private synchronized void addElementToList() 
     { 
      synchronized(sharedList) 
      { 
       sharedList.add((int)(Math.random()*100)); 
       System.out.println("Thread " + this.name + ": " + sharedList.getLast() + " added"); 
      } 
      try { 
       sleep(300); 
      } catch (InterruptedException e) { 
       this.interrupt(); 
      } 
     } 
    } 

Класс Потребитель: Этот класс должен удалить первый элемент в sharedList, если он существует. Выполнение должно продолжаться (после прерывания) до sharedList пуст. Ожидается несколько (по крайней мере 2) потоков этого класса.

import java.util.LinkedList; 

public class Consumer extends Thread 
{ 
    private String name; 
    private LinkedList sharedList; 

    public Consumer(String name, LinkedList sharedList) 
    { 
     this.name = name; 
     this.sharedList = sharedList; 
    } 

    public void run() 
    { 
     while(!this.isInterrupted()) 
     { 
      while(!sharedList.isEmpty()) 
      { 
       removeListElement(); 
      } 
     } 
    } 

    private synchronized void removeListElement() 
    { 
     synchronized(sharedList) 
     { 
      int removedItem = (Integer) (sharedList.element()); 
      sharedList.remove(); 
      System.out.println("Thread " + this.name + ": " + removedItem + " removed"); 
     } 
     try { 
      sleep(1000); 
     } catch (InterruptedException e) { 
      this.interrupt(); 
     } 
    } 
} 

Класс MainMethod: Этот класс должен начинать и прерывать нити.

import java.util.LinkedList; 


public class MainMethod 
{ 

    public static void main(String[] args) throws InterruptedException 
    { 
     LinkedList sharedList = new LinkedList(); 
     Producer producer = new Producer("producer", sharedList); 
     producer.start(); 
     Thread.sleep(1000); 
     Consumer consumer1 = new Consumer("consumer1", sharedList); 
     Consumer consumer2 = new Consumer("consumer2", sharedList); 
     consumer1.start(); 
     consumer2.start(); 
     Thread.sleep(10000); 
     producer.interrupt(); 
     consumer1.interrupt(); 
     consumer2.interrupt(); 
    } 

} 

Исключение: Это точное исключение я получаю.

Исключение в потоке "Резьба-2" java.util.NoSuchElementException на java.util.LinkedList.getFirst (LinkedList.java:126) в java.util.LinkedList.element (LinkedList.java:476) в Consumer.removeListElement (Consumer.java:29) в Consumer.run (Consumer.java:20)

+1

Почему бы не использовать ['BlockingQueue'] (http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html) для проблем с потоковыми производителями? – zapl

+0

Почему два цикла? может у, пожалуйста, объясните мне? –

+0

@VishalSanthrama Я думал, что 2 петли необходимы для a) не останавливаться до прерывания (внешний цикл) и b) даже если прерывание не останавливается до тех пор, пока List не будет пустым (внутренний цикл). Если я ошибаюсь. – whateverrest

ответ

3

Ваше исключение является довольно просто объяснить. В

 while(!sharedList.isEmpty()) 
     { 
      removeListElement(); 
     } 

sharedList.isEmpty() происходит вне синхронизации и поэтому один потребитель все еще можно увидеть список пуст, как в то время как другой потребитель уже принял последний элемент.

Потребитель, который неправомерно полагал, что он пуст, не попытается удалить элемент, который больше не существует, что приводит к вашему сбою.

Если вы хотите сделать это потоковым, используя LinkedList, вам нужно будет сделать каждые операция чтения/записи атомарная. Например.

while(!this.isInterrupted()) 
{ 
    if (!removeListElementIfPossible()) 
    { 
     break; 
    } 
} 

и

// method does not need to be synchronized - no thread besides this one is 
// accessing it. Other threads have their "own" method. Would make a difference 
// if this method was static, i.e. shared between threads. 
private boolean removeListElementIfPossible() 
{ 
    synchronized(sharedList) 
    { 
     // within synchronized so we can be sure that checking emptyness + removal happens atomic 
     if (!sharedList.isEmpty()) 
     { 
      int removedItem = (Integer) (sharedList.element()); 
      sharedList.remove(); 
      System.out.println("Thread " + this.name + ": " + removedItem + " removed"); 
     } else { 
      // unable to remove an element because list was empty 
      return false; 
     } 
    } 
    try { 
     sleep(1000); 
    } catch (InterruptedException e) { 
     this.interrupt(); 
    } 
    // an element was removed 
    return true; 
} 

Та же проблема существует в ваших производителей. Но они просто создали бы 110-й элемент или что-то в этом роде.

Хорошим решением проблемы будет использование BlockingQueue. См. Пример documentation. В очереди выполняется вся блокировка синхронизации &, поэтому ваш код не должен беспокоиться.

Редактирование: относительно двух циклов while: вам не нужно использовать 2 цикла, 1 петлю цикла, но вы столкнетесь с другой проблемой: потребители могут видеть, что очередь пуста до того, как производители ее заполнили. Таким образом, вы либо должны убедиться, что в очереди есть что-то, прежде чем оно может быть использовано, либо вам придется вручную останавливать потоки другими способами. You thread.sleep(1000) после запуска производителя должен быть довольно безопасным, но нити не гарантируются даже после 1 секунды. Использовать, например. a CountDownLatch, чтобы сделать его действительно безопасным.

0

Попробуйте поменять местами из

while(!sharedList.isEmpty()) 

и

synchronized(sharedList) 

Я не думаю, что вам нужно синхронизировать на removeListElement().

1

Мне интересно, почему вы не используете уже существующие классы, предлагаемые Java. Я переписал вашу программу, используя ее, и она становится намного короче и легче читать. Кроме того, недостаток synchronized, который блокирует все потоки, кроме тех, кто получает блокировку (и даже делает двойную синхронизацию), позволяет программе фактически запускаться параллельно.

Вот код:

Производитель:

public class Producer implements Runnable { 

    protected final String name; 
    protected final LinkedBlockingQueue<Integer> sharedList; 
    protected final Random random = new Random(); 

    public Producer(final String name, final LinkedBlockingQueue<Integer> sharedList) { 
    this.name = name; 
    this.sharedList = sharedList; 
    } 

    public void run() { 
    try { 
     while (Thread.interrupted() == false) { 
     final int number = random.nextInt(100); 
     sharedList.put(number); 
     System.out.println("Thread " + this.name + ": " + number); 
     Thread.sleep(100); 
     } 
    } catch (InterruptedException e) { 
    } 
    } 
} 

Потребитель:

public class Consumer implements Runnable { 

    protected final String name; 
    protected final LinkedBlockingQueue<Integer> sharedList; 

    public Consumer(final String name, final LinkedBlockingQueue<Integer> sharedList) { 
    this.name = name; 
    this.sharedList = sharedList; 
    } 

    public void run() { 
    try { 
     while (Thread.interrupted() == false) { 
     final int number = sharedList.take(); 
     System.out.println("Thread " + name + ": " + number + " taken."); 
     Thread.sleep(100); 
     } 
    } catch (InterruptedException e) { 
    } 
    } 
} 

Главная:

public static void main(String[] args) throws InterruptedException { 
    final LinkedBlockingQueue<Integer> sharedList = new LinkedBlockingQueue<>(100); 
    final ExecutorService executor = Executors.newFixedThreadPool(4); 

    executor.execute(new Producer("producer", sharedList)); 
    Thread.sleep(1000); 

    executor.execute(new Consumer("consumer1", sharedList)); 
    executor.execute(new Consumer("consumer2", sharedList)); 

    Thread.sleep(1000); 
    executor.shutdownNow(); 
} 

Есть несколько отличий:

  • Поскольку я использую параллельный список, мне не нужно заботиться (о многом) о синхронизации, список делает это внутри.

  • Поскольку этот список использует атомную блокировку вместо истинной блокировки через , она будет масштабироваться намного лучше, чем больше потоков.

  • Я устанавливаю предел блокирующей очереди на 100, поэтому даже если в списке нет проверки, в списке не будет более 100 элементов, поскольку put будет блокировать, если предел достигнут.

  • Я использую random.nextInt(100), который является функцией удобства для того, что вы использовали, и будет производить намного меньше опечаток, поскольку использование намного яснее.

  • Продюсер и потребитель - это Runnables, так как это предпочтительный способ для потоковой передачи на Java. Это позволяет впоследствии обернуть любую форму потока вокруг них для выполнения, а не только примитивный класс Thread.

  • Вместо Thread, я использую ExecutorService, который позволяет упростить управление несколькими потоками. Создание темы, планирование и другая обработка выполняются внутренне, поэтому все, что мне нужно сделать, это выбрать наиболее подходящий ExecutorService и позвонить shutdownNow(), когда закончите.

  • Также обратите внимание, что нет необходимости бросать прерываниеException в пустоту. Если потребитель/производитель прерван, это сигнал прекратить выполнение изящно как можно скорее. Если мне не нужно сообщать другим «за» этот поток, нет необходимости снова бросать это исключение (хотя и никакого вреда не делается).

  • Я использую ключевое слово final, чтобы отметить элементы, которые не будут меняться позже. На этот раз это подсказка для компилятора, который допускает некоторые оптимизации, это также помогает мне предотвратить случайное изменение переменной, которая не должна изменяться. Множество проблем можно предотвратить, не допуская изменения переменных в потоковой среде, поскольку проблемы с потоком почти всегда требуют, чтобы что-то читалось и записывалось в одно и то же время. Такие вещи не могут произойти, если вы не можете писать.

Потратив некоторое время на поиск через библиотеку Java для класса, который соответствует вашей проблеме лучшие обычно решает много проблем и уменьшает размер кода много.

Смежные вопросы