2010-08-10 5 views
42

Я пытаюсь выполнить множество задач с помощью ThreadPoolExecutor. Ниже приведен гипотетический пример:Блок ThreadPoolExecutor, когда очередь заполнена?

def workQueue = new ArrayBlockingQueue<Runnable>(3, false) 
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue) 
for(int i = 0; i < 100000; i++) 
    threadPoolExecutor.execute(runnable) 

Проблема заключается в том, что я быстро получить java.util.concurrent.RejectedExecutionException, так как число задач превышает размер очереди. Тем не менее, желаемое поведение, которое я ищу, состоит в том, чтобы иметь блок основного потока, пока в очереди не будет места. Каков наилучший способ сделать это?

+2

Взгляните на этот вопрос: http://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated – Kiril

+2

[Этот ответ] (http : //stackoverflow.com/a/4522411/394431) на другой вопрос предлагает использовать пользовательский подкласс BlockingQueue, который блокирует 'offer()' делегированием 'put()'. Я думаю, что он работает более или менее так же, как 'RejectedExecutionHandler', который вызывает' getQueue(). Put() '. –

+2

Ввод непосредственно в очередь будет неправильным, как объяснено в этом ответе http://stackoverflow.com/a/3518588/585903 –

ответ

45

В некоторых очень узких условиях вы можете реализовать java.util.concurrent.RejectedExecutionHandler, который делает то, что вам нужно.

RejectedExecutionHandler block = new RejectedExecutionHandler() { 
    rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
    executor.getQueue().put(r); 
    } 
}; 

ThreadPoolExecutor pool = new ... 
pool.setRejectedExecutionHandler(block); 

Теперь. Это очень плохая идея по следующим причинам

  • Это склонное к тупиковому, потому что все потоки в бассейне могут умереть прежде, чем вещи вы кладете в очереди видно. Смягчите это, установив разумное время сохранения.
  • Задача не завершена так, как может ожидать ваш Исполнитель. Множество реализаций исполнителей завершают свои задачи в каком-то объекте отслеживания перед выполнением. Посмотрите на свой источник.
  • Добавление через getQueue() категорически отвергается API и в какой-то момент может быть запрещено.

Стратегия почти всегда лучше установить ThreadPoolExecutor.CallerRunsPolicy, которая будет дросселировать ваше приложение, запустив задачу в потоке, который вызывает execute().

Однако иногда стратегия блокировки со всеми вытекающими из нее рисками - это то, что вы хотите. Я бы сказал, что в этих условиях

  • У вас есть только один поток вызова выполнить()
  • Вы должны (или хотят) имеют очень малую длину очереди
  • Вам абсолютно необходимо ограничить число потоки, выполняющие эту работу (как правило, по внешним причинам), и стратегия caller-run нарушит это.
  • Ваши задачи имеют непредсказуемый размер, поэтому запуск звонков может вызвать голод, если пул на мгновение занят четырьмя короткими задачами, а ваш один поток, вызывающий выполнение, застрял с большим.

Итак, как я уже сказал. Это редко необходимо и может быть опасно, но там вы идете.

Удачи.

+2

Очень продуманный ответ. Я принимаю незначительные проблемы с вашим состоянием, что> «Вы должны (или хотите) иметь очень маленькую длину очереди». Возможно, вы не сможете предсказать, сколько задач будет выполняться в заданном задании. Возможно, вы выполняете ежедневную работу, которая обрабатывает данные из какой-то БД, а в понедельник 500 записей обрабатываются, а во вторник - 50 000. Вы должны установить верхнюю границу своей очереди, чтобы вы не взорвали свою кучу, когда придет большая работа. В этом случае нет никакого вреда в ожидании завершения некоторых задач до очередности. – skelly

+0

Фантастический ответ, спасибо –

+0

«Он подвержен тупиковой ситуации, потому что все потоки в пуле могут умереть, прежде чем вещь, которую вы помещаете в очередь, видна. Смягчите это, установив разумное сохранение времени». Невозможно избежать полной блокировки, установив минимальный размер пула на значение больше нуля? Каждая другая причина - это падение Java, не имеющее встроенной поддержки для блокировки ставит очереди исполнителей. Это интересно, потому что это, по-видимому, довольно разумная стратегия. Интересно, что такое обоснование. –

1

Вот мой фрагмент кода в этом случае:

public void executeBlocking(Runnable command) { 
    if (threadPool == null) { 
     logger.error("Thread pool '{}' not initialized.", threadPoolName); 
     return; 
    } 
    ThreadPool threadPoolMonitor = this; 
    boolean accepted = false; 
    do { 
     try { 
      threadPool.execute(new Runnable() { 
       @Override 
       public void run() { 
        try { 
         command.run(); 
        } 
        // to make sure that the monitor is freed on exit 
        finally { 
         // Notify all the threads waiting for the resource, if any. 
         synchronized (threadPoolMonitor) { 
          threadPoolMonitor.notifyAll(); 
         } 
        } 
       } 
      }); 
      accepted = true; 
     } 
     catch (RejectedExecutionException e) { 
      // Thread pool is full 
      try { 
       // Block until one of the threads finishes its job and exits. 
       synchronized (threadPoolMonitor) { 
        threadPoolMonitor.wait(); 
       } 
      } 
      catch (InterruptedException ignored) { 
       // return immediately 
       break; 
      } 
     } 
    } while (!accepted); 
} 

Threadpool является локальным экземпляром java.util.concurrent.ExecutorService, который был инициализирован уже.

0

Я решил эту проблему с помощью пользовательского RejectedExecutionHandler, который просто блокирует вызывающий поток на некоторое время, а затем снова пытается представить задачу:

public class BlockWhenQueueFull implements RejectedExecutionHandler { 

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 

     // The pool is full. Wait, then try again. 
     try { 
      long waitMs = 250; 
      Thread.sleep(waitMs); 
     } catch (InterruptedException interruptedException) {} 

     executor.execute(r); 
    } 
} 

Этот класс может только использоваться в резьбе бассейне исполнителем как RejectedExecutionHandler, как и любой другой. В этом примере:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull()) 

Единственный недостаток я вижу в том, что вызывающий поток может получить немного заперта больше, чем строго необходимо (до 250 мс). Для многих коротких задач, возможно, уменьшите время ожидания до 10 мс или около того. Более того, поскольку этот исполнитель фактически называется рекурсивно, очень долго ждет, когда поток станет доступным (часы), может привести к переполнению стека.

Тем не менее, мне лично нравится этот метод. Он компактный, понятный и хорошо работает. Не хватает ли чего-нибудь важного?