2017-01-03 2 views
0

Я пишу код, который будет считывать строки журнала и выполнять некоторую обработку в фоновом режиме по этим данным. Эта обработка, вероятно, принесет пользу от распараллеливания, такого как методы Stream.parallel, и я пытался использовать это. Это код, который я начал с того, что работает отлично.Использование BufferedReader.lines() прерывается параллельно

public static void main(String[] args) { 
    try { 
     final Socket socket = new Socket(ADDRESS, PORT); 
     final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
     socket.getOutputStream().write(QUERY); 
     reader.lines().forEach(System.out::println); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 

} 

Этот код связывает и распечатывает все мои данные. Я очень хотел бы перестроить этот код следующим образом:

public static void main(String[] args) { 
    try (Socket socket = new Socket(ADDRESS, PORT); 
     BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) { 
     socket.getOutputStream().write(QUERY); 
     reader.lines().forEach(System.out::println); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 

} 

Но, к сожалению, это не работает. Еще хуже, возвращаясь к исходному коду, это даже не работает:

public static void main(String[] args) { 
    try { 
     final Socket socket = new Socket(ADDRESS, PORT); 
     final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
     socket.getOutputStream().write(QUERY); 
     reader.lines().parallel().forEach(System.out::println); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 

} 

Все, что было добавлено здесь был .parallel вызов, и это совершенно не работает. Он просто сидит там и ничего не распечатывается.

Я могу жить отлично и хорошо без 2-й версии, используя модифицированный try(A a = new A()) {}, поскольку в этом случае это выглядит не очень хорошо. Я не могу жить без того, чтобы понять, почему этот параллельный вызов нарушает все.

Я предполагаю, что модифицированный оператор try закрывает потоки, как только я выпаду из них (сразу после запуска forEach), поэтому они будут убиты и GC'd перед операцией. Я не могу для жизни понять, что, черт возьми, происходит с параллельным вызовом.

В соответствии с запросом здесь представлен вывод jstack, работающий на .parellel() версии этого кода.

Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode): 

"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007fd4f4001000 nid=0x4907 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007fd5280be000 nid=0x48d2 runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fd5280bb000 nid=0x48d1 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fd5280b9800 nid=0x48d0 waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fd5280b6800 nid=0x48cf waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fd5280b5000 nid=0x48ce runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fd528082000 nid=0x48cd in Object.wait() [0x00007fd515c6d000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock) 
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) 
    - locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock) 
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164) 
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209) 

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fd52807d800 nid=0x48cc in Object.wait() [0x00007fd515d6e000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock) 
    at java.lang.Object.wait(Object.java:502) 
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191) 
    - locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock) 
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153) 

"main" #1 prio=5 os_prio=0 tid=0x00007fd528008000 nid=0x48c2 runnable [0x00007fd52fd9f000] 
    java.lang.Thread.State: RUNNABLE 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) 
    - locked <0x00000000ec086790> (a java.net.SocksSocketImpl) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:589) 
    at java.net.Socket.connect(Socket.java:538) 
    at java.net.Socket.<init>(Socket.java:434) 
    at java.net.Socket.<init>(Socket.java:211) 
    at com.gravypod.Test.main(Test.java:48) 

"VM Thread" os_prio=0 tid=0x00007fd528075800 nid=0x48ca runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fd52801d800 nid=0x48c4 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fd52801f000 nid=0x48c5 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fd528021000 nid=0x48c6 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fd528022800 nid=0x48c7 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007fd5280c0800 nid=0x48d3 waiting on condition 

JNI global references: 18 

Строка Test.java:48 - это линия Socket socket = new Socket. Это результат полностью работающего непараллельного кода (просто используя .lines()).

Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode): 

"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007f9048001000 nid=0x4982 runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007f90800be800 nid=0x496f runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f90800bb000 nid=0x496e waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f90800b9800 nid=0x496d waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f90800b6800 nid=0x496c waiting on condition [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f90800b5000 nid=0x496b runnable [0x0000000000000000] 
    java.lang.Thread.State: RUNNABLE 

"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f9080082000 nid=0x496a in Object.wait() [0x00007f907018d000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock) 
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143) 
    - locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock) 
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164) 
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209) 

"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f908007d800 nid=0x4969 in Object.wait() [0x00007f907028e000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    - waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock) 
    at java.lang.Object.wait(Object.java:502) 
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191) 
    - locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock) 
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153) 

"main" #1 prio=5 os_prio=0 tid=0x00007f9080008000 nid=0x4961 runnable [0x00007f90884c3000] 
    java.lang.Thread.State: RUNNABLE 
    at java.net.SocketInputStream.socketRead0(Native Method) 
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
    at java.net.SocketInputStream.read(SocketInputStream.java:170) 
    at java.net.SocketInputStream.read(SocketInputStream.java:141) 
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) 
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) 
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
    - locked <0x00000000ec08e890> (a java.io.InputStreamReader) 
    at java.io.InputStreamReader.read(InputStreamReader.java:184) 
    at java.io.BufferedReader.fill(BufferedReader.java:161) 
    at java.io.BufferedReader.readLine(BufferedReader.java:324) 
    - locked <0x00000000ec08e890> (a java.io.InputStreamReader) 
    at java.io.BufferedReader.readLine(BufferedReader.java:389) 
    at java.io.BufferedReader$1.hasNext(BufferedReader.java:571) 
    at java.util.Iterator.forEachRemaining(Iterator.java:115) 
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) 
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) 
    at com.gravypod.Test.main(Test.java:51) 

"VM Thread" os_prio=0 tid=0x00007f9080075800 nid=0x4968 runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f908001d800 nid=0x4963 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f908001f000 nid=0x4964 runnable 

"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f9080021000 nid=0x4965 runnable 

"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f9080022800 nid=0x4966 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f90800c1000 nid=0x4970 waiting on condition 

JNI global references: 319 

Линия Test.java:51 является reader.lines().forEach линия.

+0

Seens как тупиковый или динамический тупик проблемы .. – BrunoDM

+1

Каковы относительные скорости (а) чтение всех данных из сокета и (б) выполнение всей обработки на разных «строках», которые вы читаете из сокета? Если (а) дольше или, по крайней мере, не намного короче, чем (б), вы не можете эффективно распараллелить это, поскольку (а) по своей сути является серийным и будет узким местом. Если (b) длинно, вы можете это сделать, но просто разделите его на два шага - чтение всех строк (однопоточность) и параллельную обработку строк. Затем вы можете наложить их немного, чтобы добиться более высокой производительности. – BeeOnRope

+1

Чтобы диагностировать проблему тупика, возьмите 'jstack' процесса, когда он зависает и публикует его. Возможно, проще заменить буферизованный читатель сокета + StringReader и посмотреть, существует ли проблема, поскольку он позволит вам опубликовать [MVCE] (http://stackoverflow.com/help/mcve). – BeeOnRope

ответ

0

Кажется, ваше приложение не висит, технически, а просто ждет большого количества ввода, прежде чем выполнять наблюдаемую работу. Это комбинация двух деталей реализации. Когда вы запускаете операцию параллельного потока, сначала попытайтесь разделить рабочую нагрузку, пока у каждого ядра процессора что-то не произойдет, прежде чем начать процесс обработки элементов. Это сочетается с проблемой Reader#lines() parallelizes badly due to nonconfigurable batch size.

Проще говоря, когда поток имеет неизвестный размер, реализация будет пытаться буферизовать партии с размерами, кратными 1024, растущим на каждом сплите. This great answer показывает, как произойдет расщепление потока неизвестного размера с несколькими ядрами, показывая, что в этом процессе буферизуют кратные элементы 1024. Это может занять очень много времени, прежде чем потребитель перейдет к forEach.

Обратите внимание, что обработка бесконечного источника с помощью некороткого замыкания forEach в любом случае выходит за рамки Stream API.Если предположить своевременный побочный эффект, это предположение о порядке обработки Stream, но об этом нет никакой гарантии.

This answer направляет вас к обходу. Вы можете использовать что-то вроде

try(Socket socket = new Socket(ADDRESS, PORT); 
    BufferedReader reader = new BufferedReader(
     new InputStreamReader(socket.getInputStream()))) { 

    socket.getOutputStream().write(QUERY); 
    Stream.generate(() -> { 
     try { return reader.readLine(); } 
     catch (IOException ex) { throw new UncheckedIOException(ex); } 
    }).parallel().forEach(System.out::println); 
} catch(IOException|UncheckedIOException e) { 
    e.printStackTrace(); 
} 

Но, как было сказано, это не целевое использование случай потока API ...

1

Я предполагаю, что parallel() или forEach() на параллельном потоке ждет, чтобы прочитать все входные данные перед параллелизацией задачи. Поскольку сервер никогда не закрывает соединение, он будет ждать всегда.

Ваша задача не является параллелизуемой. Данные поступают последовательно по кабелю, поэтому чтение его параллельно не может работать.

+0

Это не так, поскольку .lines не блокирует чтение всех входных данных. В строке doc он также говорит: «Возвращает поток, элементами которого являются строки, считанные из этого BufferedReader. Поток лениво заполнен, т. Е. Чтение происходит только во время операции с терминальным потоком. –

+0

Также я хотел бы добавить, что .lines() отлично работает, я просто не хочу тратить время на обработку, вместо того, чтобы читать из поток, который мне нужно делать постоянно (поскольку буфер будет заполняться очень быстро. На самом деле это не значит, что распараллеливать работу, так как она не занимает время, которое я мог бы читать. Я продумал дизайн, и вот как я как это сделать, моя проблема: _why_, это не работает, потому что, если я не понимаю, что это должно быть прекрасно, –

+1

Это блокировка forEach. Она будет ждать завершения всех потоков, но некоторые из них никогда не закончатся поскольку гнездо никогда не закрывается.Попробуйте закрыть сокет на сервере, и вы увидите результат. – Lucian

Смежные вопросы