2012-01-04 4 views
0

У меня есть ситуация, когда у меня есть 2 очереди блокировки. Сначала я вставляю некоторые выполняемые мной задачи. Когда каждая задача завершается, она добавляет задачу во вторую очередь, где они выполняются.Блокировка Java BlockingQueue на take(), с небольшим завихрением

Так что моя первая очередь легко: я просто проверить, чтобы убедиться, что он не пустой и выполнять, иначе я прерываю():

public void run() { 
    try { 
     if (taskQueue1.isEmpty()) { 
      SomeTask task = taskQueue1.poll(); 
      doTask(task); 
      taskQueue2.add(task); 
     } 
     else { 
      Thread.currentThread().interrupt(); 
     } 
    } 

    catch (InterruptedException ex) { 
     ex.printStackTrace(); 
    } 
} 

Второй я делаю следующее, что, как вы можете сказать, не работает:

public void run() { 
    try { 
     SomeTask2 task2 = taskQueue2.take(); 
     doTask(task2); 
    } 

    catch (InterruptedException ex) { 

    } 
    Thread.currentThread().interrupt(); 

} 

Как бы вы ее решить, так что второй BlockingQueue не блокирует на дубле(), но заканчивается только тогда, когда он знает, что нет больше элементов, которые будут добавлены. Было бы хорошо, если бы 2-й поток мог видеть 1-ю блокирующую очередь, возможно, и проверить, было ли это пусто, а вторая очередь была пуста, а затем она прерывалась.

Я мог бы также использовать объект Poison, но предпочел бы что-то другое.

NB: Это не точный код, как раз то, что я написал здесь:

+1

Я не могу понять, что вы здесь делаете. Пожалуйста, объясни. –

ответ

1

Вы делаете это так, как будто поток, обрабатывающий первую очередь, знает, что больше задач не возникает, как только его очередь разряжается. Это звучит подозрительно, но я возьму вас на ваше слово и предлагаю решение в любом случае.

Определить AtomicInteger, видимый для обоих потоков. Инициализировать его до положительный.

Определение операции в первого потока следующим образом:

  • Loop на Queue#poll().
  • Если Queue#poll() возвращает null, позвоните по номеру AtomicInteger#decrementAndGet() по общему целому числу.
    • Если AtomicInteger#decrementAndGet() вернули нуль, прервите вторую нить через Thread#interrupt(). (Это обрабатывает случай, когда никаких предметов не было.)
    • В любом случае выйдите из цикла.
  • В противном случае обработайте извлеченный элемент, вызовите AtomicInteger#incrementAndGet() на общее целое число, добавьте извлеченный элемент во вторую очередь потока и продолжите цикл.

Определение операции в второй потока следующим образом:

  • Loop блокировки на BlockingQueue#take().
  • Если BlockingQueue#take() выбрасывает InterruptedException, поймайте исключение, вызовите Thread.currentThread().interrupt() и выйдите из цикла.
  • В противном случае обработайте извлеченный элемент.
  • Звоните AtomicInteger#decrementAndGet() на общее целое число.
    • Если AtomicInteger#decrementAndGet() возвращено в нуль, выйдите из цикла.
    • В противном случае продолжите цикл.

Убедитесь, что вы понимаете идею, прежде чем пытаться писать фактический код. Контракт состоит в том, что второй поток продолжает ждать большего количества элементов из своей очереди, пока количество ожидаемых задач не достигнет нуля. В этот момент производящая нить (первая) больше не будет выталкивать новые предметы во вторую очередь потока, так что второй поток знает, что можно перестать обслуживать свою очередь.

Визуальный случай возникает, когда нет задач, когда-либо прибывающих в очередь первого потока. Поскольку второй поток только уменьшает и проверяет счет после, он обрабатывает элемент, если он никогда не получает возможности обрабатывать какие-либо предметы, он никогда не будет рассматривать остановку. Мы используем прерывание потока для обработки этого случая за счет другой условной ветви в шагах завершения цикла первого потока. К счастью, эта ветка будет выполняться только один раз.

Существует много проектов, которые могли бы работать здесь. Я просто описал один, который ввел только один дополнительный объект — общее атомное целое число —, но даже тогда это неудобно. Я думаю, что использование таблетки было бы намного чище, хотя я признаю, что ни Queue#add(), ни BlockingQueue#put() принимают null как действительный элемент (из-за договора возврата стоимости Queue#poll()). В противном случае было бы легко использовать нуль в качестве ядовитой таблетки.

+0

Спасибо. Сначала я попробую реализовать ядовитую таблетку –

+0

Я не уверен, что могу использовать яд-таблетку, потому что из очередей приходится много потребителей. –

+0

@ DominicBou-Samra - если вы объяснили, что ваш код пытается достичь, возможно, кто-то может помочь вам сделать это более разумным образом. –

2

Я не могу понять, что вы на самом деле пытаетесь сделать здесь, но я могу сказать, что interrupt() в ваш первый run() метод является либо бессмысленным, либо неправильным.

  • Если вы работаете метод run() в собственном Thread объекта, то, что нить о выходе в любом случае, так что нет никакого смысла прерывать его.

  • Если вы используете метод run() у исполнителя с пулом потоков, то вы, скорее всего, не захотите убить нить или вообще закрыть исполнитель ... в этой точке. И если вы хотите завершить работу исполнителя, вы должны вызвать один из его методов останова.


Например, вот версия, что делает то, что вы представляясь делать не все вещи прерывания и без создания потока/уничтожения маслобойки.

public class TaskExecutor { 

    private ExecutorService executor = new ThreadPoolExecutorService(...); 

    public void submitTask1(final SomeTask task) { 
     executor.submit(new Runnable(){ 
      public void run() { 
       doTask(task); 
       submitTask2(task); 
      } 
     }); 
    } 

    public void submitTask2(final SomeTask task) { 
     executor.submit(new Runnable(){ 
      public void run() { 
       doTask2(task); 
      } 
     }); 
    } 

    public void shutdown() { 
     executor.shutdown(); 
    } 
} 

Если вам нужна отдельная очередь для задач, просто создайте и используйте двух разных исполнителей.

Смежные вопросы