2016-06-29 1 views
1

Я создал библиотеку разбора, которая принимает предоставленный ввод и возвращает поток записей. Затем программа вызывает эту библиотеку и обрабатывает результаты. В моем случае, моя программа использует что-то вродеКак создать возвращенный поток, который может использовать skip

recordStream.forEach(r -> insertIntoDB(r)); 

Один из типов ввода, которые могут быть предоставлены в библиотеке синтаксического анализа является плоский файл, который может иметь строку заголовка. Таким образом, библиотека синтаксического анализа может быть настроена так, чтобы пропускать строку заголовка. Если строка заголовка сконфигурирована, она добавляет элемент перехода (n) к возврату, например.

Files.lines(input)**.skip(1)**.parallel().map(r -> createRecord(r)); 

Библиотека синтаксического анализа возвращает результирующий поток.

Но, кажется, что пропуск, параллель и forEach не играют хорошо вместе. Конечный программист должен вместо этого запускать forEachOrdered, но это плохой дизайн, чтобы поставить это требование на программиста, чтобы они знали, что они должны использовать forEachOrdered при работе с типом ввода файла со строкой заголовка.

Как я могу принудительно выполнить заказное требование, когда это необходимо, в рамках построения возвращенной цепочки потоков, чтобы вернуть полностью функциональный поток писателю программы вместо потока со скрытыми ограничениями? Ответ на завершение потока в другом потоке?

+0

Простейшее решение - требовать, чтобы JRE обновлялся, то есть '1,8u60' или новее ... – Holger

ответ

2

forEachOrdered необходимо не из-за skip(), а потому, что ваш поток параллелен. Даже если поток параллельно, поток будет пропускать первый элемент, как указано в документации:

Хотя пропустить(), как правило, дешевая операция по последовательных потоков трубопроводов, это может быть довольно дорогим на упорядоченный параллельные конвейеры, особенно для больших значений n, поскольку skip (n) ограничено пропускать не только любые n элементов, но и первые n элементов в порядке встречи.

Очевидно, что forEach не обязательно соблюдает порядок. Не используя forEachOrdered, когда вы заботитесь о порядке просто злоупотребление потока API:

Поведение этой операции явно недетерминировано. Для параллельных поточных конвейеров эта операция не гарантирует уважения порядка встречи потока, так как это принесет жертву преимущества параллелизма.

Я бы не возвращал параллельный поток из библиотеки. Я бы вернул последовательный (где forEach будет уважать порядок), и позвоните вызывающему абоненту parallel() и предположите последствия, если он захочет.

Использование параллельного потока по умолчанию - bad idea.

+0

Даже если я удалю' parallel() ', у конечного программиста будет неожиданное требование использовать' forEachOrdered() 'если они добавляют' parallel() ', только в некоторых случаях. Мне нужно пропустить, чтобы работать надежно, несмотря на то, что делает или не делает конечный программист. Я возвращаюсь к использованию 'skip()' и 'forEachOrdered()' в библиотеке (только там, где это необходимо), чтобы инкапсулировать это из конечного программиста и подавать результаты в новый поток. – Aaron

+0

Это совсем не неожиданно. Он четко документирован и, таким образом, ожидается любым, кто знает, как использовать API потока. И дело не только в некоторых обстоятельствах. Каждый раз, когда вы используете параллельный поток, вы не можете полагаться на forEach для соблюдения порядка. Точно так же, каждый раз, когда вы выполняете одну и ту же задачу из нескольких потоков, вы не можете ожидать, что задачи будут выполняться последовательно. Наконец, как я сказал в своем ответе, 'skip()' ** делает ** работать надежно, является ли поток параллельным или нет. –

+0

Если skip() не надежно пропускает первую строку, когда поток параллелен, это означает, что ваш поток не упорядочен. Но Files.lines() возвращает упорядоченный поток. –

0

Учитывая соответствующий сценарий, в котором

  • Источник потока является настройка с помощью skip
  • код клиента запрашивает parallel() выполнение
  • клиентский код формирования цепочки неупорядоченный терминальный действие как forEach
  • в код работает на JRE старше 1.8u60

У нас есть совершенно особая комбинация обстоятельств, все из которых находятся вне контроля конкретной функции библиотеки, которая свяжет операцию .map(r -> createRecord(r)).

Я не думаю, что ответственность лежит на этом этапе. В общем, код приложения не несет ответственности за исправление вещей, которые уже признаны ошибками JRE и исправлены в обновленных версиях.

Если по какой-либо причине вы считаете, что необходимо обеспечить обход для старых JRE, для этого потребуется источник потока, требующий операции skip.

Для этого конкретного случая это не так сложно. Вы можете create the BufferedReader directly, вызывать readLine(), чтобы пропустить первую строку, а затем вернуть результат lines(), что позволяет обрабатывать все оставшиеся строки. Это может быть еще более эффективным, поскольку параллельный поток имеет операцию skip.

Более общее решение будет операция «стремится пропустить первый», как это:

public static <T> Stream<T> skipFirstImmediately(Stream<T> source) { 
    Spliterator<T> sp=source.spliterator(); 
    sp.tryAdvance(skipped -> {}); 
    return StreamSupport.stream(sp, source.isParallel()); 
} 

Обратите внимание, что при использовании этого метода, из-за свойства текущей реализации потока, это может быть полезным, чтобы включить source Stream перед параллельным вызовом этого метода вместо того, чтобы превращать результирующий поток в параллельный, если требуется параллельное выполнение.

Это может быть проверено путем сравнения выходного

skipFirstImmediately(IntStream.range(0, 10).parallel().boxed()) 
    .peek(x -> System.out.println(Thread.currentThread())) 
    .forEach(System.out::println); 

и

skipFirstImmediately(IntStream.range(0, 10).boxed()).parallel() 
    .peek(x -> System.out.println(Thread.currentThread())) 
    .forEach(System.out::println); 

который будет правильным в любом случае, но не используя возможности SMP в последнем.

Смежные вопросы