2017-01-14 1 views
4

Я видел некоторые версии для Java 8 stream API, но все они, похоже, превращают поток в непараллельный поток. Например this одно:Как реализовать параллельную поддержку takeWhile для Stream API в Java 8?

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) { 
    return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) { 
    boolean stillGoing = true; 
    @Override public boolean tryAdvance(Consumer<? super T> consumer) { 
     if (stillGoing) { 
     boolean hadNext = splitr.tryAdvance(elem -> { 
      if (predicate.test(elem)) { 
      consumer.accept(elem); 
      } else { 
      stillGoing = false; 
      } 
     }); 
     return hadNext && stillGoing; 
     } 
     return false; 
    } 
    }; 
} 

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { 
    return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); 
} 

Здесь StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false); поворачивает поток, передаваемый takeWhile в последовательный поток. Кто-нибудь знает о реализации, которая поддерживает параллельные потоки, или как я могу изменить этот код, чтобы поддерживать/поддерживать параллельные потоки?

+3

Вы не можете, действительно. Извините, но вам придется иметь дело с этим; это действительно последовательная операция. Вы можете использовать ограниченный параллелизм по умолчанию, который работает со всем, что вы получите в результате использования '.parallel' в этом потоке, но это так хорошо, как вы можете получить. –

+0

Чтобы извлечь какой-либо реальный параллелизм здесь, предикат должен быть безумно дорогим (например, пытаясь учитывать очень большие числа). Это не невозможно, но это маловероятно. –

ответ

2

Если ваш источник, как известно, неупорядоченное, то следующая реализация должна работать:

static final class UnorderedTakeWhileSpliterator<T> implements Spliterator<T>, Consumer<T>, Cloneable { 
    private final Predicate<? super T> predicate; 
    private final AtomicBoolean checked = new AtomicBoolean(); 
    private Spliterator<T> source; 
    private T cur; 

    UnorderedTakeWhileSpliterator(Spliterator<T> source, Predicate<? super T> predicate) { 
     this.predicate = predicate; 
     this.source = source; 
    } 

    @Override 
    public void accept(T t) { 
     this.cur = t; 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super T> action) { 
     if (!checked.get() && source.tryAdvance(this)) { 
      if (predicate.test(cur)) { 
       action.accept(cur); 
       return true; 
      } else { 
       checked.set(true); 
      } 
     } 
     return false; 
    } 

    @Override 
    public Spliterator<T> trySplit() { 
     Spliterator<T> prefix = source.trySplit(); 
     if(prefix == null) { 
      return null; 
     } 
     if(checked.get()) { 
      return Spliterators.emptySpliterator(); 
     } 
     UnorderedTakeWhileSpliterator<T> clone; 
     try { 
      clone = (UnorderedTakeWhileSpliterator<T>) clone(); 
     } catch (CloneNotSupportedException e) { 
      throw new InternalError(e); 
     } 
     clone.source = prefix; 
     return clone; 
    } 

    @Override 
    public long estimateSize() { 
     return source.estimateSize(); 
    } 

    @Override 
    public int characteristics() { 
     return source.characteristics() & (DISTINCT | SORTED | NONNULL); 
    } 

    @Override 
    public Comparator<? super T> getComparator() { 
     return source.getComparator(); 
    } 
} 

Создание потока с помощью следующего метода:

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) { 
    return StreamSupport.stream(UnorderedTakeWhileSpliterator<>(stream.spliterator(), predicate), stream.isParallel()); 
} 

Заказанный реализация будет гораздо сложнее, как он должен буферизовать не префиксные элементы и распространять отмену на суффиксы. Что-то вроде этого реализовано в JDK-9 (не как spliterator, а как обычная операция потока), но я сомневаюсь, что даже эта сложная реализация во многих случаях выигрывает в последовательном потоке.

+0

Спасибо, но у меня нет класса с именем UnorderedTDOfRef? Где я могу это найти? – Johan

+1

@Johan, исправленный, извините. –

+0

Это работает :) Не в тему, но это то, что вы включите в библиотеку StreamEx? Это было бы круто! – Johan

Смежные вопросы