2016-11-30 3 views
1

Предположим, у меня есть ленивый итератор [Item]. Объект items создается лениво, только когда мы итерируем итератор. Элементы дорогие для создания.Scala: как «предварительно загрузить» содержимое ленивого итератора?

Я хотел бы сериализовать в качестве массива JSON этот итератор. Он работает (с модулем Jackson scala), но мне кажется, что он недостаточно эффективен.

Насколько я понимаю, в настоящее время он работает так:

  • Compute Следующий товар
  • Serialize товар
  • Compute Следующий товар
  • Serialize товар
  • Compute Следующий товар
  • Сериализовать товар

Я бы хотел, чтобы вычисления элементов и сериализация элементов происходили параллельно.

Я бы хотел, чтобы Iterator начал вычислять определенное количество следующих предметов при чтении следующего элемента.

Например, я хотел бы, чтобы при выполнении iterator.next() за сценой вычислялись следующие 50 элементов без блокировки итерационного потока (он должен ждать только следующего элемента).

Я видел «BufferedIterator», но это не совсем то, что мне нужно, так как я не очень хочу, чтобы запросить «головы» в явном виде, и мне нужно больше чем 1 деталь для предварительной загрузки

Любой идея о том, как это можно достичь?

Я нормально для решения заменяющего итератора с потоком тоже, но отдают предпочтение итератора из-за снижения использования памяти

+0

Вы пробовали [GroupedIterator] (http://scala-lang.org/api/2.11.8/#scala.collection.Iterator$GroupedIterator)? – laughedelic

+0

@laughedelic Я уже использую 'inputIterator.grouped (chunkSize) .map (computeItemsChunk) .flatten', так что мои объекты вычисляются в кусках, но я не уверен в том, что вы предлагаете –

+0

. Вы говорите:« при выполнении итератора .next(), за сценой, вычисляются следующие 50 элементов ». Это то, что происходит с 'GroupedIterator': каждый раз, когда вы вызываете' .next', вычисляется новый фрагмент. Разве вы этого не хотите? – laughedelic

ответ

1

Если я понял вашу проблему правильно, вот пример того, что вы могли бы сделать. Вы можете обернуть вычисление каждого элемента в Future, чтобы вы могли выполнять итерацию по вашему входному потоку без блокировки и обработки/сериализации каждого фрагмента, когда он будет готов. Я собираюсь сделать это в РЕПЛ и печати, когда каждая часть оценивается, так что вы можете видеть, когда каждая вещь происходит:

@ import concurrent._, ExecutionContext.Implicits.global 
import concurrent._, ExecutionContext.Implicits.global 

@ def futureItem(i: Int): Future[Int] = Future { 
    Thread.sleep(1000) 
    println(s"item: ${i}") 
    i 
} 
defined function futureItem 

@ val inputIterator = (1 to 9).toIterator.map(futureItem) 
inputIterator: Iterator[Future[Int]] = non-empty iterator 

Так вычисляя каждый элемент занимает по меньшей мере 1 секунду. А теперь мы хотим обрабатывать детали в куски, которые также занимает некоторое время:

@ def computeItemsChunk(items: Seq[Int]): Int = { 
    Thread.sleep(1000) 
    val s = items.sum 
    println(s"chunk ${items}: ${s}") 
    s 
} 
defined function computeItemsChunk 

Теперь группа входной поток, применяется Future.sequence и вычислительными ломти:

@ case object foo { 
    val chunksIterator = inputIterator.grouped(3).map { futureItems => 
    Future.sequence(futureItems).map(computeItemsChunk) 
    } 
} 
defined object foo 

(я определил его в объекте, потому что иначе группировка (или что-то еще) заставит оценить первый кусок). Теперь давайте посмотрим, как она оценивается:

@ Await.result(Future.sequence(foo.chunksIterator), Duration.Inf) 
item: 2 
item: 3 
item: 4 
item: 1 
item: 7 
item: 6 
chunk List(1, 2, 3): 6 
item: 5 
item: 8 
chunk List(4, 5, 6): 15 
item: 9 
chunk List(7, 8, 9): 24 
res5: Iterator[Int] = non-empty iterator 

Вы можете видеть, что ломти вычисляются после того, как элементы доступны и итератор авансов, не дожидаясь каждой оценки куска.

+0

Ваш код почти то, что я уже использую (за исключением того, что в конце я сглаживаю 'Iterator [Iterator [Item]]'. Но у меня была проблема в прошлом с такой реализацией, потому что для обработки элемента требуется один поток для каждого элемента и делает слишком много параллельной работы и переполняет систему. Поэтому я вычисляю каждый кусок элементов параллельно, по одному фрагменту за раз, чтобы было не более параллельных вычислений 'chunkSize'. Выполнение слишком большой работы сразу задерживает вычисление первый элемент и, следовательно, время отклика итератора и, следовательно, мой API. –

+0

Обратите внимание, что мне не нужно складывать элементы в результат, как сумма ('computeItemsChunk' должен принимать' Seq [Id] 'и возвращать например, 'Seq [Item]'). Если все вычисления запущены, то мне кажется, что мне больше не нужны ленивые свойства Iterator или Stream, а скорее «Future [List [Item] ]] 'в результате может быть достаточно –

+0

Итак, общая идея заключается в том, что Я не хочу начинать все вычисления сразу, и что вычисления должны запускаться только при прогрессировании потребителя итератора (т. Е. Не загружать все сразу, а загружать только n следующих элементов итератора, а не всех элементов) –

Смежные вопросы