2014-12-20 3 views
3

Я обнаружил, что существует простой пул потоков, используемый JVM для параллельной обработки потоков. У нас была блокированная функция ввода-вывода на большом потоке, которая вызывала проблемы с живимостью для несвязанных и в противном случае быстрых функций, используемых с несвязанными параллельными потоками.Исправление потока потоковой обработки потока

В потоке нет методов, позволяющих использовать альтернативный пул потоков.

Есть ли простой способ избежать этой проблемы, возможно, каким-то образом определить, какой пул потоков использовать?

+2

Возможный дубликат [Пользовательский пул потоков в Java-параллельном потоке] (http://stackoverflow.com/questions/21163108/custom-thread-pool-in-java-8-parallel-stream) – mkobit

+1

Есть трюк - предлагается в вышеупомянутом вопросе, но вы должны иметь в виду, что потоки - это прежде всего механизм распараллеливания * вычислений *, а не * IO *, и поэтому вы работаете на полях. Мы изучаем способы довести некоторые из этих вариантов использования в большей части. –

+0

@BrianGoetz, в то время как у меня есть ваше ухо, рассмотрите расширение API, возможно, параметр для метода 'parallel()', который позволяет использовать отдельный пул потоков для параллельной обработки потока. Некоторые вычисления также тяжелы и могут вызывают проблемы с живительностью. Хотя я вижу, что «очень длинные» времена (например, IO) выходят за рамки дизайна, достаточно естественным образом использовать потоки для управления этим использованием, и IMHO должен быть простым способом смягчить влияние такого вида использования , Для меня это нарушило правило Джоша «наименьшее удивление». Веселый Xmas :) – Bohemian

ответ

0

Это, возможно, похож на Custom thread pool in Java 8 parallel stream

Проблема дополнительно обсуждается в this blog.

ForkJoinPool forkJoinPool = new ForkJoinPool(2); 
forkJoinPool.submit(() -> 
    //parallel task here, for example 
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()) 
).get(); 
1

Я написал небольшую библиотеку под названием StreamEx, которая может представить задачу к пользовательскому ПСС. Таким образом, вы можете написать

ForkJoinPool forkJoinPool = new ForkJoinPool(2); 
int[] primes = IntStreamEx.range(1, 1_000_000) 
    .parallel(forkJoinPool) 
    .filter(PrimesPrint::isPrime).toArray(); 

Он просто запоминает ваш пул и запускает операцию терминала внутри, соединяющую результат. Просто синтаксический сахар для вышеупомянутого решения.

2

Вы можете обернуть блокирующей операцию в ForkJoinPool.ManagedBlocker, вдоль линий этого:

static <T> Supplier<T> blocking(Supplier<T> supplier) { 
    return new Supplier<T>() { 
     volatile T result; 

     @Override 
     public T get() { 
      try { 
       ForkJoinPool.managedBlock(new ManagedBlocker() { 
        @Override 
        public boolean block() { 
         result = supplier.get(); 
         return true; 
        } 

        @Override 
        public boolean isReleasable() { 
         return result != null; 
        } 
       }); 
      } 
      catch (InterruptedException e) { 
       throw new RuntimeException(e); 
      } 

      return result; 
     } 
    }; 
} 

использовать его тогда, например, такие:

Stream.generate(blocking(() -> ...)) 
     .parallel() 
     ... 
     .collect(...); 

Более подробную информацию можно найти в это сообщение в блоге: http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/

jOOλ предлагает обертки для всех Java 8 FunctionalInterface типы, как выше, с помощью org.jooq.lambda.Blocking, так что вы можете написать:

Stream.generate(Blocking.supplier(() -> ...)) 
     .parallel() 
     ... 
     .collect(...); 

Или, например, когда фильтр блокируется:

Stream.... 
     .parallel() 
     .filter(Blocking.predicate(t -> blockingTest(t))) 
     .collect(...); 

(Отказ от ответственности, я работаю в компании позади jOOλ).

+0

IIRC вы утверждали, что jOOλ не заботится о параллельной обработке :-) На самом деле ваше решение (как и мое) имеет другую проблему. Количество подзадач, созданных Stream API, обычно составляет 4 * число процессоров (может быть больше для бесконечных потоков). Это число не изменится, если мы создадим собственный пул со 100 потоками или отметьте задачи как блокирующие ваш подход. Для блокировки ввода-вывода на самом деле лучше использовать что-то другое (например, CompletableFutures). –

+0

@TagirValeev: Цель jOOλ - «исправить» JDK в целом (например, также предоставить функциональные интерфейсы, которые выдают проверенные исключения). Цель 'Seq' jOOλ -« исправить »« Поток »для последовательного (и упорядоченного) использования. 'CompletableFuture' также отправляет задания в' ForkJoinPool', поэтому вы можете выпускать те же потоки. Я согласен, что это использование, вероятно, не то, что люди должны делать. Но опять же, что * * блокирует (в этом контексте)? Это I/O? Это цикл до '1_000_000_000_000'? Это глупый экспоненциальный алгоритм? –

+0

OP явно сказал, что это I/O. Используя CF, вы можете создать столько задач, сколько хотите, но используя параллельный поток, вы не сможете его контролировать. На 4-ядерной машине обычно создаются до 16 задач, и если 15 из них блокируются в операции ввода-вывода, то даже с помощью трюка будет продолжать работать только одна задача. Stream API просто не будет делить вход больше и не создаст больше задач для вас. –

Смежные вопросы