2013-02-22 3 views
6

У нас есть сценарий, в котором задачи, переданные ThreadPoolExecutor, выполняются долго. Когда пул потоков запущен, мы запустим его с размером пула ядра = 5, максимальным размером пула = 20 и размером очереди 10. В нашем приложении подано 10 задач. Большую часть времени эти задачи выполняются в течение нескольких минут/час, а затем завершаются. Однако была ситуация, когда все 5 заданий были повесили на ввод-вывод. В результате размер основного пула достиг максимального значения, но моя очередь Threadpoolexecutor не была заполнена. Таким образом, дополнительные 5 задач никогда не имели шансов на успех. Пожалуйста, подскажите, как мы можем справиться с таким сценарием? Имеет ли меньшая очередь лучший вариант в такой ситуации? Каким будет оптимальный размер очереди при инициализации threadPool?ThreadPoolExecutor: задачи ставятся в очередь и не отправляются

Также в отношении повешенных задач есть ли способ вытащить потоки из Threadpool? В этом случае, по крайней мере, другие задачи получат шанс запустить.

+0

Когда вы отправляете() задачу, вам возвращается объект Future. Вы можете использовать этот будущий объект для отмены выполнения. – Aurand

+0

Да, мы делаем это, используя future.cancel. Однако не гарантируется, что задачи будут отменены в ожидании ввода/вывода. – user1269597

+0

вы можете установить значение тайм-аута при получении значения с помощью объекта Future как 'future.get (5000, TimeUnit.MILLISECONDS)' и записать его в блок 'try catch'. –

ответ

4

Общая ситуация такова:

core pool size = 5, 
max pool size = 20 and 
queue size of 10 

10 задач представляются. Из них

  1. 5 Задачи, навешенные на I/O => заняты все потоки основного пула. И, следовательно, нет простоя.
  2. 5 Задачи остаются. Эти 5 потоков помещаются в очередь в очередь, поскольку нет простоя потока, и очередь может вместить 10 задач. Эти запущенные задачи не будут выполняться до тех пор, пока очередь не будет заполнена или ни один из потоков в пуле ядра не станет бесплатным.

Следовательно, ваша программа повешена.

Чтобы узнать больше о динамике ThrePoolExecutor смотреть here. Значимые точки этого документа заключаются в следующем:

  • Если работает меньше потоков corePoolSize, Executor всегда предпочитает добавлять новый поток, а не выполнять очередность.
  • Если corePoolSize или более потоки запущены, Executor всегда предпочитает размещать запрос, а не добавлять новый поток.
  • Если запрос не может быть поставлен в очередь, создается новый поток, если это не будет превышать maximumPoolSize, и в этом случае задача будет отклонена.

EDIT
Если вы хотите, чтобы увеличить базовый размер пула, то вы можете использовать setCorePoolSize(int corePoolSize). Если вы увеличите corepoolsize, тогда новые потоки при необходимости будут запущены для выполнения любых задач в очереди.

3

В Javadocs for ThreadPoolExecutor состоянии:

Любой BlockingQueue может быть использован для передачи и держать представленные задачи. Использование этой очереди взаимодействует с бассейном проклейки:

  • Если меньше corePoolSize потоков работают, Исполнитель всегда предпочитает добавлять новый поток, а не массового обслуживания.
  • Если corePoolSize или запущено больше потоков, Executor всегда предпочитает запрашивать запрос в , а не добавлять новый поток.
  • Если запрос не может быть указан в , создается новый поток, если это не будет превышать максимумPoolSize, и в этом случае задача будет отклонена.

Если вы не превысите размер своей очереди после того, как 5 потоков «висят», вы не получите больше потоков.

Настоящий ответ: устранить проблему, которая приводит к зависанию ваших потоков. В противном случае вам придется реализовать некоторую схему, которая использует Future s, возвращенные submit(), чтобы отменить потоки, если они работают слишком долго.

+0

Согласен. Мы стараемся избегать нависания ниток как можно больше. Но в сценарии производства все еще существует очень низкая вероятность того, что потоки зависают, ожидая некоторых других систем/приложений. В этом случае сценарий, который я наблюдал, полностью остановит мое приложение. – user1269597

+0

Правильно. Вот почему * вам нужно исправить эту проблему *. Нет * причины, чтобы иметь код, который позволяет им «висеть» в первую очередь. –

0

«Путь потока с переменным размером в jdk немного сложнее. В принципе, настройка, которую вы используете, не будет работать очень хорошо по причинам, которые вы указали. Когда я хочу, переменного размера пула потоков, я обычно использую эту установку:

ThreadPoolExecutor executor = new ThreadPoolExecutor(maxPoolSize, maxPoolSize, 
                 DEFAULT_THREAD_TIMEOUT, DEFAULT_THREAD_TIMEOUT_UNIT, 
                 new LinkedBlockingQueue<Runnable>(), 
                 threadFactory); 
    executor.allowCoreThreadTimeOut(true); 

это создаст размерный бассейн переменной нити, которая будет изменяться в пределах от 1 до MaxPoolSize нитей. поскольку размер ядра совпадает с максимальным размером, пул всегда будет предпочитать добавление потоков в очередь, поэтому вы никогда не будете делать резервную копию своей очереди до тех пор, пока количество потоков не будет превышено.

ОБНОВЛЕНИЕ: Что касается проблемы с подвесными задачами, если есть верхний предел длины задачи, вы можете иметь отдельный менеджер, отслеживающий выдающиеся будущие и отменяющие их, если они превышают максимальное время работы. это, конечно, предполагает, что ваши задачи прерываются.

+0

Стоит отметить, что без фиксации его актуальной проблемы ...это будет продолжать создавать потоки, пока не достигнет максимума (или, конечно же, он получит ошибку OOM до этого). Это скорее бандада, чем решение. –

+0

@BrianRoach - как это решение для бандажей? – jtahlborn

+0

Потому что именно из того, что я только что сказал в своем комментарии? В длительном процессе у него закончится поток или память, потому что у него есть задачи, которые висят. Фиксирование реальной проблемы (его потоки, зависающие от ввода-вывода) устраняет необходимость изменения его текущей настройки. Все дело в том, чтобы использовать исполнителя, а не только порождать потоки, - это контролировать количество потоков (и управлять ими). –

0

Основная проблема заключается в том, что пользователь действительно получает потоки CORE_POOL_SIZE с неограниченным сценарием очереди. Таким образом, если в основной пуле есть 5 потоков, это все, что он может когда-либо использовать, максимальный размер не помогает. Хотя сокращение времени выполнения потоков рекомендуется во всех случаях, в сценарии производства мы не можем часто контролировать, как будут работать сторонние службы, и поэтому решение будет заключаться в том, чтобы увеличить размер пула ядра равным максимальному размеру пула или ограничить размер очереди ,

1

Я думаю, что другой подход состоял в том, чтобы установить corePoolSize на основе количества задач, ожидающих очереди. Можно управлять corePoolSize с помощью setCorePoolSize. Образец монитора может контролировать вас threadPoolExecutor. Вы также можете улучшить этот монитор, чтобы настроить степень параллелизма.

public class ExecutorMonitor extends Thread{ 

      ThreadPoolExecutor executor = null; 
      int initialCorePoolSize; 
      public ExecutorMonitor(ThreadPoolExecutor executor) 
      { 
       this.executor = executor; 
       this.initialCorePoolSize = executor.getCorePoolSize(); 
      } 
      @Override 
      public void run() 
      { 
       while (true) 
       { 
        try { 
         Thread.sleep(5000); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
        if (executor.getQueue().size() > 0) 
        { 
         if(executor.getActiveCount() < executor.getMaximumPoolSize()) 
          executor.setCorePoolSize(executor.getCorePoolSize() + 1); 
        } 
        if (executor.getQueue().size() == 0) 
        { 
         if(executor.getCorePoolSize() > initialCorePoolSize) 
          executor.setCorePoolSize(executor.getCorePoolSize() -1); 
        } 
       } 
      } 
     } 
+0

Динамически увеличивая corePoolSize создаст новый поток и назначит задачу с очередью в этот новый поток? – Nishat

+0

@ Нишат Да согласно do java doc: [setCorePoolSize (int)] (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#setCorePoolSize (int)) – celik

-2

Я думаю, что лучшим решением было бы переопределить метод выполнения ThreadPoolExecutor и внести изменения в это.

Смежные вопросы