2015-06-04 3 views
50

У меня есть большой файл, содержащий список элементов.Java 8 Stream с пакетной обработкой

Я хотел бы создать партию элементов, сделать HTTP-запрос с этой партией (все элементы необходимы в качестве параметров в HTTP-запросе). Я могу сделать это очень легко с цикла for, но, как любовник Java 8, я хочу попытаться написать это с помощью платформы Stream Java (и воспользоваться преимуществами ленивой обработки).

Пример:

List<String> batch = new ArrayList<>(BATCH_SIZE); 
for (int i = 0; i < data.size(); i++) { 
    batch.add(data.get(i)); 
    if (batch.size() == BATCH_SIZE) process(batch); 
} 

if (batch.size() > 0) process(batch); 

Я хочу сделать что-то длинный линия lazyFileStream.group(500).map(processBatch).collect(toList())

Что бы лучший способ сделать это?

+3

Не могли бы вы привести пример того, что вам нужно, используя цикл 'for'? – Edd

+0

Я не могу понять, как выполнить группировку, извините, но [Files # lines] (https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html # lines-java.nio.file.Path-java.nio.charset.Charset-) будет лениво читать содержимое файла. – Toby

+0

, так что вам в основном нужна обратная функция 'flatMap' (+ дополнительная flatMap, чтобы свернуть потоки снова)? Я не думаю, что что-то подобное существует как удобный метод в стандартной библиотеке.Либо вам придется найти стороннюю библиотеку, либо написать свой собственный на основе разделителей и/или коллектора, излучающего поток потоков. – the8472

ответ

12

Вы можете сделать это с jOOλ, библиотека, которая расширяет Java 8 потоков для однопоточных, последовательных потоков потребительных случаях:

Seq.seq(lazyFileStream)    // Seq<String> 
    .zipWithIndex()     // Seq<Tuple2<String, Long>> 
    .groupBy(tuple -> tuple.v2/500) // Map<Long, List<String>> 
    .forEach((index, batch) -> { 
     process(batch); 
    }); 

За кулисами, zipWithIndex() просто:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) { 
    final Iterator<T> it = stream.iterator(); 

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> { 
     long index; 

     @Override 
     public boolean hasNext() { 
      return it.hasNext(); 
     } 

     @Override 
     public Tuple2<T, Long> next() { 
      return tuple(it.next(), index++); 
     } 
    } 

    return seq(new ZipWithIndex()); 
} 

... тогда как groupBy() является удобством API для:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) { 
    return collect(Collectors.groupingBy(classifier)); 
} 

(Отказ от ответственности: Я работаю в компании за jOOλ)

+0

Wow. Это ТОЧНО, что я ищу. Наша система, как правило, обрабатывает потоки данных в последовательности, поэтому это было бы хорошо подойдет для перехода на Java 8. –

+8

Обратите внимание, что это решение без необходимости хранит весь входной поток на промежуточной «карте» (в отличие, например, от решения Ben Manes) –

25

Pure Java-8 реализации также возможно:

int BATCH = 500; 
IntStream.range(0, (data.size()+BATCH-1)/BATCH) 
     .mapToObj(i -> data.subList(i*BATCH, Math.min(data.size(), (i+1)*BATCH))) 
     .forEach(batch -> process(batch)); 

Обратите внимание, что в отличие от JOOl он может прекрасно работать параллельно (при условии, что ваш data является случайный список доступа).

+0

если ваши данные на самом деле являются потоком? (скажем, строки в файле или даже из сети). –

+4

@OmryYadan, вопрос состоял в том, что у него есть вход из 'List' (см.' Data.size() ',' data.get() 'в вопросе). Я отвечаю на вопрос. Если у вас есть другой вопрос, спросите его (хотя я думаю, что вопрос о потоке тоже был задан). –

+0

Как обрабатывать партии параллельно? –

6

Вы также можете посмотреть cyclops-react, я являюсь автором этой библиотеки. Он реализует интерфейс jOOλ (и с расширением JDK 8 Streams), но в отличие от JDK 8 Parallel Streams он фокусируется на операциях Asyncrhonous (например, потенциально блокирует вызовы Async I/O). JDK Parallel Streams, напротив, сосредоточены на параллелизме данных для операций с ЦП. Он работает путем управления агрегатами будущих задач под капотом, но представляет собой стандартный расширенный Stream API для конечных пользователей.

Этот пример кода может помочь вам начать работу

LazyFutureStream.parallelCommonBuilder() 
       .react(data) 
       .grouped(BATCH_SIZE)     
       .map(this::process) 
       .run(); 

Существует tutorial on batching here

И more general Tutorial here

Чтобы использовать свой собственный пул потоков (который, вероятно, более подходящим для блокировки ввода/O), вы можете начать обработку с

 LazyReact reactor = new LazyReact(40); 

    reactor.react(data) 
      .grouped(BATCH_SIZE)     
      .map(this::process) 
      .run(); 
7

Вы также можете использовать RxJava:

Observable.from(data).buffer(BATCH_SIZE).forEach((batch) -> process(batch)); 

или

Observable.from(lazyFileStream).buffer(500).map((batch) -> process(batch)).toList(); 

или

Observable.from(lazyFileStream).buffer(500).map(MyClass::process).toList(); 
60

Для полноты, вот Guava решение.

Iterators.partition(stream.iterator(), batchSize).forEachRemaining(this::process); 

В вопросе коллекция доступна поэтому поток не нужен, и его можно записать в виде,

Iterables.partition(data, batchSize).forEach(this::process); 
+0

Это выглядит проще всего и наиболее читаемо для меня. Спасибо, что поделился! – grinch

+4

'Lists.partition' - еще один вариант, который я должен был упомянуть. –

19

Чистое решение Java 8:

Мы можем создать пользовательский коллектор, чтобы сделать это элегантно, который принимает batch size и Consumer для обработки каждой партии:

import java.util.ArrayList; 
import java.util.Collections; 
import java.util.List; 
import java.util.Set; 
import java.util.function.*; 
import java.util.stream.Collector; 

import static java.util.Objects.requireNonNull; 


/** 
* Collects elements in the stream and calls the supplied batch processor 
* after the configured batch size is reached. 
* 
* In case of a parallel stream, the batch processor may be called with 
* elements less than the batch size. 
* 
* The elements are not kept in memory, and the final result will be an 
* empty list. 
* 
* @param <T> Type of the elements being collected 
*/ 
class BatchCollector<T> implements Collector<T, List<T>, List<T>> { 

    private final int batchSize; 
    private final Consumer<List<T>> batchProcessor; 


    /** 
    * Constructs the batch collector 
    * 
    * @param batchSize the batch size after which the batchProcessor should be called 
    * @param batchProcessor the batch processor which accepts batches of records to process 
    */ 
    BatchCollector(int batchSize, Consumer<List<T>> batchProcessor) { 
     batchProcessor = requireNonNull(batchProcessor); 

     this.batchSize = batchSize; 
     this.batchProcessor = batchProcessor; 
    } 

    public Supplier<List<T>> supplier() { 
     return ArrayList::new; 
    } 

    public BiConsumer<List<T>, T> accumulator() { 
     return (ts, t) -> { 
      ts.add(t); 
      if (ts.size() >= batchSize) { 
       batchProcessor.accept(ts); 
       ts.clear(); 
      } 
     }; 
    } 

    public BinaryOperator<List<T>> combiner() { 
     return (ts, ots) -> { 
      // process each parallel list without checking for batch size 
      // avoids adding all elements of one to another 
      // can be modified if a strict batching mode is required 
      batchProcessor.accept(ts); 
      batchProcessor.accept(ots); 
      return Collections.emptyList(); 
     }; 
    } 

    public Function<List<T>, List<T>> finisher() { 
     return ts -> { 
      batchProcessor.accept(ts); 
      return Collections.emptyList(); 
     }; 
    } 

    public Set<Characteristics> characteristics() { 
     return Collections.emptySet(); 
    } 
} 

Необязательно затем создать класс помощник утилиты:

import java.util.List; 
import java.util.function.Consumer; 
import java.util.stream.Collector; 

public class StreamUtils { 

    /** 
    * Creates a new batch collector 
    * @param batchSize the batch size after which the batchProcessor should be called 
    * @param batchProcessor the batch processor which accepts batches of records to process 
    * @param <T> the type of elements being processed 
    * @return a batch collector instance 
    */ 
    public static <T> Collector<T, List<T>, List<T>> batchCollector(int batchSize, Consumer<List<T>> batchProcessor) { 
     return new BatchCollector<T>(batchSize, batchProcessor); 
    } 
} 

Пример использования:

List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 
List<Integer> output = new ArrayList<>(); 

int batchSize = 3; 
Consumer<List<Integer>> batchProcessor = xs -> output.addAll(xs); 

input.stream() 
    .collect(StreamUtils.batchCollector(batchSize, batchProcessor)); 

Я отправил свой код на GitHub, а также, если кто-то хочет посмотреть:

Link to Github

4

Я написал специальный разделитель для сценариев, подобных этому. Он будет заполнять списки заданного размера из входного потока. Преимущество такого подхода заключается в том, что он будет выполнять ленивую обработку и будет работать с другими функциями потока.

public static <T> Stream<List<T>> batches(Stream<T> stream, int batchSize) { 
    return batchSize <= 0 
     ? Stream.of(stream.collect(Collectors.toList())) 
     : StreamSupport.stream(new BatchSpliterator<>(stream.spliterator(), batchSize), stream.isParallel()); 
} 

private static class BatchSpliterator<E> implements Spliterator<List<E>> { 

    private final Spliterator<E> base; 
    private final int batchSize; 

    public BatchSpliterator(Spliterator<E> base, int batchSize) { 
     this.base = base; 
     this.batchSize = batchSize; 
    } 

    @Override 
    public boolean tryAdvance(Consumer<? super List<E>> action) { 
     final List<E> batch = new ArrayList<>(batchSize); 
     for (int i=0; i < batchSize && base.tryAdvance(batch::add); i++) 
      ; 
     if (batch.isEmpty()) 
      return false; 
     action.accept(batch); 
     return true; 
    } 

    @Override 
    public Spliterator<List<E>> trySplit() { 
     if (base.estimateSize() <= batchSize) 
      return null; 
     final Spliterator<E> splitBase = this.base.trySplit(); 
     return splitBase == null ? null 
       : new BatchSpliterator<>(splitBase, batchSize); 
    } 

    @Override 
    public long estimateSize() { 
     final double baseSize = base.estimateSize(); 
     return baseSize == 0 ? 0 
       : (long) Math.ceil(baseSize/(double) batchSize); 
    } 

    @Override 
    public int characteristics() { 
     return base.characteristics(); 
    } 

} 
4

У нас была аналогичная проблема для решения. Мы хотели взять поток, который был больше, чем системная память (итерация по всем объектам в базе данных), и рандомизировать порядок как можно лучше - мы думали, что было бы хорошо перезагрузить 10 000 элементов и рандомизировать их.

Целью была функция, которая использовалась в потоке.

Из предложенных решений здесь, кажется, есть целый ряд вариантов:

  • Используйте различные не-Java 8 дополнительных библиотек
  • начать с чего-то, что это не поток - например, произвольного доступа список
  • Есть поток, который может быть легко разделить на spliterator

Наш инстинкт первоначально должен был использовать пользовательский коллектор, но это означало, выпадающие из потокового видео. Вышеупомянутое решение коллектора очень хорошее, и мы его почти использовали.

Вот решение, которое чита, используя тот факт, что Stream s может дать вам Iterator, которые вы можете использовать в качестве аварийного люка, чтобы позволить вам сделать что-то дополнительное, что потоки не поддерживают. Iterator преобразуется обратно в поток, используя другой бит Java 8 StreamSupport колдовство.

/** 
* An iterator which returns batches of items taken from another iterator 
*/ 
public class BatchingIterator<T> implements Iterator<List<T>> { 
    /** 
    * Given a stream, convert it to a stream of batches no greater than the 
    * batchSize. 
    * @param originalStream to convert 
    * @param batchSize maximum size of a batch 
    * @param <T> type of items in the stream 
    * @return a stream of batches taken sequentially from the original stream 
    */ 
    public static <T> Stream<List<T>> batchedStreamOf(Stream<T> originalStream, int batchSize) { 
     return asStream(new BatchingIterator<>(originalStream.iterator(), batchSize)); 
    } 

    private static <T> Stream<T> asStream(Iterator<T> iterator) { 
     return StreamSupport.stream(
      Spliterators.spliteratorUnknownSize(iterator,ORDERED), 
      false); 
    } 

    private int batchSize; 
    private List<T> currentBatch; 
    private Iterator<T> sourceIterator; 

    public BatchingIterator(Iterator<T> sourceIterator, int batchSize) { 
     this.batchSize = batchSize; 
     this.sourceIterator = sourceIterator; 
    } 

    @Override 
    public boolean hasNext() { 
     prepareNextBatch(); 
     return currentBatch!=null && !currentBatch.isEmpty(); 
    } 

    @Override 
    public List<T> next() { 
     return currentBatch; 
    } 

    private void prepareNextBatch() { 
     currentBatch = new ArrayList<>(batchSize); 
     while (sourceIterator.hasNext() && currentBatch.size() < batchSize) { 
      currentBatch.add(sourceIterator.next()); 
     } 
    } 
} 

Простой пример использования это будет выглядеть следующим образом:

@Test 
public void getsBatches() { 
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) 
     .forEach(System.out::println); 
} 

Вышеприведенные печатает

[A, B, C] 
[D, E, F] 

Для нашего случая использования, мы хотели перетасовать партии, а затем сохранить их как поток - это выглядело так:

@Test 
public void howScramblingCouldBeDone() { 
    BatchingIterator.batchedStreamOf(Stream.of("A","B","C","D","E","F"), 3) 
     // the lambda in the map expression sucks a bit because Collections.shuffle acts on the list, rather than returning a shuffled one 
     .map(list -> { 
      Collections.shuffle(list); return list; }) 
     .flatMap(List::stream) 
     .forEach(System.out::println); 
} 

Это выводит что-то подобное (это рандомизированное, поэтому каждый раз разные)

A 
C 
B 
E 
D 
F 

Секрет соус здесь в том, что всегда есть поток, так что вы можете работать на потоке партий, или сделать что-то для каждой партии а затем flatMap обратно в поток. Еще лучше, все вышеперечисленное работает только как окончательный forEach или collect или другие завершающие выражения PULL данные через поток.

Оказалось, что iterator является специальным типом операции завершения операции по потоку и не вызывает запуск всего потока и попадания в память! Благодаря Java-парням за блестящий дизайн!

+0

И очень хорошо, что вы полностью перебираете каждую партию, когда ее собираете и сохраняете в «Список» - вы не можете отложить итерацию элементов внутри пакета, потому что потребитель может пропустить всю партию, и если вы не сделали этого 't потребляют элементы, тогда они не будут пропускать очень далеко. (Я реализовал один из них на C#, хотя это было значительно проще). – ErikE

0

Простой пример использования Spliterator

// read file into stream, try-with-resources 
    try (Stream<String> stream = Files.lines(Paths.get(fileName))) { 
     //skip header 
     Spliterator<String> split = stream.skip(1).spliterator(); 
     Chunker<String> chunker = new Chunker<String>(); 
     while(true) {    
      boolean more = split.tryAdvance(chunker::doSomething); 
      if (!more) { 
       break; 
      } 
     }   
    } catch (IOException e) { 
     e.printStackTrace(); 
    } 
} 

static class Chunker<T> { 
    int ct = 0; 
    public void doSomething(T line) { 
     System.out.println(ct++ + " " + line.toString()); 
     if (ct % 100 == 0) { 
      System.out.println("====================chunk=====================");    
     }   
    }  
} 

ответ Брюса является более всеобъемлющим, но я искал что-то быстро и грязного обрабатывать кучу файлов.

Смежные вопросы