2016-12-12 3 views
1

Java-потоки основывают объем параллелизма на вашем оборудовании. Но что, если я хочу всегда иметь максимальный объем параллелизма?Максимизировать параллелизм с потоками Java 8

Рассмотрите приведенный ниже код. Я хочу, чтобы каждая из 10 задач выполнялась одновременно в течение 100 миллисекунд.

long runUntil = System.currentTimeMillis() + 100; 
IntStream.range(0, 10).parallel().forEach(i -> 
{ 
    int cnt = 0; 
    while(System.currentTimeMillis() < runUntil) 
     cnt++; 
    System.out.println(i + ": " + cnt); 
}); 

Однако результат я получаю:

2: 56443 
1: 67506 
4: 74693 
6: 70549 
0: 0 
3: 0 
5: 0 
7: 0 
8: 0 
9: 0 

Так что только 4 задачи выполняются параллельно, а пятая задача начинается только тогда, когда один из первых 4 закончена. Я хочу, чтобы все задания начинались примерно в одно и то же время, и не дожидайтесь друг друга.

Я не согласен с тем, что это дубликат Custom thread pool in Java 8 parallel stream, потому что этот вопрос касается медленных задач, блокирующих другие задачи, в то время как в моем случае я просто хочу знать, как могу (если можно) максимизировать параллелизм, когда используя Stream API.

+1

Каков ожидаемый результат? –

+6

Похоже, у вас есть 4 ядра. – 4castle

+0

«одновременно работает в течение 100 миллисекунд»: вы хотите, чтобы все задачи выполнялись одновременно), или б) каждый из них не заканчивался, пока он не запустился на 100 мс? –

ответ

4

При выполнении параллельного потока, вы под капотом, ссылающегося на ForkJoinPool, что бассейн имеет ряд рабочих потоков, равно результату:

Runtime.getRuntime().availableProcessors(); // 4 in your case 

так параллельная задача выполняется одновременно по 4 потокам.

К тому времени, когда вы начинаете 5-й задачу (100 милисекунд прошли), так что это условие:

while(System.currentTimeMillis() < runUntil) 

отчетов ложную, только таким образом нули.

Чтобы решить эту проблему, вы можете создать ForkJoinPool самостоятельно, как описано в этом ответе, а также (https://stackoverflow.com/a/22269778/2947592)

long runUntil = System.currentTimeMillis() + 1000; 
ForkJoinPool forkJoinPool = new ForkJoinPool(10); // 10 Threads 
forkJoinPool.submit(() -> 
IntStream.range(0, 10).parallel().forEach(i -> { 
    int cnt = 0; 
    while (System.currentTimeMillis() < runUntil) 
     cnt++; 
    System.out.println(i + ": " + cnt); 
})).get(); 
+1

@wvdz Я видел ответ, который вы предоставили, и да, что * исправляет * вашу проблему, но не рекомендуется собирать больше потоков, чем у вас. Было бы хорошо, если бы вы объяснили свой случай использования? Я действительно заинтригован. Если все, что вы хотите сделать, это измерить, насколько быстро каждое ядро ​​добавляет, а выделение 10 потоков для 4 ядер уменьшит ваши результаты. – Eugene

+0

Это может быть не очень хорошая идея, но это требование для того, над чем я работаю. Другим решением было бы вручную запускать потоки, но я бы предпочел использовать API потока. – wvdz

+2

@wvdz В рабочей среде вы хотите вручную отключить потоки, чтобы люди знали, что происходит. Вы можете «mapToObject» и создавать там темы. – 4castle

1

Так что я уже нашел ответ на свой вопрос. Проблема в том, что это действительно похоже на взлом, а не на правильное решение. Мне не понравилось бы это использовать в производственной среде.

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10"); 

Теперь я получить результат, как это, когда я позволил ему бежать в течение 1000 миллисекунд:

9: 40158551 
8: 41835052 
0: 39087202 
4: 37993773 
6: 37993442 
7: 36503041 
2: 40076207 
1: 37894657 
5: 35785211 
3: 40086037 

Я думаю, что мое требование является разумным, и я удивлен, что это Apparantly не поддерживаются потока API ,

+2

Снова ... оба вопроса и ответа являются дубликатом [Пользовательский пул потоков в параллельном потоке Java 8] (http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel- поток) –

Смежные вопросы