2016-09-27 2 views
3

Предположим, что у меня есть 5 потоков, которые должны составлять в общей сложности 1,000,000 вызовы функций для параллельной программы метода Монте-Карло. Я назначил 1,000,000/5 вызовы функций для каждого из 5 потоков. Однако после многих тестов (в некоторых тестах до 1 триллиона итераций) я понял, что некоторые потоки заканчиваются намного быстрее, чем другие. Поэтому вместо этого я хотел бы динамически назначать рабочую нагрузку для каждого из этих потоков. Мой первый подход включал переменную AtomicLong, которая была установлена ​​на начальное значение, скажем, 1 миллиард. После каждого вызова функции, я бы уменьшаем AtomicLong на 1. Перед каждым вызовом функции программа будет проверять, если AtomicLong было больше, чем 0, как это:Динамическое распределение рабочей нагрузки для нескольких потоков в Java

AtomicLong remainingIterations = new AtomicLong(1000000000); 
ExecutorService threadPool = Executors.newFixedThreadPool(5); 

for (int i = 0; i < 5; i++) {//create 5 threads 
    threadPool.submit(new Runnable() { 
     public void run() { 
      while (remainingIterations.get() > 0) {//do a function call if necessary 
      remainingIterations.decrementAndGet();//decrement # of remaining calls needed 
      doOneFunctionCall();//perform a function call 
      } 
     } 
    }); 
}//more unrelated code is not show (thread shutdown, etc.) 

Такой подход, казалось, очень медленно, я Я правильно использую AtomicLong? Есть ли лучший подход?

+0

Вы пытались проверить: - http://stackoverflow.com/questions/12735739/atomiclong-operations? –

ответ

2

Я использую AtomicLong правильно?

Не совсем. То, как вы его используете, два потока могут каждый проверить remainingIterations, каждый видит 1, затем каждый уменьшает его, вводя вас в -1 всего.

Что касается вопроса о медлительности, возможно, если doOneFunctionCall() быстро завершится, ваше приложение увязнет в блокировке, связанной с вашим AtomicLong.

Приятная вещь в ExecutorService заключается в том, что он логически отделяет работу, выполняемую от потоков, которые это делают. Вы можете представить больше рабочих мест, чем у вас есть темы, а ExecutorService выполнит их, как только он способен:

ExecutorService threadPool = Executors.newFixedThreadPool(5); 

for (int i = 0; i < 1000000; i++) { 
    threadPool.submit(new Runnable() { 
     public void run() { 
      doOneFunctionCall(); 
     } 
    }); 
} 

Это может балансировать свою работу немного слишком много в другом направлении: создание слишком много кратко- Живые объекты Runnable. Вы можете экспериментировать, чтобы увидеть, что дает наилучший баланс между распределяя работу и выполнять работу быстро:

ExecutorService threadPool = Executors.newFixedThreadPool(5); 

for (int i = 0; i < 1000; i++) { 
    threadPool.submit(new Runnable() { 
     public void run() { 
      for (int j = 0; j < 1000; j++) { 
       doOneFunctionCall(); 
      } 
     } 
    }); 
} 
+0

Благодарим вас за разъяснение моих заблуждений относительно класса ExecutorService. Это было очень полезно! – Arman

0

Посмотрите на ForkJoinPool. То, что вы пытаетесь, называется делением и победой. В F/J вы устанавливаете количество потоков на 5. Каждый поток имеет очередь ожидающих задач. Вы можете равномерно задавать количество задач для каждого потока/очереди и когда поток заканчивается, он работает - перебирает из очереди другого потока. Таким образом вам не нужен AtomicLong.

Существует множество примеров использования этого класса. Если вам нужна дополнительная информация, дайте мне знать.

+0

Я до сих пор не видел класс ForkJoinPool (все еще изучая), если бы вы могли указать мне направление, которое было бы очень полезно. – Arman

+0

Google java.util.concurrent.ForkJoinPool или просто java divide-and-завоевать. Существует так много примеров, что вы можете клонировать их. Даже смотря на JavaDoc для java.util.concurrent.RecursiveAction или java.util.concurrent.RecursiveTask. – edharned

+0

Вот официальный учебник: https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html Много связанных ссылок на боковой панели слева. – markspace

0

Элегантный подход, позволяющий избежать создания задач 1B, заключается в использовании синхронной очереди и ThreadPoolExecutor, поэтому отправка будет заблокирована до тех пор, пока поток не станет доступен. Я не тестировал фактическую производительность.

BlockingQueue<Runnable> queue = new SynchronousQueue<>(); 
ExecutorService threadPool = new ThreadPoolExecutor(5, 5, 
    0L, TimeUnit.MILLISECONDS, 
    queue); 
for (int i = 0; i < 1000000000; i++) { 
    threadPool.submit(new Runnable() { 
     public void run() { 
      doOneFunctionCall(); 
     } 
    }); 
} 
Смежные вопросы