2008-09-24 2 views
49

Я ищу простейшее, наиболее простой способ реализовать следующее:Как реализовать простую многопоточность с фиксированным числом рабочих потоками

  • Основной программа конкретизирует работник нити, чтобы сделать задачу.
  • Только задачи n могут быть запущены сразу.
  • Когда n достигнуто, больше не нужно , пока не начисляется бегущих нитей. Спускаются ниже n.

ответ

53

Я думаю, что Executors.newFixedThreadPool соответствует вашим требованиям. Существует несколько способов использования полученного ExecutorService в зависимости от того, хотите ли вы, чтобы результат возвращался к основному потоку или была ли эта задача полностью автономной, и есть ли у вас набор задач для выполнения переднего плана или задаются ли задачи в зависимости от какого-либо события.

Collection<YourTask> tasks = new ArrayList<YourTask>(); 
    YourTask yt1 = new YourTask(); 
    ... 
    tasks.add(yt1); 
    ... 
    ExecutorService exec = Executors.newFixedThreadPool(5); 
    List<Future<YourResultType>> results = exec.invokeAll(tasks); 

В качестве альтернативы, если у вас есть новая асинхронная задача для выполнения в ответ на какое-то событие, вы, вероятно, просто хотите использовать простой метод execute(Runnable) в ExecutorService в.

0

Если вы хотите, чтобы свернуть свой собственный:

private static final int MAX_WORKERS = n; 
private List<Worker> workers = new ArrayList<Worker>(MAX_WORKERS); 

private boolean roomLeft() { 
    synchronized (workers) { 
     return (workers.size() < MAX_WORKERS); 
    } 
} 

private void addWorker() { 
    synchronized (workers) { 
     workers.add(new Worker(this)); 
    } 
} 

public void removeWorker(Worker worker) { 
    synchronized (workers) { 
     workers.remove(worker); 
    } 
} 

public Example() { 
    while (true) { 
     if (roomLeft()) { 
      addWorker(); 
     } 
    } 
} 

Где работник ваш класс, который расширяет тему. Каждый рабочий вызовет метод removeWorker этого класса, передав его в качестве параметра, когда он закончит делать это.

С учетом сказанного структура Executor выглядит намного лучше.

Редактировать: Любой должен объяснить, почему это так плохо, а не просто понижать его?

0

Как другие здесь уже упоминалось, лучше всего это сделать пул потоков с Executors класса:

Однако, если вы хотите, чтобы свернуть свой собственный, этот код должен дать вам представление о том, как действовать. В принципе, просто добавить каждую новую нить к нити группы и убедитесь, что вы никогда не более N активных потоков в группе:

Task[] tasks = getTasks(); // array of tasks to complete 
ThreadGroup group = new ThreadGroup(); 
int i=0; 
while(i<tasks.length || group.activeCount()>0) { 
    if(group.activeCount()<N && i<tasks.length) { 
     new TaskThread(group, tasks[i]).start(); 
     i++; 
    } else { 
     Thread.sleep(100); 
    } 
} 
22
/* Get an executor service that will run a maximum of 5 threads at a time: */ 
ExecutorService exec = Executors.newFixedThreadPool(5); 
/* For all the 100 tasks to be done altogether... */ 
for (int i = 0; i < 100; i++) { 
    /* ...execute the task to run concurrently as a runnable: */ 
    exec.execute(new Runnable() { 
     public void run() { 
      /* do the work to be done in its own thread */ 
      System.out.println("Running in: " + Thread.currentThread()); 
     } 
    }); 
} 
/* Tell the executor that after these 100 steps above, we will be done: */ 
exec.shutdown(); 
try { 
    /* The tasks are now running concurrently. We wait until all work is done, 
    * with a timeout of 50 seconds: */ 
    boolean b = exec.awaitTermination(50, TimeUnit.SECONDS); 
    /* If the execution timed out, false is returned: */ 
    System.out.println("All done: " + b); 
} catch (InterruptedException e) { e.printStackTrace(); } 
1
  1. Если ваша очередь задача не собирается быть неограниченным и задачи могут быть завершены за более короткие промежутки времени, вы можете использовать Executors.newFixedThreadPool(n); как предлагает эксперт.

    Единственный недостаток этого решения - неограниченный размер очереди задач. У вас нет контроля над этим. Огромный сбор в очереди задач приведет к ухудшению производительности приложения и может привести к нехватке памяти в некоторых сценариях.

  2. Если вы хотите использовать ExecutorService и включить work stealing механизм, в котором холостые рабочие потоки разделить нагрузку от занятых рабочих потоков, украв задачи в очереди задач. Он вернет тип службы ForkJoinPool.

    общественности статической ExecutorService newWorkStealingPool (интермедиат параллелизм)

    Создает пул потоков, который поддерживает достаточное количество потоков для поддержки заданного уровня параллелизма, и может использовать несколько очередей, чтобы уменьшить конкуренцию. Уровень параллелизма соответствует максимальному числу потоков, активно вовлеченных в процесс обработки задач или доступных для участия. Фактическое количество потоков может расти и динамически сокращаться. Пул поиска работы не дает никаких гарантий относительно порядка выполнения поставленных задач.

  3. Я предпочитаю ThreadPoolExecutor благодаря гибкости API-интерфейсов для управления многими paratmeters, который контролирует выполнение задания потока.

    ThreadPoolExecutor(int corePoolSize, 
             int maximumPoolSize, 
             long keepAliveTime, 
             TimeUnit unit, 
             BlockingQueue<Runnable> workQueue, 
             ThreadFactory threadFactory, 
             RejectedExecutionHandler handler) 
    

в вашем случае, установить как corePoolSize and maximumPoolSize as N. Здесь вы можете контролировать размер очереди задач, определять свою собственную политику изготовителя нитей и политику обработчика отклонения.

Посмотрите на соответствующий вопрос SE контролировать размер пула динамически:

Dynamic Thread Pool

Смежные вопросы