2015-04-24 4 views
3

Я учусь о java.util.concurrent. Я написал код, как это:Использование volatile во избежание условий гонки

import java.util.concurrent.ArrayBlockingQueue; 

public class JUCArrayBlockingQueue { 

private static ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<String>(1); 
private static volatile int i = 0; 
private static volatile int j = 0; 

public static void main(String[] args) { 
    new Pop("t1").start(); 
    new Pop("t2").start(); 
    new Push("p1").start(); 
    new Push("p2").start(); 
} 

static class Pop extends Thread { 
    public Pop(String name) { 
     super(name); 
    } 

    @Override 
    public void run() { 

     String str; 
     try { 

      while (++j < 500) { 
       str = abq.take(); 
       System.out.println(j + ">>" 
         + Thread.currentThread().getName() + " take: " 
         + str); 
      } 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

} 

static class Push extends Thread { 
    public Push(String name) { 
     super(name); 
    } 

    @Override 
    public void run() { 

     while (i < 500) { 
      if (abq.offer(Thread.currentThread().getName() + " t" + i 
        + " ~")) { 
       i++; 
      } 
      ; 
     } 
    } 
} 
} 

И результат таков:

488>>t1 take: p2 t486 ~ 
489>>t2 take: p2 t487 ~ 
490>>t2 take: p1 t488 ~ 
491>>t1 take: p1 t489 ~ 
492>>t1 take: p2 t490 ~ 
493>>t1 take: p1 t490 ~ 
494>>t2 take: p1 t492 ~ 
495>>t2 take: p1 t493 ~ 
496>>t1 take: p1 t494 ~ 
497>>t2 take: p1 t495 ~ 
498>>t1 take: p1 t496 ~ 
499>>t2 take: p1 t497 ~ 
500>>t1 take: p2 t498 ~ 

Я озадачен этим выходом, потому что левый размер, как ожидается, но не правильный размер. Я не ожидал, что с правой стороны будут показаны повторяющиеся значения. Как я могу это исправить? Кто-нибудь поможет?

+6

Хорошо, что * сделал * вы ожидаете, и почему? Обратите внимание, что 'j ++' и 'i ++' не являются потокобезопасными (потому что они не являются атомарными операциями), хотя переменные нестабильны. Вы должны посмотреть на 'AtomicInteger'. –

+3

Не связано с вопросом, но почему вы чувствуете необходимость расширения 'Thread' вместо реализации' Runnable' – CKing

+0

@JonSkeet. Я мог ошибаться, но проблема здесь не в том, что OP не использовал 'AtomicInteger'. – CKing

ответ

3

Давайте посмотрим на фрагмент кода, который печатает правую часть вашего выхода:

if (abq.offer(Thread.currentThread().getName() + " t" + i 
        + " ~")) { 
     i++; 
} 

Давайте увеличить в условие в, если заявление:

abq.offer(Thread.currentThread().getName() + " t" + i 
         + " ~") 

Таким образом, вы используете метод предложения. Давайте посмотрим на Java документ для предложения метода:

общественное логическое предложение (Е е)

Вставляет указанный элемент в хвосте этой очереди, если это возможно сделать так сразу без превышая емкость очереди, возвращая true при успешном завершении и false, если эта очередь заполнена. Этот метод обычно предпочтительнее метода add (E), , который может не вставить элемент только путем исключения исключения.

Звонок на предложение не является блокирующим вызовом. Это означает, что несколько потоков могут одновременно вызвать метод предложения. Это то, что, скорее всего, проблема:

  1. t1 предлагает элемент в ArrayBlockingQueue. i = 0
  2. p2 немедленно принимает элемент из очереди. i = 0
  3. t2 предлагает элемент ArrayBlockingQueue еще до того, как t1 получает возможность позвонить по телефону i++. i=0.
  4. p2 немедленно принимает элемент из очереди. i = 0.

Вы можете увидеть из 2) и 4), что существует вероятность того, что два потока прочитать значение i еще до i++ называются и, таким образом, видит то же значение i.

Итак, как вы исправите эту проблему? Как и другие предложили, вы можете использовать AtomicInteger, как описано ниже:

Летучие ключевое слово будет гарантировать, что когда прочитал or в записи на переменную (ячейку памяти) разделен между несколькими потоками сделано, это гарантирует, что каждый В потоке будет отображаться последнее значение переменной. Обратите внимание, что акцент делается на or.Когда вы говорите i++ или j++, вы делаете чтение and a написать вместе. Это не атомная операция, и ключевое слово volatile не остановит потоки от просмотра несогласованных значений.

Изменить

private static volatile int i = 0; 

Кому:

private static final AtomicInteger i = new AtomicInteger(); 

AtomicInteger обеспечивает операции атомного приращения incrementAndGet() (= ++i) и getAndIncrement() (= i++), которые могут быть использованы в данном случае:

int nr=0; // local holder for message number 
while ((nr = i.getAndIncrement()) < 500) { 
    abq.put(Thread.currentThread().getName() + " t" + nr + " ~"); 
} 

(1) Обратите внимание на использование локальной переменной nr! Это гарантирует, что результат операции инкремента используется как номер сообщения в put(...). Если бы мы использовали i непосредственно вместо nr, другие потоки могли бы увеличивать i тем временем, и у нас был бы другой номер сообщения или номер дубликата сообщения!

(2) Также обратите внимание, что offer(...) заменяется на put(...). Поскольку ваша очередь ограничена (только для), операция вставки должна блокироваться, если у нее заканчивается емкость. (offer(...) немедленно вернет false в этом случае.)

(3) Обратите внимание, что этот код НЕ обеспечивает правильный порядок вставки в соответствии с номером сообщения. Это гарантирует, что дубликатов нет! Если требуется правильный порядок вставки, вам придется использовать совершенно другой подход без AtomicInteger и ArrayBlockingQueue. Вместо того, чтобы использовать не поточно-очереди и использовать старый добрый synchronized сделать приращение и вставки атомное:

while (true) { 
    synchronized(queue) { 
     if (i++ >= 500) break; 
     queue.put(... + i + ...); 
    } 
} 

Приведенный выше код не является эффективным. Почти весь код синхронизирован. Не существует реального параллельного исполнения, а параллелизм бесполезен.

Аналогичным образом можно изменить j.

+0

Согласен. Вы также должны указать подсказку, как решить эту проблему. (например, с помощью 'AtomicInteger.incrementAndGet()') – isnot2bad

+0

. Ваше изменение по-прежнему объясняет причину, по которой текущий код НЕ работает, но не совсем, как его исправить. Возможно, вы должны предоставить фрагмент кода. (Легче исправить, если предположить, что порядок написания сообщений не имеет значения.) – isnot2bad

+0

@ isnot2bad Я думал, что использование 'AtomicInteger' было самоочевидным. Во всяком случае, отредактировал ответ с помощью фрагмента кода. – CKing

0

Выполнение переменной volatile не защищает ее от условий гонки, она просто указывает, что объект, к которому привязана переменная, может мутировать, чтобы предотвратить оптимизацию компилятора, в вашем случае это не нужно, потому что I & j изменены в тот же поток (поток цикла). Чтобы преодолеть условие гонки, вам нужно сделать так, чтобы операторы i++ и j++ были синхронизированы, как определение их как атомное целое число и использование incrementAndGet().

+0

Я не согласен! Обеспечение атомных операций не остановит два потока от просмотра того же значения 'i' до того, как' i' будет увеличено. См. Мой ответ. – CKing

+0

Это довольно грубое упрощение, также записывающее на volatile (и что более важно) предоставляет определенные гарантии видимости в модели памяти Java, см. [Модель памяти JLS 17.4] (http://docs.oracle.com/javase/specs/jls /se8/html/jls-17.html#jls-17.4) –

+0

@bot, отредактировал мой ответ соответственно. Это было мое первоначальное намерение. – Lital

Смежные вопросы