2015-06-14 2 views
1

В книге Java 8 In Action, раздел 7.1.1, авторы заявляют, что поток может извлечь выгоду из параллельной обработки, добавив функцию .parallel(). Они обеспечивают простой способ, называемый parallelSum(int), чтобы проиллюстрировать это. Мне было интересно посмотреть, насколько хорошо он работал так я выполнил этот код:Java 8, используя .parallel в потоке, вызывает ошибку OOM

package lambdasinaction.chap7; 

import java.util.stream.Stream; 

public class ParallelPlay { 

    public static void main(String[] args) { 
     System.out.println(parallelSum(100_000_000)); 
    } 

    public static long parallelSum(long n) { 
     return Stream.iterate(1L, i -> i + 1) 
       .limit(n) 
       .parallel() 
       .reduce(0L, Long::sum); 
    } 
} 

К моему удивлению, я получил эту ошибку:

Exception in thread "main" java.lang.OutOfMemoryError 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source) 
    at java.lang.reflect.Constructor.newInstance(Unknown Source) 
    at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source) 
    at java.util.concurrent.ForkJoinTask.reportException(Unknown Source) 
    at java.util.concurrent.ForkJoinTask.invoke(Unknown Source) 
    at java.util.stream.SliceOps$1.opEvaluateParallelLazy(Unknown Source) 
    at java.util.stream.AbstractPipeline.sourceSpliterator(Unknown Source) 
    at java.util.stream.AbstractPipeline.evaluate(Unknown Source) 
    at java.util.stream.ReferencePipeline.reduce(Unknown Source) 
    at lambdasinaction.chap7.ParallelPlay.parallelSum(ParallelPlay.java:15) 
    at lambdasinaction.chap7.ParallelPlay.main(ParallelPlay.java:8) 
Caused by: java.lang.OutOfMemoryError: Java heap space 
    at java.util.stream.SpinedBuffer.ensureCapacity(Unknown Source) 
    at java.util.stream.Nodes$SpinedNodeBuilder.begin(Unknown Source) 
    at java.util.stream.AbstractPipeline.copyInto(Unknown Source) 
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) 
    at java.util.stream.SliceOps$SliceTask.doLeaf(Unknown Source) 
    at java.util.stream.SliceOps$SliceTask.doLeaf(Unknown Source) 
    at java.util.stream.AbstractShortCircuitTask.compute(Unknown Source) 
    at java.util.concurrent.CountedCompleter.exec(Unknown Source) 
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) 
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source) 
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) 
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) 

Я бег Java 1.8.0_45 на Windows 7 SP1 с четырехъядерным процессором. Что происходит?

+0

На macbook pro (2,2 ГГц Intel Core i7 с 16 ГБ оперативной памяти) потребовалось 26 секунд и вернулся: 5000000050000000 – alfasin

+0

Похоже, что размер вашей кучи слишком мал, запустите: 'java -XX: + PrintFlagsFinal -version | findstr/i «HeapSize PermSize ThreadStackSize», чтобы проверить его и рассмотреть возможность его увеличения (изменив значения '-Xms' и' -Xmx') и повторите попытку. – alfasin

+0

Кроме того, использование 'iterate()' как источника потока по существу гарантирует, что вы не получите никакой распараллеливания, так как это принципиально последовательное поколение (не может генерировать элемент n + 1, пока вы не сгенерировали элемент n.) Используйте ' Вместо этого: IntStream.range(). –

ответ

5

Здесь вы создаете бесконечный поток и ограничиваете его потом. Известны проблемы параллельной обработки бесконечных потоков. В частности, нет возможности эффективно разбить задачу на равные части. Внутри используются некоторые эвристики, которые не подходят для каждой задачи. В вашем случае это гораздо лучше, чтобы создать конечный поток с помощью LongStream.range:

import java.util.stream.LongStream; 

public class ParallelPlay { 

    public static void main(String[] args) { 
     System.out.println(parallelSum(100_000_000)); 
    } 

    public static long parallelSum(long n) { 
     return LongStream.rangeClosed(1, n).parallel().sum(); 
    } 
} 

В этом случае поток двигатель знает с самого начала, сколько элементов у вас есть, так что он может эффективно разделить задачу. Также обратите внимание, что использование LongStream более эффективно, так как у вас не будет ненужного бокса.

В общем, избегайте бесконечных потоков, если вы можете решить свою задачу с помощью конечных.

+0

Хотя независимо от того, сколько потоков может выполняться параллельно, они не могут бить простой однопоточный '(n + 1) * n/2'. К сожалению, стандартная реализация недостаточно умна, чтобы понять это. – Holger

+0

@Holger: это интересный вопрос, почему нет специализированных потоков (таких как 'EmptyStream',' SingletonStream', 'RangeStream' и т. Д.), Которые могли бы оптимизировать некоторые операции. Вероятно, это приведет к слишком большому количеству кода (учитывая, что вы также должны поддерживать примитивы). Другая возможная причина заключается в том, что это может повредить производительность в обычном случае, поскольку потоковые вызовы станут полиморфными, поэтому JIT будет более тяжело девиртуализировать их (в настоящее время JDK имеет только одну реализацию «Stream»). –

+0

Я не думаю, что реализации полиморфных потоков могут повредить производительность больше, чем полиморфные интерфейсы «Spliterator». В конце концов, Hotspot отлично справляется с ними, и операция терминала состоит из одного вызова метода в любом случае. Но вам все равно их не понадобится, единственное, что вам нужно, - это эти (или некоторые) операции высокого уровня, определенные на внутренних этапах трубопровода. Часто это так или иначе, никто не думал об этом, или оптимизация была отложена. См. 'Stream.count()', который будет сокращен в Java 9 или потоках flatmap, которые еще не ленивы. – Holger

Смежные вопросы