2013-03-09 3 views
12

Я хочу создать Play 2 Enumeratee, который принимает значения и выводит их, разбивая вместе, каждые x секунд/миллисекунд. Таким образом, в многопользовательской среде websocket с большим количеством пользовательского ввода можно было ограничить количество полученных кадров в секунду.Создание временного chunking Enumeratee

Я знаю, что можно сгруппировать заданное число элементов вместе, как это:

val chunker = Enumeratee.grouped(
    Traversable.take[Array[Double]](5000) &>> Iteratee.consume() 
) 

Есть встроенный способ сделать это в зависимости от времени, а не на основе количества элементов?

Я думал об этом как-то с запланированной работой Акки, но на первый взгляд это кажется неэффективным, и я не уверен, что возникнут проблемы со взаимностью.

+0

это плохой вопрос. – tailor

+1

@tailor Не могли бы вы объяснить, почему вы думаете, что это плохо или как его улучшить? – Carsten

ответ

3

Как насчет этого? Надеюсь, это поможет вам.

package controllers 

import play.api._ 
import play.api.Play.current 
import play.api.mvc._ 
import play.api.libs.iteratee._ 
import play.api.libs.concurrent.Akka 
import play.api.libs.concurrent.Promise 

object Application extends Controller { 

    def index = Action { 
    val queue = new scala.collection.mutable.Queue[String] 
    Akka.future { 
     while(true){ 
     Logger.info("hogehogehoge") 
     queue += System.currentTimeMillis.toString 
     Thread.sleep(100) 
     } 
    } 
    val timeStream = Enumerator.fromCallback {() => 
     Promise.timeout(Some(queue), 200) 
    } 
    Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue => 
     var str = "" 
     while(queue.nonEmpty){ 
     str += queue.dequeue + ", " 
     } 
     str 
    }))) 
    } 

} 

И этот документ также полезен для вас. http://www.playframework.com/documentation/2.0/Enumerators

UPDATE Это для play2.1 версии.

package controllers 

import play.api._ 
import play.api.Play.current 
import play.api.mvc._ 
import play.api.libs.iteratee._ 
import play.api.libs.concurrent.Akka 
import play.api.libs.concurrent.Promise 
import scala.concurrent._ 
import ExecutionContext.Implicits.global 

object Application extends Controller { 

    def index = Action { 
    val queue = new scala.collection.mutable.Queue[String] 
    Akka.future { 
     while(true){ 
     Logger.info("hogehogehoge") 
     queue += System.currentTimeMillis.toString 
     Thread.sleep(100) 
     } 
    } 
    val timeStream = Enumerator.repeatM{ 
     Promise.timeout(queue, 200) 
    } 
    Ok.stream(timeStream.through(Enumeratee.map[scala.collection.mutable.Queue[String]]({ queue => 
     var str = "" 
     while(queue.nonEmpty){ 
     str += queue.dequeue + ", " 
     } 
     str 
    }))) 
    } 

} 
+1

Спасибо за ваш ответ! К сожалению, «Enumerator.fromCallback» стал устаревшим с версии 2.1 (и они отходят от «Promise» от Play'а и к «Будущему» Scala). Возможно, вы знаете, что это «современный» способ сделать это? – Carsten

+0

Спасибо, что преподаете. Я не знал, что 'Enumerator.fromCallback' имеет пчел, устаревший. Я прочитал [здесь] (https://github.com/playframework/Play20/blob/master/framework/src/iteratees/src/main/scala/play/api/libs/iteratee/Enumerator.scala) и обновил код , – buster84

2

Здесь я быстро определил iteratee, который будет принимать значение от входа для фиксированной длиной времени Т, измеренного в миллисекундах и enumeratee, который позволит вам сгруппировать и дальнейший процесс входного поток, разделенный на сегменты, построенных в пределах такой длины t. Он полагается на JodaTime, чтобы отслеживать, сколько времени прошло с момента начала итерации.

def throttledTakeIteratee[E](timeInMillis: Long): Iteratee[E, List[E]] = { 
    var startTime = new Instant() 

    def step(state: List[E])(input: Input[E]): Iteratee[E, List[E]] = { 
    val timePassed = new Interval(startTime, new Instant()).toDurationMillis 

    input match { 
     case Input.EOF => { startTime = new Instant; Done(state, Input.EOF) } 
     case Input.Empty => Cont[E, List[E]](i => step(state)(i)) 
     case Input.El(e) => 
     if (timePassed >= timeInMillis) { startTime = new Instant; Done(e::state, Input.Empty) } 
     else Cont[E, List[E]](i => step(e::state)(i)) 
    } 
    } 

    Cont(step(List[E]())) 
} 

def throttledTake[T](timeInMillis: Long) = Enumeratee.grouped(throttledTakeIteratee[T](timeInMillis)) 
+0

Спасибо. Ваш метод элегантен. Я принял другой ответ, потому что он использует меньше процессорного времени (по крайней мере, вдвое меньше), я надеюсь, что все в порядке. Кроме того, начиная с Scala 2.10, вы можете использовать собственный класс «Deadline» Scala для обновления таймера. – Carsten