2014-05-01 2 views
7

При попытке изучить Java lambdas я наткнулся на статью (указанную ниже), где в разделе, посвященном ограничениям API потока, он утверждает, что: «Ярлыки с ясными состояниями обычно не являются проблема при выполнении последовательно, но когда расписание потока распараллеливается, оно ломается ». Затем он дает этот код в качестве примера проблем из-за выполнения заказа:Java lambdas, lambdas без апатии и параллельное выполнение

List<String> ss = ...; 
List<String> result = ...; 

Stream<String> stream = ss.stream(); 

stream.map(s -> { 
    synchronized (result) { 
     if (result.size() < 10) { 
     result.add(s); 
     } 
    } 
}) 
.forEach(e -> { }); 

Я могу видеть, как это было бы не-детерминированным, если оно было распараллеливание, но то, что я не могу видеть, как вы бы это исправить с безгарантийными лямбдами - не существует чего-то неотъемлемо недетерминированного о добавлении вещей в список параллельно. Примером того, что шестилетний ребенок в шляпе мог понять, возможно, на C#, был бы очень признателен.

Ссылка на оригинал статьи http://blog.hartveld.com/2013/03/jdk-8-33-stream-api.html

+0

В чем вопрос? Вы хотите увидеть, как параллельные потоки Java 8 могут использоваться для выполнения некоторого кода для первых элементов? Или вы хотите понять, как работают алгоритмы fork-join, обходя проблему общих данных? – nosid

+0

Последнее в контексте безгражданских лямбдов. –

ответ

9

Я знаю, где вы намекаете на свой вопрос, и я сделаю все возможное, чтобы объяснить.

Рассмотрим список ввода, состоящий из 8 элементов:

[1, 2, 3, 4, 5, 6, 7, 8]

И предположим, потоки будут parallellize его следующим образом, в действительности они не делают, точный процесс parallellization довольно трудно понять.
Но пока предположите, что они разделили бы размер на два, пока не останется два элемента.

разветвленности деление будет выглядеть следующим образом:

  1. Первое деление:

    [1, 2, 3, 4]
    [5, 6, 7, 8]

  2. Второе деление:

    [1, 2]
    [3, 4]
    [5, 6]
    [7, 8]

Теперь у нас есть четыре порции, которые (в нашей теории) должны обрабатываться четырьмя различными потоками, которые не имеют никакого знания друг друга.
Это действительно может быть исправлено путем синхронизации на каком-то внешнем ресурсе, но тогда вы теряете преимущества параллелиализации, поэтому нам нужно предположить, что мы не синхронизируем, а когда мы не синхронизируем, другие потоки не будут видеть, что другие потоки сделали, поэтому наш результат будет мусором.

Теперь на вопрос о том, где вы спрашиваете о безгражданстве, как его можно правильно обрабатывать параллельно? Как вы можете добавлять элементы, которые обрабатываются параллельно в правильном порядке в список?

Сначала предположим, что простая функция сопоставления, в которой вы указываете карту лямбда i -> i + 10, а затем печатайте ее с помощью System.out::println в foreach.

Теперь после второго деления произойдет следующее:

[1, 2] -> [11, 12] -> { System.out.println(11); System.println(12); }
[3, 4] -> [13, 14] -> { System.out.println(13); System.println(14); }
[5, 6] -> [15, 16] -> { System.out.println(15); System.println(16); }
[7, 8] -> [17, 18] -> { System.out.println(17); System.println(18); }

Там нет никакой гарантии, по порядку, кроме, что все элементы, обработанные с помощью одной и той же нити (внутреннее состояние, чтобы не полагаться) обрабатывать в порядке.

Если вы хотите обработать их по порядку, то вам необходимо использовать forEachOrdered, что гарантирует, что все потоки будут работать в правильном порядке, и вы не потеряете слишком много преимуществ от параллелизма, поскольку это применимо только к до конечного состояния.

Чтобы увидеть, как вы можете добавить элементы parelllized в список, посмотрите на это, с помощью Collectors.toList(), который предоставляет методы:

  • Создание нового списка.
  • Добавление значения в список.
  • Слияние двух списков.

Теперь произойдет следующее после второго раздела:

Для каждых четырех потоков будет делать следующее (только показывая одну нить здесь):

  1. Мы были [1, 2].
  2. Мы сопоставляем его с [11, 12].
  3. Мы создаем пустой List<Integer>.
  4. В список добавляется 11.
  5. В список добавляется 12.

Теперь все потоки сделали это, и у нас есть четыре списка из двух элементов.

Теперь следующие слияния происходят в указанном порядке:

  1. [11, 12] ++ [13, 14] = [11, 12, 13, 14]
  2. [15, 16] ++ [17, 18] = [15, 16, 17, 18]
  3. Наконец [11, 12, 13, 14] ++ [15, 16, 17, 18] = [11, 12, 13, 14, 15, 16, 17, 18]

И, таким образом, результирующий список в порядке и отображение было сделано в параллели. Теперь вы также должны уяснить, почему параллаллизация требует более высокого минимума, а всего лишь два элемента, так как создание новых списков и слияние становятся слишком дорогими.

Я надеюсь, что вы понимаете, почему операции потока должна быть без гражданства, чтобы получить все преимущества parallellization.

+0

Спасибо. Все еще переваривая это. Я думаю, что моя проблема заключается в том, чтобы понять, что теперь есть безгражданная лямбда :-( –

+1

@ JohnBarça Лучше всего иметь в виду, что безгражданная лямбда - это та, которая не изменяет состояние других переменных. – skiwi

+0

Это аналогично сокращению карты ? –

0

уменьшая эту проблему, похоже, это просто найти первые 10 элементов в потоке. и отдельно делать foreach на весь поток. s.limit(10).collect(...); и s.forEach(...);. также вызов карты фактически не возвращает ничего, поэтому я сомневаюсь, что это компилируется.

0

Это был хороший пример из @skiwi, позвольте мне посмотреть, могу ли я добавить немного.

Термин «упорядоченный» в параллельных вычислениях обычно означает возвращение результата в том же порядке, что и последовательный процесс. То есть, вызовите sequential.method() или parallel.method(), результат будет выглядеть одинаково.

Проблема с forEachOrdered() заключается в том, что инфраструктура не может создавать уникальные объекты для результатов каждой задачи и заказывать их при завершении без остановки. Поэтому он обрабатывает поток как сбалансированное дерево. Структура создает ConcurrentHashMap с родительскими/дочерними ассоциациями. Он запускает левый ребенок сначала, потом правый дочерний, а затем родительский, принудительно выполняющий отношения «раньше», когда обработка должна быть параллельной. От упорядоченных результатов до упорядоченной последовательной обработки.

Что вам нужно сделать, это заказать результаты, а не обрабатывать их в порядке сбоев. Создайте объекты, содержащие часть массива для каждого окончательного деления (здесь мы используем второе разделение @ skiwi), результаты обработки должны быть заполнены вычислением и порядковым номером для каждого объекта. Пусть потоки обрабатывают объекты одновременно. Когда все потоки завершены, закажите объекты по порядковому номеру и завершите работу.

Смежные вопросы