2013-04-26 4 views
5

Я пытаюсь использовать пакет scalaz iteratee для обработки большого zip-файла в постоянном пространстве. У меня есть длительный процесс, который мне нужно выполнить для каждого файла в zip-файле. Эти процессы могут (и должны) выполняться параллельно.Scalaz 7 Iteratee для обработки большого zip-файла (OutOfMemoryError)

Я создал EnumeratorT, который надувает каждый ZipEntry в объект File. Подпись выглядит следующим образом:

def enumZipFile(f:File):EnumeratorT[IoExceptionOr[IO[File]], IO] 

Я хочу приложить IterateeT, которая будет выполнять длительный процесс по каждому файлу. Я в основном в конечном итоге что-то вроде:

type IOE[A] = IoExceptionOr[A] 

def action(f:File):IO[List[Promise[IOE[File]]]] = (
    consume[Promise[IOE[File]], IO, List] %= 
    map[IOE[File], Promise[IOE[File]], IO](longRunningProcess) %= 
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &= 
    enumZipFile(f) 
).run 

def longRunningProcess:(iof:IOE[File]):Promise[IOE[File]] = 
    Promise { Thread.sleep(5000); iof } 

Когда я пытаюсь запустить его:

action(new File("/really/big/file.zip")).unsafePerformIO.sequence.get 

Я получаю java.lang.OutOfMemoryError: Java heap space сообщений. Это имеет смысл для меня, поскольку он пытается создать массивный список в память обо всех этих объектах IO и Promise.

Несколько вопросов:

  • Кто-нибудь есть какие-либо идеи о том, как избежать этого? Похоже, что я неправильно подхожу к проблеме, потому что я действительно забочусь только об longRunningProcess за ее побочные эффекты.
  • Подходит ли подход Enumerator к неправильному подходу?

У меня довольно много идей, поэтому все поможет.

Спасибо!

Update # 1

Вот трассировки стека:

[error] java.lang.OutOfMemoryError: Java heap space 
[error]   at scalaz.Free.flatMap(Free.scala:46) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:61) 
[error]   at scalaz.effect.IOFunctions$$anon$5.apply(IO.scala:222) 
[error]   at scalaz.effect.IO$$anonfun$flatMap$1.apply(IO.scala:62) 

Я в настоящее время совет nadavwr, чтобы убедиться, что все действует, как я думаю, что это. Я буду сообщать о любых обновлениях.

Update # 2

Используя идеи из обоих ответов ниже, я нашел достойное решение. Как предположил huynhjl (и я проверил использование предложения nadavwr по анализу дампа кучи), consume заставляло каждый завышенный ZipEntry удерживаться в памяти, поэтому процесс заканчивался из памяти. Я изменил consume на foldM и обновил длительный процесс, чтобы просто вернуть Promise[IOE[Unit]] вместо ссылки на файл. Таким образом, у меня есть коллекция всех IoExceptions в конце. Вот рабочее решение:

def action(f:File):IO[List[Promise[IOE[Unit]]]] = (
    foldM[Promise[IOE[Unit]], IO, List[Promise[IOE[Unit]]]](List.empty)((acc,x) => IO(x :: acc)) %= 
    map[IOE[File], Promise[IOE[Unit]], IO](longRunningProcess) %= 
    map[IOE[IO[File]], IOE[File], IO](_.unsafePerformIO) &= 
    enumZipFile(f) 
).run 

def longRunningProcess:(iof:IOE[File]):Promise[IOE[Unit]] = 
    Promise { Thread.sleep(5000); iof.map(println) } 

Это решение раздувает каждую запись при асинхронной загрузке. В конце концов, у меня есть огромный список выполненных объектов Promise, содержащих какие-либо ошибки. Я до сих пор не полностью убежден, что это правильное использование Iteratee, но теперь у меня есть несколько многоразовых, составных частей, которые я могу использовать в других частях нашей системы (это очень распространенная картина для нас).

Спасибо за вашу помощь!

+0

Что делает длительный процесс? Вычисляет ли это что-то из zip-контента? – huynhjl

+0

Каждый файл в zip-файле - это изображение. Длительный процесс загружает этот файл в Rackspace CloudFiles. Как только я это выясню, мне нужно будет добавить дополнительные процессы, которые изменят размер изображений, а затем загрузите их. –

+0

Iteratees чувствует себя как неправильная абстракция для этой работы, поскольку вы хотите распараллелить рабочую нагрузку. Думаю, актеры будут работать лучше. – huynhjl

ответ

4

Не использовать consume. См. Другой мой недавний ответ: How to use IO with Scalaz7 Iteratees without overflowing the stack?

foldM может быть лучшим выбором.

Также попробуйте сопоставить файл с чем-то другим (например, кодом возврата успеха), чтобы узнать, позволяет ли JVM делать мусор, собирать завышенные почтовые записи.

+0

Спасибо за ваш ответ. В конце концов, использование 'foldM' казалось ключевым. –

0

Я начал ответ после того, как быстро прочитать, и каким-то образом была «переполнение стека» застрял в моей голове, а не «из памяти ошибки» ... Должно быть URL :-)

Тем не менее, функциональные вычисления, которые полагаются на рекурсии, восприимчивы к переполнениям стека, поэтому я оставил ответ на месте для любого спотыкающегося тела и обещаю попытаться найти более релевантный ответ.

Если вы получили переполнение стека, вам понадобится «трамплин», конструкция, которая ускоряет вычисление из стека между рекурсиями.

См. Раздел «Бесступенчатая скала с бесплатными монадами» в Learning Scalaz Day 18, часть превосходной серии сообщений @ eed3si9n.

См. Также this gist от @mpilquist, демонстрируя батут-итерацию.

+1

Haha, stackoverflow.com - неудачное имя, когда вы говорите о длительных функциональных процессах. –

1

Как дорого (с точки зрения памяти ваш longRunningProcess? Как насчет файла дефляции? Они выполняется количество раз, вы ожидаете, что они будут? (Простой счетчик будет полезно)

Стек трассировки будет помогите определить солому, которая сломала спину верблюда - иногда это преступник.

Если вы хотите быть уверенным в том, что занимает столько памяти, вы можете использовать аргумент JVM -XX:+HeapDumpOnOutOfMemoryError, а затем проанализировать его с помощью VisualVM, Eclipse MAT или других анализаторов кучи.

Follo wup

Мне кажется странным, что вы перечисляете обещания. Непоследовательно начинать вычисление, не зависящее как от счетчика, так и от итерации. Решение на основе итераций может быть лучше обслуживаться счетчиком, который возвращает «инертные» элементы вместо обещаний. К сожалению, это сделает вашу обработку отдельных файлов последовательной, но это повторится для ya - неблокирующей обработки потока.

Решение, основанное на актерах, будет лучше соответствовать ИМХО, но оба участника и итерации (особенно последние) кажутся излишними для того, что вы пытаетесь выполнить (по крайней мере, части, которыми вы делитесь).

Пожалуйста, обратите внимание на простые фьючерсы/обещания от пакета scala.concurrent от Scala 2.10 и обязательно взгляните на параллельные коллекции Scala. Я бы не вводил дополнительные понятия в код до того, как они оказались недостаточными. Попробуйте определить ExecutionContext фиксированного размера для ограничения вашего параллелизма.

+0

Большой совет. Я прохожу шаг за шагом, чтобы убедиться, что все выполняется, как я предполагаю. Я обновил свой вопрос выше с помощью трассировки стека. Теперь я собираюсь попробовать кучу кучи. Благодаря! –

+0

Что касается вашего последующего действия: я согласен с вашими соображениями относительно использования Iteratee для этого процесса. Из того, что я разместил, это определенно кажется излишним. Тем не менее, шаблон загрузки файла (или файлов), потоковой передачи содержимого, обработки каждой записи, а затем выполнения чего-то с результатом используется повсеместно в нашем приложении. Я чувствую, что Iteratee дал мне несколько приятных многоразовых фрагментов кода, которые я могу использовать для создания этих больших процессов. Большое спасибо за ваше время и помощь! –

Смежные вопросы