2015-01-19 2 views
3

Настройка Scala 2.11.4, Playframework 2.3.7, Reacivemongo (0.10.5.0.akka23/0.11.0-SNAPSHOT пробовал с обоими).Play mongo Enumerator неожиданно останавливается

У нас есть коллекция с 18'000 сущностями, обрабатывающая эту коллекцию асинхронным образом с использованием подхода Enumerator/Iteratee.

Дело 1. Обработка проста (извлечение сущностей в формат CSV и отправка их в виде блоков как ответ REST) ​​все работает нормально, все записи извлекаются и обрабатываются.

Дело 2. Обработка включает вычисления, которые занимают до 10 секунд, а также обновление записей после расчета, расчет производится с помощью foreach Iteratee, который обновляет количество обработанных объектов во внутреннем трекере задач. Обработка может занять некоторое время, но это нормально

 Patient.findByClient(clientName) &> 
      Enumeratee.mapM(patient => { 
       val evaluatedAndSaveTask = patient. 
        evaluate(parser). 
        flatMap(patientOpt => 
         patientOpt. 
          map(evaluatedPatient => evaluatedPatient.saveAndGet().map(Some(_))). 
          getOrElse(Future.successful(None)) 
        ) 
       evaluatedAndSaveTask.recover({ 
        case t => 
         t.printStackTrace() 
         None 
       }) 
      }) 
     // Step 2.1. Running evaluation process through Iteratee 
     val evaluationTask = evaluation run Iteratee.foreach(patientOpt => { 
      collection.update(Json.obj("clientName" -> clientName), Json.obj("$inc" -> Json.obj("processedPatients" -> 1)))) 
     ) 
     // Step 2.3. Log errors 
     evaluationTask.onSuccess({ case _ => Patient.LOG.info("PatientEvaluation DONE") }) 
     evaluationTask.onFailure({ case t => { 
      t.printStackTrace(); 
      Patient.LOG.info("PatientEvaluation FAILED"); 
     }}) 

В этом случае только 575 лиц получают обрабатываются, и Iteratee заканчивается распечатывание «оценки пациента DONE».

Я удалил спасение из уравнения, и это не помогло.

Почему это может быть?

ответ

3

я, наконец, нашел виновника проблемы - Монго автоматически истекает оценку после некоторого промежутка времени, вы можете указать noCursorTimeout флаг, чтобы предотвратить это:

 collection. 
      find(findQ). 
      sort(if(sortQF.values.isEmpty) sortQ else sortQF). 
      options(QueryOpts(skipN = offset + page._1 * page._2).noCursorTimeout). 
      cursor[T]. 

По какой-то причине, ReactiveMongo не выбрасывает исключение в этом случае , и просто закрывает Итератор. После этого я создал проблему в ReactiveMongo https://github.com/ReactiveMongo/ReactiveMongo/issues/250.

В настоящее время для меня может быть более безопасным истек срок действия курсора и перезапустить со смещением.

+0

Это правильно. Mongo только удерживает курсоры в течение 10 минут, после чего они истекают, поэтому ваш случай 1 изматывается после 575 записей. Обратите внимание, что 10 минут = 600 секунд, поэтому вы грубо обрабатывали эти вычисления всего за 1 секунду, а не 10 секунд (в среднем). –

+0

Возможно, последняя партия заняла 10 минут, в то время как другие были быстрее, поэтому до 30 минут – mavarazy

+0

Действительно ли Reactivemongo автоматически закрывает курсор? С помощью этой опции mongo никогда не будет автоматически закрывать курсор. –

Смежные вопросы