2016-02-03 3 views
11

Почему приведенный ниже код не печатает какой-либо вывод, тогда как если мы удаляем параллель, он печатает 0, 1?параллельная обработка с бесконечным потоком в Java

IntStream.iterate(0, i -> (i + 1) % 2) 
     .parallel() 
     .distinct() 
     .limit(10) 
     .forEach(System.out::println); 

Хотя я знаю, что в идеале предел должен быть перед отчетливые, но мой вопрос в большей степени связана с той разницей, вызванной добавлением параллельной обработки.

+0

На моей машине это блокирует 3/4 ядра при 100% использовании ЦП без получения ответа! Я думаю, что это может быть ошибкой во взаимодействии между пределом и параллелью. – Straw1239

+0

@ Straw1239 - согласитесь, что это должно считаться ошибкой; нет очевидной причины, почему этот код не печатает что-то быстро. – ZhongYu

+1

Он пытается буферизировать содержимое всего потока в 'distinct' op. Это происходит даже без «предела». Хотя ясно, что потоки, обрабатывающие последующие куски, должны ждать своего предыдущего для упорядоченного потока, первый не нужно ждать ... – Holger

ответ

6

Реальная причина в том, что заказал параллельный.distinct() является полной операцией барьера, как described в документации:

сохранение стабильности для distinct() в параллельных трубопроводах является относительно дорогостоящего (требует, что операция действует как полный барьер , с существенными накладными расходами), и стабильность часто не требуется.

«Работа с полным барьером» означает, что все восходящие операции должны выполняться до того, как начнется нисходящий поток. В Stream API всего две полные барьерные операции: .sorted() (каждый раз) и .distinct() (в заказе параллельного корпуса). Поскольку у вас есть бесконечный поток с коротким замыканием, который подается на .distinct(), вы получаете бесконечный цикл. По договору .distinct() не может просто излучать элементы в нисходящем направлении в любом порядке: он всегда должен излучать первый повторяющийся элемент. Хотя теоретически возможно реализовать параллельное упорядоченное .distinct(), это было бы гораздо более сложной реализацией.

Что касается решения, @ user140547 прав: добавить .unordered() перед тем .distinct() это переключает distinct() алгоритма к неупорядоченным один (который просто использует общий ConcurrentHashMap хранить все наблюдаемые элементы и излучает каждый новый элемент вниз по течению). Обратите внимание, что добавление .unordered()после.distinct() не поможет.

4

Stream.iterate возвращает «бесконечный последовательный упорядоченный поток». Следовательно, параллельный параллельный поток не слишком полезен.

Согласно описанию Stream package:

Для параллельных потоков, ослабляя ограничение упорядочения может иногда обеспечить более эффективное выполнение. Некоторые агрегированные операции, такие как фильтрация дубликатов (distinct()) или сгруппированные сокращения (Collectors.groupingBy()), могут быть реализованы более эффективно, если упорядочение элементов не имеет значения. Точно так же операции, которые по своей сути связаны с порядком, такие как limit(), могут требовать буферизации для обеспечения правильного упорядочения, подрывая преимущество параллелизма. В тех случаях, когда поток имеет порядок встреч, но пользователь не особо заботится об этом порядке, явно отключая поток с неупорядоченным(), может улучшить параллельную производительность для некоторых операций с состоянием или терминалом. Однако большинство поточных конвейеров, таких как пример «суммы веса блоков» выше, по-прежнему эффективно распараллеливаются даже при ограничениях порядка.

Это похоже на ваш случай, используя неупорядоченный(), он печатает 0,1.

IntStream.iterate(0, i -> (i + 1) % 2) 
      .parallel() 
      .unordered() 
      .distinct() 
      .limit(10) 
      .forEach(System.out::println); 
1

Этот код имеет большую проблему, даже без параллельного: После .distinct(), поток будет иметь только 2 элементы- поэтому предел никогда не пинает не- он будет печатать эти два элемента, а затем продолжить теряя время вашего процессора на неопределенное время. Возможно, это было то, что вы намеревались.

С параллелью и ограничением, я считаю, что проблема усугубляется из-за того, как работа делится. Я не прослеживал весь путь через параллельный поток кода, но здесь есть моя догадка:

Параллельный код делит работу между несколькими потоками, которые все повторяются бесконечно, поскольку они никогда не заполняют свою квоту. Система, вероятно, ждет завершения каждого потока, чтобы он мог комбинировать свои результаты, чтобы обеспечить четкость по порядку, но это никогда не произойдет в том случае, если вы предоставите.

Без требования к порядку результаты каждого рабочего потока могут быть немедленно использованы после проверки на глобальный набор различий.

Без ограничений я подозреваю, что для обработки бесконечных потоков используется другой код: вместо того, чтобы ждать заполнения 10, результаты сообщаются как обнаруженные. Его немного похоже на создание итератора, который сообщает hasNext() = true, сначала производит 0, затем 1, затем следующий() вызов вешает вечно без создания результата - в параллельном случае что-то ждет несколько отчетов, чтобы он мог правильно комбинировать/заказывайте их перед выводом, в то время как в серийном случае он делает то, что он может затем висит.

Попытайтесь найти точную разницу в стеке вызовов с и без отдельных() или limit(), но пока очень сложно перемещаться по довольно сложной последовательности вызовов библиотеки потоков.

+1

Вы, должно быть, попробовали другую реализацию. Что касается Oracle 1.8.0_60 и 1.8.0_65, не имеет значения, используете ли вы '.limit (10)' или '.limit (2)' и даже опустив 'limit', это не изменит поведение , – Holger

2

Я знаю, что код неверен и, как предлагается в решении, если мы переместим предел до выделения, у нас не будет бесконечного цикла.

Параллельная функция использует концепцию fork и join, чтобы выделить работу, которая выделяет весь доступный поток для работы, а не отдельный поток.

Мы справедливо ожидаем бесконечный цикл, поскольку многократный поток бесконечно обрабатывает данные и ничего не останавливает их, так как предел 10 никогда не ударяется после различий.

Возможно, он продолжает пытаться развиваться и никогда не пытается присоединиться, чтобы продвинуть его вперед. Но все же я считаю, что это дефект в java больше всего на свете.

+1

Перемещение 'limit' перед' distinct' изменит семантику. Кроме того, вы можете сделать свой код правильным, используя 'limit (2)', но проблема все еще остается. – Holger

Смежные вопросы