Я пишу код, который будет считывать строки журнала и выполнять некоторую обработку в фоновом режиме по этим данным. Эта обработка, вероятно, принесет пользу от распараллеливания, такого как методы Stream.parallel, и я пытался использовать это. Это код, который я начал с того, что работает отлично.Использование BufferedReader.lines() прерывается параллельно
public static void main(String[] args) {
try {
final Socket socket = new Socket(ADDRESS, PORT);
final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
socket.getOutputStream().write(QUERY);
reader.lines().forEach(System.out::println);
} catch (IOException e) {
e.printStackTrace();
}
}
Этот код связывает и распечатывает все мои данные. Я очень хотел бы перестроить этот код следующим образом:
public static void main(String[] args) {
try (Socket socket = new Socket(ADDRESS, PORT);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
socket.getOutputStream().write(QUERY);
reader.lines().forEach(System.out::println);
} catch (IOException e) {
e.printStackTrace();
}
}
Но, к сожалению, это не работает. Еще хуже, возвращаясь к исходному коду, это даже не работает:
public static void main(String[] args) {
try {
final Socket socket = new Socket(ADDRESS, PORT);
final BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
socket.getOutputStream().write(QUERY);
reader.lines().parallel().forEach(System.out::println);
} catch (IOException e) {
e.printStackTrace();
}
}
Все, что было добавлено здесь был .parallel вызов, и это совершенно не работает. Он просто сидит там и ничего не распечатывается.
Я могу жить отлично и хорошо без 2-й версии, используя модифицированный try(A a = new A()) {}
, поскольку в этом случае это выглядит не очень хорошо. Я не могу жить без того, чтобы понять, почему этот параллельный вызов нарушает все.
Я предполагаю, что модифицированный оператор try закрывает потоки, как только я выпаду из них (сразу после запуска forEach), поэтому они будут убиты и GC'd перед операцией. Я не могу для жизни понять, что, черт возьми, происходит с параллельным вызовом.
В соответствии с запросом здесь представлен вывод jstack, работающий на .parellel() версии этого кода.
Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):
"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007fd4f4001000 nid=0x4907 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007fd5280be000 nid=0x48d2 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007fd5280bb000 nid=0x48d1 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007fd5280b9800 nid=0x48d0 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007fd5280b6800 nid=0x48cf waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007fd5280b5000 nid=0x48ce runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007fd528082000 nid=0x48cd in Object.wait() [0x00007fd515c6d000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007fd52807d800 nid=0x48cc in Object.wait() [0x00007fd515d6e000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
"main" #1 prio=5 os_prio=0 tid=0x00007fd528008000 nid=0x48c2 runnable [0x00007fd52fd9f000]
java.lang.Thread.State: RUNNABLE
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
- locked <0x00000000ec086790> (a java.net.SocksSocketImpl)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at java.net.Socket.connect(Socket.java:538)
at java.net.Socket.<init>(Socket.java:434)
at java.net.Socket.<init>(Socket.java:211)
at com.gravypod.Test.main(Test.java:48)
"VM Thread" os_prio=0 tid=0x00007fd528075800 nid=0x48ca runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007fd52801d800 nid=0x48c4 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007fd52801f000 nid=0x48c5 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007fd528021000 nid=0x48c6 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007fd528022800 nid=0x48c7 runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007fd5280c0800 nid=0x48d3 waiting on condition
JNI global references: 18
Строка Test.java:48 - это линия Socket socket = new Socket
. Это результат полностью работающего непараллельного кода (просто используя .lines()).
Full thread dump OpenJDK 64-Bit Server VM (25.112-b15 mixed mode):
"Attach Listener" #9 daemon prio=9 os_prio=0 tid=0x00007f9048001000 nid=0x4982 runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Service Thread" #8 daemon prio=9 os_prio=0 tid=0x00007f90800be800 nid=0x496f runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C1 CompilerThread2" #7 daemon prio=9 os_prio=0 tid=0x00007f90800bb000 nid=0x496e waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread1" #6 daemon prio=9 os_prio=0 tid=0x00007f90800b9800 nid=0x496d waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"C2 CompilerThread0" #5 daemon prio=9 os_prio=0 tid=0x00007f90800b6800 nid=0x496c waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Signal Dispatcher" #4 daemon prio=9 os_prio=0 tid=0x00007f90800b5000 nid=0x496b runnable [0x0000000000000000]
java.lang.Thread.State: RUNNABLE
"Finalizer" #3 daemon prio=8 os_prio=0 tid=0x00007f9080082000 nid=0x496a in Object.wait() [0x00007f907018d000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
- locked <0x00000000ec008e98> (a java.lang.ref.ReferenceQueue$Lock)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
"Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x00007f908007d800 nid=0x4969 in Object.wait() [0x00007f907028e000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
at java.lang.Object.wait(Object.java:502)
at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
- locked <0x00000000ec006b40> (a java.lang.ref.Reference$Lock)
at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
"main" #1 prio=5 os_prio=0 tid=0x00007f9080008000 nid=0x4961 runnable [0x00007f90884c3000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:170)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
- locked <0x00000000ec08e890> (a java.io.InputStreamReader)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
- locked <0x00000000ec08e890> (a java.io.InputStreamReader)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at java.io.BufferedReader$1.hasNext(BufferedReader.java:571)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at com.gravypod.Test.main(Test.java:51)
"VM Thread" os_prio=0 tid=0x00007f9080075800 nid=0x4968 runnable
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f908001d800 nid=0x4963 runnable
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f908001f000 nid=0x4964 runnable
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x00007f9080021000 nid=0x4965 runnable
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x00007f9080022800 nid=0x4966 runnable
"VM Periodic Task Thread" os_prio=0 tid=0x00007f90800c1000 nid=0x4970 waiting on condition
JNI global references: 319
Линия Test.java:51 является reader.lines().forEach
линия.
Seens как тупиковый или динамический тупик проблемы .. – BrunoDM
Каковы относительные скорости (а) чтение всех данных из сокета и (б) выполнение всей обработки на разных «строках», которые вы читаете из сокета? Если (а) дольше или, по крайней мере, не намного короче, чем (б), вы не можете эффективно распараллелить это, поскольку (а) по своей сути является серийным и будет узким местом. Если (b) длинно, вы можете это сделать, но просто разделите его на два шага - чтение всех строк (однопоточность) и параллельную обработку строк. Затем вы можете наложить их немного, чтобы добиться более высокой производительности. – BeeOnRope
Чтобы диагностировать проблему тупика, возьмите 'jstack' процесса, когда он зависает и публикует его. Возможно, проще заменить буферизованный читатель сокета + StringReader и посмотреть, существует ли проблема, поскольку он позволит вам опубликовать [MVCE] (http://stackoverflow.com/help/mcve). – BeeOnRope