0

Я использую Play Framework с Scala, Akka и ReactiveMongo. Я хочу использовать коллекцию в MongoDB как круговую очередь. Несколько участников могут вставлять в нее документы; один актер извлекает эти документы, как только они доступны (своего рода система публикации-подписки). Я использую capped коллекции и tailable cursor. Каждый раз, когда я извлекаю некоторые документы, мне нужно запустить команду EmptyCapped, чтобы очистить ограниченную коллекцию (удалить из нее элементы REMOVE), иначе я получаю всегда один и тот же документ. есть ли альтернативное решение? например, есть способ сдвинуть курсор, не удаляя элементы? или лучше не использовать ограниченную коллекцию в моем случае?Play + ReactiveMongo: capped collection и tailable cursor

object MexDB { 

def db: reactivemongo.api.DB = ReactiveMongoPlugin.db 
val size: Int = 10000 

// creating capped collection 
val collection: JSONCollection = { 

    val c = db.collection[JSONCollection]("messages") 

    val isCapped = coll.convertToCapped(size, None) 

    Await.ready(isCapped, Duration.Inf) 

    c 
} 

def insert(mex: Mex) = { 

    val inserted = collection.insert(mex) 

    inserted onComplete { 
     case Failure(e) => 
     Logger.info("Error while inserting task: " + e.getMessage()) 
     throw e 

     case Success(i) => 
     Logger.info("Successfully inserted task") 
    } 

} 


def find(): Enumerator[Mex] = { 

    val cursor: Cursor[Mex] = collection 
    .find(Json.obj()) 
    .options(QueryOpts().tailable.awaitData) 
    .cursor[Mex] 

    // meaning of maxDocs ??? 
    val maxDocs = 1 
    cursor.enumerate(maxDocs) 
} 


def removeAll() = { 
    db.command(new EmptyCapped("messages")) 
} 

}

/*** part of receiver actor code ***/ 

// inside preStart 
val it = Iteratee.fold[Mex, List[Mex]](Nil) { 
    (partialList, mex) => partialList ::: List(mex) 
} 

// Inside "receive" method 
case Data => 

    val e: Enumerator[Mex] = MexDB.find() 

    val future = e.run(it) 

    future onComplete { 
    case Success(list) => 
     list foreach { mex => 
     Logger.info("Mex: " + mex.id) 
     } 
     MexDB.removeAll() 
     self ! Data 

    case Failure(e) => Logger.info("Error: "+ e.getMessage()) 
    } 

ответ

0

Ваш tailable курсор закрывается после каждого найденного док как maxDocs = 1. Чтобы держать его открытым на неопределенный срок, вы должны опустить этот предел.

С awaitData, .onComplete будет вызываться только в случае явного выключения RM.

Вам нужно использовать некоторую функцию потоковой передачи из курсора, например .enumerate, и обрабатывать каждый новый шаг/результат. См. https://github.com/sgodbillon/reactivemongo-tailablecursor-demo/

+0

Спасибо. Так что, если я опускаю этот предел, я могу создать перечислитель только один раз и повторно использовать его правильно? Есть ли альтернативный способ получения документов из закрытой коллекции, такой как круговая очередь, без запуска EmptyCapped каждый раз? –

+0

Макс. 1 читать только один документ. Удалите предел, и курсор останется открытым в текущей «позиции», а не обратно к первому документу. – cchantep

+0

Я удалил предел, созданный сразу после указателя и перечислителя. Я периодически выполняю: val future = enum.run (it) future onComplete {***} Я периодически вставляю mex в коллекцию, но enum никогда не извлекает некоторые mex ... что я делаю неправильно? –

Смежные вопросы