2014-10-27 6 views
7

Я хочу получить и удалить следующий элемент с Java 8 Stream, без этого Stream будет закрыт.Получение следующего элемента из потока Java 8

Stream<Integer> integerStream = Stream.iterate(0, x -> new Integer(x + 1)); 
Integer zero = integerStream.getNext(); // 0 
Integer one = integerStream.getNext(); // 1 
... 

Возможно ли это?

+1

Вы хотите на самом деле использовать удаленные элементы, или просто отбросьте их? Для последнего «.skip (n)» - это способ сделать это. –

ответ

1

на основе Stuart's answer и с Iterator-to-Stream conversion, я придумал следующий быстрый и грязный класс обертку. Он не протестирован, и он не является потокобезопасным, но он предоставляет мне то, что мне сейчас нужно — удаление и использование отдельных элементов при сохранении поток «открыт».

PeelingStream<T> обеспечивает метод T getNext(), что ограждает прочь someWrappedStream.iterator() «s терминальные операции потока семантика:

public class PeelingStream<T> implements Stream<T> { 

    private Stream<T> wrapped; 

    public PeelingStream(Stream<T> toBeWrapped) { 
     this.wrapped = toBeWrapped; 
    } 

    public T getNext() { 
     Iterator<T> iterator = wrapped.iterator(); 
     T next = iterator.next(); 
     Iterable<T> remainingIterable =() -> iterator; 
     wrapped = StreamSupport.stream(remainingIterable.spliterator(), 
       false); 

     return next; 
    } 

    ///////////////////// from here, only plain delegate methods 

    public Iterator<T> iterator() { 
     return wrapped.iterator(); 
    } 

    public Spliterator<T> spliterator() { 
     return wrapped.spliterator(); 
    } 

    public boolean isParallel() { 
     return wrapped.isParallel(); 
    } 

    public Stream<T> sequential() { 
     return wrapped.sequential(); 
    } 

    public Stream<T> parallel() { 
     return wrapped.parallel(); 
    } 

    public Stream<T> unordered() { 
     return wrapped.unordered(); 
    } 

    public Stream<T> onClose(Runnable closeHandler) { 
     return wrapped.onClose(closeHandler); 

    } 

    public void close() { 
     wrapped.close(); 
    } 

    public Stream<T> filter(Predicate<? super T> predicate) { 
     return wrapped.filter(predicate); 
    } 

    public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { 
     return wrapped.map(mapper); 
    } 

    public IntStream mapToInt(ToIntFunction<? super T> mapper) { 
     return wrapped.mapToInt(mapper); 
    } 

    public LongStream mapToLong(ToLongFunction<? super T> mapper) { 
     return wrapped.mapToLong(mapper); 
    } 

    public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { 
     return wrapped.mapToDouble(mapper); 
    } 

    public <R> Stream<R> flatMap(
      Function<? super T, ? extends Stream<? extends R>> mapper) { 
     return wrapped.flatMap(mapper); 
    } 

    public IntStream flatMapToInt(
      Function<? super T, ? extends IntStream> mapper) { 
     return wrapped.flatMapToInt(mapper); 
    } 

    public LongStream flatMapToLong(
      Function<? super T, ? extends LongStream> mapper) { 
     return wrapped.flatMapToLong(mapper); 
    } 

    public DoubleStream flatMapToDouble(
      Function<? super T, ? extends DoubleStream> mapper) { 
     return wrapped.flatMapToDouble(mapper); 
    } 

    public Stream<T> distinct() { 
     return wrapped.distinct(); 
    } 

    public Stream<T> sorted() { 
     return wrapped.sorted(); 
    } 

    public Stream<T> sorted(Comparator<? super T> comparator) { 
     return wrapped.sorted(comparator); 
    } 

    public Stream<T> peek(Consumer<? super T> action) { 
     return wrapped.peek(action); 
    } 

    public Stream<T> limit(long maxSize) { 
     return wrapped.limit(maxSize); 
    } 

    public Stream<T> skip(long n) { 
     return wrapped.skip(n); 
    } 

    public void forEach(Consumer<? super T> action) { 
     wrapped.forEach(action); 
    } 

    public void forEachOrdered(Consumer<? super T> action) { 
     wrapped.forEachOrdered(action); 
    } 

    public Object[] toArray() { 
     return wrapped.toArray(); 
    } 

    public <A> A[] toArray(IntFunction<A[]> generator) { 
     return wrapped.toArray(generator); 
    } 

    public T reduce(T identity, BinaryOperator<T> accumulator) { 
     return wrapped.reduce(identity, accumulator); 
    } 

    public Optional<T> reduce(BinaryOperator<T> accumulator) { 
     return wrapped.reduce(accumulator); 
    } 

    public <U> U reduce(U identity, 
      BiFunction<U, ? super T, U> accumulator, 
      BinaryOperator<U> combiner) { 
     return wrapped.reduce(identity, accumulator, combiner); 
    } 

    public <R> R collect(Supplier<R> supplier, 
      BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { 
     return wrapped.collect(supplier, accumulator, combiner); 
    } 

    public <R, A> R collect(Collector<? super T, A, R> collector) { 
     return wrapped.collect(collector); 
    } 

    public Optional<T> min(Comparator<? super T> comparator) { 
     return wrapped.min(comparator); 
    } 

    public Optional<T> max(Comparator<? super T> comparator) { 
     return wrapped.max(comparator); 
    } 

    public long count() { 
     return wrapped.count(); 
    } 

    public boolean anyMatch(Predicate<? super T> predicate) { 
     return wrapped.anyMatch(predicate); 
    } 

    public boolean allMatch(Predicate<? super T> predicate) { 
     return wrapped.allMatch(predicate); 
    } 

    public boolean noneMatch(Predicate<? super T> predicate) { 
     return wrapped.noneMatch(predicate); 
    } 

    public Optional<T> findFirst() { 
     return wrapped.findFirst(); 
    } 

    public Optional<T> findAny() { 
     return wrapped.findAny(); 
    } 

} 

Небольшой тест:

@Test 
public void testPeelingOffItemsFromStream() { 

    Stream<Integer> infiniteStream = Stream.iterate(0, x -> x + 1); 

    PeelingStream<Integer> peelingInfiniteStream = new PeelingStream<>(infiniteStream); 

    Integer one = peelingInfiniteStream.getNext(); 
    assertThat(one, equalTo(0)); 

    Integer two = peelingInfiniteStream.getNext(); 
    assertThat(two, equalTo(1)); 

    Stream<Integer> limitedStream = peelingInfiniteStream.limit(3); // 2 3 4 
    int sumOf234 = limitedStream.mapToInt(x -> x.intValue()).sum(); 
    assertThat(sumOf234, equalTo(2 + 3 + 4)); 

} 
13

Да, есть способ сделать это, но с некоторыми ограничениями.

Stream<Integer> infiniteStream = Stream.iterate(0, x -> new Integer(x + 1)); 
Iterator<Integer> iter = infiniteStream.iterator(); 
Integer zero = iter.next(); 
Integer one = iter.next(); 

В качестве альтернативы,

Stream<Integer> infiniteStream = Stream.iterate(0, x -> new Integer(x + 1)); 
Spliterator<Integer> spliterator = infiniteStream.spliterator(); 
spliterator.tryAdvance(i -> System.out.println(i)); // zero 
spliterator.tryAdvance(i -> System.out.println(i)); // one 

Учитывая в Stream, можно получить Iterator или Spliterator от него, или запросить ли это параллельный поток, и т.д. Они определяются на интерфейсе BaseStream, суперинтерфейс Stream, что делает их немного простыми.

В этом случае мы знаем поток бесконечен, поэтому нет необходимости вызывать hasNext() метод итератора или проверить возвращаемое значение Spliterator в tryAdvance()

ограничения является, что оба iterator() и spliterator() методов Stream: Операции с терминалами, что означает, что после их вызова возвращенный Iterator или Spliterator имеет эксклюзивный доступ к значениям, представленным Stream. Дальнейшие операции над потоком (такие как filter или map и т. Д.) Не допускаются и будут выполняться с IllegalStateException.

Если вы хотите, чтобы слезть первые пару элементов, а затем возобновить обработку потока, вы можете превратить spliterator обратно в поток таким образом:

Stream<Integer> stream2 = StreamSupport.stream(spliterator, false); 

Это вероятно, будет работать нормально для некоторых вещей, но я Не уверен, что я рекомендую эту технику в целом. Я думаю, что он добавляет несколько дополнительных объектов и, следовательно, дополнительные вызовы методов на пути создания следующего элемента.

Редакционные комментарии (не относящиеся к данному вопросу):

  • Не используйте new Integer(val). Вместо этого используйте Integer.valueOf(val), который будет повторно использовать целое число в штучной упаковке, если оно доступно, что обычно справедливо для значений в диапазоне от -128 до 127.
  • Вы можете использовать IntStream вместо Stream<Integer>, который полностью избегает боксерских накладных расходов. Он не имеет полного дополнения к потоковым операциям, но имеет iterate(), который выполняет функцию, которая работает с примитивными значениями int.
+3

Или вы можете просто использовать 'x -> x + 1', который короче, а также использует' Integer.valueOf' (проверено только сейчас с 8u25 и 'javap -c -p'). –

Смежные вопросы