Давайте посмотрим на фрагмент кода, который печатает правую часть вашего выхода:
if (abq.offer(Thread.currentThread().getName() + " t" + i
+ " ~")) {
i++;
}
Давайте увеличить в условие в, если заявление:
abq.offer(Thread.currentThread().getName() + " t" + i
+ " ~")
Таким образом, вы используете метод предложения. Давайте посмотрим на Java документ для предложения метода:
общественное логическое предложение (Е е)
Вставляет указанный элемент в хвосте этой очереди, если это возможно сделать так сразу без превышая емкость очереди, возвращая true при успешном завершении и false, если эта очередь заполнена. Этот метод обычно предпочтительнее метода add (E), , который может не вставить элемент только путем исключения исключения.
Звонок на предложение не является блокирующим вызовом. Это означает, что несколько потоков могут одновременно вызвать метод предложения. Это то, что, скорее всего, проблема:
t1
предлагает элемент в ArrayBlockingQueue
. i = 0
p2
немедленно принимает элемент из очереди. i = 0
t2
предлагает элемент ArrayBlockingQueue
еще до того, как t1
получает возможность позвонить по телефону i++
. i=0
.
p2
немедленно принимает элемент из очереди. i = 0
.
Вы можете увидеть из 2) и 4), что существует вероятность того, что два потока прочитать значение i
еще до i++
называются и, таким образом, видит то же значение i
.
Итак, как вы исправите эту проблему? Как и другие предложили, вы можете использовать AtomicInteger
, как описано ниже:
Летучие ключевое слово будет гарантировать, что когда прочитал or
в записи на переменную (ячейку памяти) разделен между несколькими потоками сделано, это гарантирует, что каждый В потоке будет отображаться последнее значение переменной. Обратите внимание, что акцент делается на or
.Когда вы говорите i++
или j++
, вы делаете чтение and
a написать вместе. Это не атомная операция, и ключевое слово volatile
не остановит потоки от просмотра несогласованных значений.
Изменить
private static volatile int i = 0;
Кому:
private static final AtomicInteger i = new AtomicInteger();
AtomicInteger
обеспечивает операции атомного приращения incrementAndGet()
(= ++i
) и getAndIncrement()
(= i++
), которые могут быть использованы в данном случае:
int nr=0; // local holder for message number
while ((nr = i.getAndIncrement()) < 500) {
abq.put(Thread.currentThread().getName() + " t" + nr + " ~");
}
(1) Обратите внимание на использование локальной переменной nr
! Это гарантирует, что результат операции инкремента используется как номер сообщения в put(...)
. Если бы мы использовали i
непосредственно вместо nr
, другие потоки могли бы увеличивать i
тем временем, и у нас был бы другой номер сообщения или номер дубликата сообщения!
(2) Также обратите внимание, что offer(...)
заменяется на put(...)
. Поскольку ваша очередь ограничена (только для), операция вставки должна блокироваться, если у нее заканчивается емкость. (offer(...)
немедленно вернет false
в этом случае.)
(3) Обратите внимание, что этот код НЕ обеспечивает правильный порядок вставки в соответствии с номером сообщения. Это гарантирует, что дубликатов нет! Если требуется правильный порядок вставки, вам придется использовать совершенно другой подход без AtomicInteger
и ArrayBlockingQueue
. Вместо того, чтобы использовать не поточно-очереди и использовать старый добрый synchronized
сделать приращение и вставки атомное:
while (true) {
synchronized(queue) {
if (i++ >= 500) break;
queue.put(... + i + ...);
}
}
Приведенный выше код не является эффективным. Почти весь код синхронизирован. Не существует реального параллельного исполнения, а параллелизм бесполезен.
Аналогичным образом можно изменить j
.
Хорошо, что * сделал * вы ожидаете, и почему? Обратите внимание, что 'j ++' и 'i ++' не являются потокобезопасными (потому что они не являются атомарными операциями), хотя переменные нестабильны. Вы должны посмотреть на 'AtomicInteger'. –
Не связано с вопросом, но почему вы чувствуете необходимость расширения 'Thread' вместо реализации' Runnable' – CKing
@JonSkeet. Я мог ошибаться, но проблема здесь не в том, что OP не использовал 'AtomicInteger'. – CKing