2013-10-11 2 views
6

Предположим, мне нужно обрабатывать запросы 3-х типов: А, В, и С следующим образом:Параллельная обработка запроса в Java с ограничениями

  • запросы обрабатываются одновременно.
  • Есть не более K (< = 3) запросы обрабатываются одновременно.
  • просит о того же типа не могут быть обработаны одновременно.

В целом, количество типов N и число одновременных запросов является К < = N.

Как бы реализовать его в Java с java.util.concurrent?

ответ

0

запросы обрабатываются одновременно.

Вы можете воспользоваться услугой палача.

Одновременно обрабатывается не более K запросов одновременно.

В исполнителе установить максимальное количество потоков.

Запросы одного типа не могут обрабатываться одновременно.

Возможно, у вас есть разные блокировки для каждого типа запроса. Просто убедитесь, что если поток не может получить блокировку для запроса в течение определенного времени, он должен уступить и перейти к следующей обработке задачи.

+0

бы вы подробнее остановиться на замках вы предложили для удовлетворения ограничений, # 3? – Michael

2

Вы не можете обрабатывать запросы K в то же время, что будет нарушать второе правило. Максимальное количество одновременных запросов - это типы номеров. В вашем случае его три. Итак, сделайте три очереди и прикрепите их к трем потокам. Это единственный способ. Executors.newSingleThreadExecutor реализует эту технику.

public static void main(String[] args) { 
    int N = 2; 
    int K = 3; 
    List<Executor> executors = new ArrayList<Executor>(N); 
    for(int i = 0; i < N; i++){ 
     executors.add(Executors.newSingleThreadExecutor()); 
    } 
    Map<Type, Executor> typeExecutors = new HashMap<Type, Executor>(K); 
    int i = 0; 
    for(Type t : Type.values()){ 
     typeExecutors.put(t, executors.get(i++ % executors.size())); 
    } 
} 

enum Type{ 
    T1, T2, T3 
} 
+0

Согласитесь, после первого прочитанного вопроса я тоже думал об исполнителях (newFixedThreadPool размера k). Однако ограничение только одного задания определенного типа, выполняемого в любой данный момент, значительно улучшает предложение ваших потоков + очередей. –

+0

Что делать, если K = 2? В более общем случае количество типов: N и K Michael

+0

Итак, сделайте K-поток и разделите все очереди между ними. Для случая K = 2, N = 3. Первый поток должен опросить два вопроса в круговом режиме, например, в то время как второй прослушивает только одну очередь. – Mikhail

0

домен вашей проблемы может быть смоделирована на две структуры данных, которые я назвал в ожидании (который отображает типы неограниченных очередей задач - это где задачи ждать, чтобы запустить), и работает (где в большинстве одна задача для каждого типа готов к запуску, или фактически управляется Исполнителем).

The K ограничение должно быть применено к работает: оно имеет не более К Type для Task отображения.

Изюминка состоит в том, что количество потоков, которые вы выделяете для всей обработки задачи, полностью ортогонально для обработки ограничений параллелизма: выбор пула потоков должен быть продиктован (среди прочего) типом задач, которые необходимо выполнить (IO/CPU?), А не ограничениями параллелизма.

Реализация:

public class Foo { 

    enum TaskType { A, B, C } 

    class Task { 
     TaskType type; 
     Runnable runnable; 
     volatile boolean running; 
    } 

    Map<TaskType, Queue<Task>> pending = new HashMap<TaskType, Queue<Task>>(); 

    Map<TaskType, Task> running = new HashMap<TaskType, Task>(); 

    ExecutorService executor = null; // Executor implementation is irrelevant to the problem 

    /** Chooses a task of a random type between those not running. */ 
    TaskType choosePending(){ 
     Set running_types = running.keySet(); 
     running_types.removeAll(Arrays.asList(pending.keySet())); 
     List shuffled = new ArrayList(running_types); 
     Collections.shuffle(shuffled); 
     return (TaskType) shuffled.get(0); 
    } 

    // note that max concurrency != parallelism level (which the executor is responsible for) 
    final int MAX_CONCURRENCY = 3; 

    void produce(){ 
     synchronized(running){ 
      if (running.size() < MAX_CONCURRENCY) { 
       synchronized (pending){ 
        TaskType t = choosePending(); 
        running.put(t, pending.get(t).remove()) ; 
       } 
      } 
     } 
    } 

    { 
     new Thread(new Runnable() { 
      public void run() { 
       while (true) produce(); 
      } 
     }).start(); 
    } 

    Task chooseRunning(){ 
     for (Task t : running.values()){ 
      if (!t.running){ 
       return t; 
      } 
     } 
     return null; 
    } 

    void consume(){ 
     final Task t; 
     synchronized (running){ 
      t = chooseRunning(); 
      if (t != null){ 
       t.running = true; 
       executor.execute(new Runnable() { 
        public void run() { 
         t.runnable.run(); 
         synchronized (running) { 
          running.remove(t); 
         } 
        } 
       }); 
      } 
     } 
    } 

    { 
     new Thread(new Runnable() { 
      public void run() { 
       while (true) consume(); 
      } 
     }).start(); 
    } 

} 
Смежные вопросы