2009-09-16 3 views
5

Есть ли способ, чтобы создать Исполнитель, который будет всегда по крайней мере, 5 нитей, и максимум 20 потоков и неограниченная очередь для задач (имеется в виде не задачи отвергается)с указанием проблемы ThreadPoolExecutor

Я попробовал новые ThreadPoolExecutor(5, 20, 60L, TimeUnit.SECONDS, queue) с все возможности, которые я думал для очереди:

new LinkedBlockingQueue() // never runs more than 5 threads 
new LinkedBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new ArrayBlockingQueue(1000000) // runs more than 5 threads, only when there is more than 1000000 tasks waiting 
new SynchronousQueue() // no tasks can wait, after 20, they are rejected 

и никто не работал, как хотел.

+0

Что значит, что это не сработало? Исключение? – Gandalf

+4

Это слишком близко: Гэндальф и Сармун! – akf

+0

Ents должен быть здесь в ближайшее время. , , и где это надоедливый Eagle ... – Gandalf

ответ

5

Возможно, что-то подобное сработает для вас? Я просто взбивал его, поэтому, пожалуйста, подсунь ему. В основном, он реализует пул потоков переполнения, который используется, чтобы накормить основной ThreadPoolExecutor

Есть два основных обратить спиной я вижу с ним:

  • Отсутствие возвращенного будущего объекта на submit(). Но, возможно, это не проблема для вас.
  • Вторичная очередь будет опущена только в ThreadPoolExecutor, когда будут представлены рабочие места. Там должно быть элегантное решение, но я пока этого не вижу. Если вы знаете, что в StusMagicExecutor будет стоять поток задач, это может не быть проблемой. («Май» - это ключевое слово.) Возможно, вариант состоит в том, чтобы ваши поставленные задачи выкалывали по StusMagicExecutor после их завершения?

Stu Волшебное Исполнитель:

public class StusMagicExecutor extends ThreadPoolExecutor { 
    private BlockingQueue<Runnable> secondaryQueue = new LinkedBlockingQueue<Runnable>(); //capacity is Integer.MAX_VALUE. 

    public StusMagicExecutor() { 
     super(5, 20, 60L, SECONDS, new SynchronousQueue<Runnable>(true), new RejectionHandler()); 
    } 
    public void queueRejectedTask(Runnable task) { 
     try { 
      secondaryQueue.put(task); 
     } catch (InterruptedException e) { 
      // do something 
     } 
    } 
    public Future submit(Runnable newTask) { 
     //drain secondary queue as rejection handler populates it 
     Collection<Runnable> tasks = new ArrayList<Runnable>(); 
     secondaryQueue.drainTo(tasks); 

     tasks.add(newTask); 

     for (Runnable task : tasks) 
      super.submit(task); 

     return null; //does not return a future! 
    } 
} 

class RejectionHandler implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 
     ((StusMagicExecutor)executor).queueRejectedTask(runnable); 
    } 
} 
+1

Отличный, tnx, это в значительной степени то, что я хотел. С завершением newTasks с runnable, который выдает одну задачу после финиша, только один поток из 20 может быть бездействующим, пока у нас есть задачи во вторичной очереди. – Sarmun

1

javadocs для ThreadPoolExecutor довольно ясно, что после создания потоков corePoolSize новые потоки будут созданы только после того, как очередь будет заполнена. Поэтому, если вы установите core на 5 и max на 20, вы никогда не получите желаемого поведения.

Однако, если вы установили оба core и max на 20, тогда задачи будут добавляться только в очередь, если все 20 потоков заняты. Конечно, это означает, что ваше минимальное требование «5 нитей» немного бессмысленно, так как все 20 будут сохранены (пока они не простаивают).

+0

«бездействовать» никогда не произойдет, если вы не скажете, что все основные нити могут умереть. В любом случае, не видите точку как основного, так и максимального размера, если она не может быть использована должным образом. Есть ли другой класс (кроме ThreadPoolExecutor), который будет соответствовать моим требованиям? – Sarmun

1

Я думаю, что эта проблема является недостатком класса и вводит в заблуждение, учитывая комбинации параметров конструктора. Вот решение, взятое из внутреннего ThreadPoolExecutor от SwingWorker, которое я сделал в классе верхнего уровня. Он не имеет минимума, но, по крайней мере, использует верхнюю границу. Единственное, что я не знаю, - это то, что производительность ударит от выполнения блокировки.

public class BoundedThreadPoolExecutor extends ThreadPoolExecutor { 
    private final ReentrantLock pauseLock = new ReentrantLock(); 
    private final Condition unpaused = pauseLock.newCondition(); 
    private boolean isPaused = false; 
    private final ReentrantLock executeLock = new ReentrantLock(); 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       handler); 
    } 

    public BoundedThreadPoolExecutor(int maximumPoolSize, 
      long keepAliveTime, TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, 
      RejectedExecutionHandler handler) { 
     super(0, maximumPoolSize, keepAliveTime, unit, workQueue, 
       threadFactory, handler); 
    } 

    @Override 
    public void execute(Runnable command) { 
     executeLock.lock(); 
     try { 
      pauseLock.lock(); 
      try { 
       isPaused = true; 
      } finally { 
       pauseLock.unlock(); 
      } 
      setCorePoolSize(getMaximumPoolSize()); 
      super.execute(command); 
      setCorePoolSize(0); 
      pauseLock.lock(); 
      try { 
       isPaused = false; 
       unpaused.signalAll(); 
      } finally { 
       pauseLock.unlock(); 
      } 
     } finally { 
      executeLock.unlock(); 
     } 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t); 
     pauseLock.lock(); 
     try { 
      while (isPaused) { 
       unpaused.await(); 
      } 
     } catch (InterruptedException ignore) { 

     } finally { 
      pauseLock.unlock(); 
     } 
    } 
} 
Смежные вопросы