2010-08-23 4 views
22

Я много искал, но не смог найти решение моей проблемы.Как реализовать PriorityBlockingQueue с ThreadPoolExecutor и настраиваемые задачи

У меня есть свой класс, BaseTask, который использует задачи ThreadPoolExecutor.
Если я не хочу приоритезации (т.е. с использованием LinkedBlockingQueue), это работает отлично, но когда я пытаюсь использовать PriorityBlockingQueue я получаю ClassCastException потому что ThreadPoolExecutor оборачивает свои задачи в FutureTask объекта.
Это, очевидно, ОК, потому что FutureTask не реализует Comparable, но как бы я решил решить приоритетную проблему?
Я читал, что вы можете переопределить newTaskFor в ThreadPoolExecutor, но я не могу найти этот метод вообще ...?

Любые предложения были бы высоко оценены!

Некоторый код, чтобы помочь:

В моем BaseTask классе у меня есть

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>(); 

private static final ThreadFactory sThreadFactory = new ThreadFactory() { 
    private final AtomicInteger mCount = new AtomicInteger(1); 

    public Thread newThread(Runnable r) { 
     return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); 
    } 
}; 

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory); 

private final BaseFutureTask<Result> mFuture; 

public BaseTask(int priority) { 
    mFuture = new BaseFutureTask<Result>(mWorker, priority); 
} 

public final BaseTask<Params, Progress, Result> execute(Params... params) { 

    /* Some unimportant code here */ 

    sExecutor.execute(mFuture); 
} 

В BaseFutureTask классе

@Override 
public int compareTo(BaseFutureTask another) { 
    long diff = this.priority - another.priority; 

    return Long.signum(diff); 
} 

В BaseThreadPoolExecutor классе я переопределить 3 submit методы ...
Вызывается конструктор этого класса, но ни один из методов submit

+0

См. Также http://stackoverflow.com/questions/807223/how-do-implement-task-prioritization-using-an-executorservice-in-java-5 –

+0

Тема, на которую ссылается OP, это http: //переполнение стека.com/questions/11430574/how-to-have-un-unbound-sortable-queue-used-by-a-fixed-amount-of-threads/12663571 # 12663571 –

ответ

0

Похоже, что они оставили это из гармонии апача. Существует svn commit log около года назад, фиксируя отсутствие newTaskFor. Возможно, вы можете просто переопределить функции submit в расширенном ThreadPoolExecutor, чтобы создать расширенный FutureTask, который является Comparable. They are not very long.

+0

Нет, вы должны продлить его. Он загружается в FutureTask в различные методы отправки. – Qberticus

+0

Не похоже, что какой-либо из методов 'submit' вызывается ... Добавлен некоторый код, который поможет понять – greve

+0

Не называйте' ThreadPoolExecutor # execute' больше. Все, что вы хотите сделать пулом потоков, должно пройти через вызов 'submit'. Затем методы 'submit' должны вызывать' execute' для вас. – Qberticus

13
public class ExecutorPriority { 

public static void main(String[] args) { 

    PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority()); 

    Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq); 
    exe.execute(new RunWithPriority(2) { 

     @Override 
     public void run() { 

      System.out.println(this.getPriority() + " started"); 
      try { 
       Thread.sleep(3000); 
      } catch (InterruptedException ex) { 
       Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex); 
      } 
      System.out.println(this.getPriority() + " finished"); 
     } 
    }); 
    exe.execute(new RunWithPriority(10) { 

     @Override 
     public void run() { 
      System.out.println(this.getPriority() + " started"); 
      try { 
       Thread.sleep(3000); 
      } catch (InterruptedException ex) { 
       Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex); 
      } 
      System.out.println(this.getPriority() + " finished"); 
     } 
    }); 

} 

private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> { 

    @Override 
    public int compare(T o1, T o2) { 
     return o1.getPriority().compareTo(o2.getPriority()); 
    } 
} 

}

, как вы можете догадаться, RunWithPriority это абстрактный класс, который является Runnable и имеет поле приоритета Integer

+2

. Я действительно не понимаю ваш пример! Я тупой или первый «Экс-исполнитель» никогда не используется? lol –

+0

Я не понимаю, почему это самый верный ответ. 'new ComparePriority()' не указывает параметр generic type, поэтому я считаю это быстрым и грязным решением. – Timmos

4

Мое решение:

public class XThreadPoolExecutor extends ThreadPoolExecutor 
{ 
    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
     long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue) 
    { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
     long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue, 
     RejectedExecutionHandler handler) 
    { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); 
    } 

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
     long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue, 
     ThreadFactory threadFactory) 
    { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); 
    } 

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
     long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue, 
     ThreadFactory threadFactory, RejectedExecutionHandler handler) 
    { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); 
    } 

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) 
    { 
     return new ComparableFutureTask<>(runnable, value); 
    } 

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) 
    { 
     return new ComparableFutureTask<>(callable); 
    } 

    protected class ComparableFutureTask<V> 
     extends FutureTask<V> implements Comparable<ComparableFutureTask<V>> 
    { 
     private Object object; 
     public ComparableFutureTask(Callable<V> callable) 
     { 
      super(callable); 
      object = callable; 
     } 

     public ComparableFutureTask(Runnable runnable, V result) 
     { 
      super(runnable, result); 
      object = runnable; 
     } 

     @Override 
     @SuppressWarnings("unchecked") 
     public int compareTo(ComparableFutureTask<V> o) 
     { 
      if (this == o) 
      { 
       return 0; 
      } 
      if (o == null) 
      { 
       return -1; // high priority 
      } 
      if (object != null && o.object != null) 
      { 
       if (object.getClass().equals(o.object.getClass())) 
       { 
        if (object instanceof Comparable) 
        { 
         return ((Comparable) object).compareTo(o.object); 
        } 
       } 
      } 
      return 0; 
     } 
    } 
} 
0

Чтобы ответить на ваш вопрос: newTaskFor() метод находится в ThreadPoolExecutor в суперклассе, AbstractExecutorService. Однако вы можете просто переопределить его в ThreadPoolExecutor.

8

Вы можете использовать эти вспомогательные классы:

public class PriorityFuture<T> implements RunnableFuture<T> { 

    private RunnableFuture<T> src; 
    private int priority; 

    public PriorityFuture(RunnableFuture<T> other, int priority) { 
     this.src = other; 
     this.priority = priority; 
    } 

    public int getPriority() { 
     return priority; 
    } 

    public boolean cancel(boolean mayInterruptIfRunning) { 
     return src.cancel(mayInterruptIfRunning); 
    } 

    public boolean isCancelled() { 
     return src.isCancelled(); 
    } 

    public boolean isDone() { 
     return src.isDone(); 
    } 

    public T get() throws InterruptedException, ExecutionException { 
     return src.get(); 
    } 

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 
     return src.get(); 
    } 

    public void run() { 
     src.run(); 
    } 

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() { 
     public int compare(Runnable o1, Runnable o2) { 
      if (o1 == null && o2 == null) 
       return 0; 
      else if (o1 == null) 
       return -1; 
      else if (o2 == null) 
       return 1; 
      else { 
       int p1 = ((PriorityFuture<?>) o1).getPriority(); 
       int p2 = ((PriorityFuture<?>) o2).getPriority(); 

       return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1); 
      } 
     } 
    }; 
} 

И

public interface PriorityCallable<T> extends Callable<T> { 

    int getPriority(); 

} 

И этот вспомогательный метод:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) { 
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
      new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) { 

     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
      RunnableFuture<T> newTaskFor = super.newTaskFor(callable); 
      return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority()); 
     } 
    }; 
} 

И, используйте его следующим образом:

class LenthyJob implements PriorityCallable<Long> { 
    private int priority; 

    public LenthyJob(int priority) { 
     this.priority = priority; 
    } 

    public Long call() throws Exception { 
     System.out.println("Executing: " + priority); 
     long num = 1000000; 
     for (int i = 0; i < 1000000; i++) { 
      num *= Math.random() * 1000; 
      num /= Math.random() * 1000; 
      if (num == 0) 
       num = 1000000; 
     } 
     return num; 
    } 

    public int getPriority() { 
     return priority; 
    } 
} 

public class TestPQ { 

    public static void main(String[] args) throws InterruptedException, ExecutionException { 
     ThreadPoolExecutor exec = getPriorityExecutor(2); 

     for (int i = 0; i < 20; i++) { 
      int priority = (int) (Math.random() * 100); 
      System.out.println("Scheduling: " + priority); 
      LenthyJob job = new LenthyJob(priority); 
      exec.submit(job); 
     } 
    } 
} 
2

Я попытаюсь объяснить эту проблему с помощью полностью функционального кода. Но прежде чем погрузиться в код, который я хотел бы объяснить, о PriorityBlockingQueue

PriorityBlockingQueue: PriorityBlockingQueue является реализация BlockingQueue. Он принимает задачи вместе с их приоритетом и сначала отправляет задачу с наивысшим приоритетом для выполнения. Если какие-либо две задачи имеют одинаковый приоритет, нам нужно предоставить некоторую пользовательскую логику, чтобы решить, какая задача будет первой.

Теперь можно сразу перейти в код.

Класс водителя: Этот класс создает исполнителя, который принимает задачи и затем отправляет их на исполнение. Здесь мы создаем две задачи с приоритетом LOW, а другая с высоким приоритетом. Здесь мы говорим исполнителю запустить MAX из 1 потока и использовать PriorityBlockingQueue.

 public static void main(String[] args) { 

     /* 
     Minimum number of threads that must be running : 0 
     Maximium number of threads that can be created : 1 
     If a thread is idle, then the minimum time to keep it alive : 1000 
     Which queue to use : PriorityBlockingQueue 
     */ 
    PriorityBlockingQueue queue = new PriorityBlockingQueue(); 
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1, 
     1000, TimeUnit.MILLISECONDS,queue); 

    MyTask task = new MyTask(Priority.LOW,"Low"); 
    executor.execute(new MyFutureTask(task)); 
    task = new MyTask(Priority.HIGH,"High"); 
    executor.execute(new MyFutureTask(task)); 
} 

MyTask класс: MyTask реализует Runnable и принимает приоритет в качестве аргумента в конструкторе. Когда эта задача выполняется, она печатает сообщение, а затем помещает поток в режим ожидания в течение 1 секунды.

public class MyTask implements Runnable { 

    public int getPriority() { 
    return priority.getValue(); 
    } 

    private Priority priority; 

    public String getName() { 
    return name; 
    } 

    private String name; 

    public MyTask(Priority priority,String name){ 
    this.priority = priority; 
    this.name = name; 
    } 

    @Override 
    public void run() { 
    System.out.println("The following Runnable is getting executed "+getName()); 
    try { 
     Thread.sleep(1000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 

} 

MyFutureTask класс: Поскольку мы используем PriorityBlocingQueue для проведения наших задач, наши задачи должны быть обернуты внутри FutureTask и наша реализация FutureTask должен реализовывать интерфейс Сопоставимые. Интерфейс Comparable сравнивает приоритет двух разных задач и отправляет задачу с наивысшим приоритетом для выполнения.

public class MyFutureTask extends FutureTask<MyFutureTask> 
     implements Comparable<MyFutureTask> { 

    private MyTask task = null; 

    public MyFutureTask(MyTask task){ 
     super(task,null); 
     this.task = task; 
    } 

    @Override 
    public int compareTo(MyFutureTask another) { 
     return task.getPriority() - another.task.getPriority(); 
    } 
    } 

Класс приоритета: Самоходный пояснительная класс приоритета.

public enum Priority { 

    HIGHEST(0), 
    HIGH(1), 
    MEDIUM(2), 
    LOW(3), 
    LOWEST(4); 

    int value; 

    Priority(int val) { 
    this.value = val; 
    } 

    public int getValue(){ 
    return value; 
    } 


} 

Теперь, когда мы запустим этот пример, мы получаем следующий вывод

The following Runnable is getting executed High 
The following Runnable is getting executed Low 

Даже если мы представили НИЗКИЙ приоритет первой, но первоочередной задачей позже, но так как мы используем PriorityBlockingQueue, в задача с более высоким приоритетом будет выполнена в первую очередь.

Смежные вопросы