домен вашей проблемы может быть смоделирована на две структуры данных, которые я назвал в ожидании (который отображает типы неограниченных очередей задач - это где задачи ждать, чтобы запустить), и работает (где в большинстве одна задача для каждого типа готов к запуску, или фактически управляется Исполнителем).
The K ограничение должно быть применено к работает: оно имеет не более К Type
для Task
отображения.
Изюминка состоит в том, что количество потоков, которые вы выделяете для всей обработки задачи, полностью ортогонально для обработки ограничений параллелизма: выбор пула потоков должен быть продиктован (среди прочего) типом задач, которые необходимо выполнить (IO/CPU?), А не ограничениями параллелизма.
Реализация:
public class Foo {
enum TaskType { A, B, C }
class Task {
TaskType type;
Runnable runnable;
volatile boolean running;
}
Map<TaskType, Queue<Task>> pending = new HashMap<TaskType, Queue<Task>>();
Map<TaskType, Task> running = new HashMap<TaskType, Task>();
ExecutorService executor = null; // Executor implementation is irrelevant to the problem
/** Chooses a task of a random type between those not running. */
TaskType choosePending(){
Set running_types = running.keySet();
running_types.removeAll(Arrays.asList(pending.keySet()));
List shuffled = new ArrayList(running_types);
Collections.shuffle(shuffled);
return (TaskType) shuffled.get(0);
}
// note that max concurrency != parallelism level (which the executor is responsible for)
final int MAX_CONCURRENCY = 3;
void produce(){
synchronized(running){
if (running.size() < MAX_CONCURRENCY) {
synchronized (pending){
TaskType t = choosePending();
running.put(t, pending.get(t).remove()) ;
}
}
}
}
{
new Thread(new Runnable() {
public void run() {
while (true) produce();
}
}).start();
}
Task chooseRunning(){
for (Task t : running.values()){
if (!t.running){
return t;
}
}
return null;
}
void consume(){
final Task t;
synchronized (running){
t = chooseRunning();
if (t != null){
t.running = true;
executor.execute(new Runnable() {
public void run() {
t.runnable.run();
synchronized (running) {
running.remove(t);
}
}
});
}
}
}
{
new Thread(new Runnable() {
public void run() {
while (true) consume();
}
}).start();
}
}
бы вы подробнее остановиться на замках вы предложили для удовлетворения ограничений, # 3? – Michael