2017-02-11 4 views
0

Я новичок в игровой инфраструктуре и как ее использовать с Scala. Я хочу создать прокси для больших объектов Json. Я до сих пор добился того, что json хранится в кеше, а если его нет, запрашивается у веб-службы.Сделайте один запрос webservice только от игрового фрейма

Однако, когда поступают два запроса, таргетинг на ту же конечную точку (webservice и path identicall) должен выполняться только один вызов, а другой запрос должен ждать результата первого вызова. В настоящий момент он выполняет вызов услуги с каждым запросом.

Это мой контроллер:

@Singleton 
class CmsProxyController @Inject()(val cmsService: CmsProxyService) extends Controller { 
    implicit def ec : ExecutionContext = play.api.libs.concurrent.Execution.defaultContext 

    def header(path: String) = Action.async { context => 
    cmsService.head(path) map { title => 
     Ok(Json.obj("title" -> title)) 
    } 
    } 

    def teaser(path: String) = Action.async { context => 
    cmsService.teaser(path) map { res => 
     Ok(res).as(ContentTypes.JSON) 
    } 
    } 
} 

Это услуга:

trait CmsProxyService { 
    def head(path: String): Future[String] 

    def teaser(path: String): Future[String] 
} 

@Singleton 
class DefaultCmsProxyService @Inject()(cache: CacheApi, cmsCaller: CmsCaller) extends CmsProxyService { 

    private val BASE = "http://foo.com" 
    private val CMS = "bar/rest/" 

    private val log = Logger("application") 

    override def head(path: String) = { 
    query(url(path), "$.payload[0].title") 
    } 

    override def teaser(path: String) = { 
    query(url(path), "$.payload[0].content.teaserText") 
    } 

    private def url(path: String) = s"${BASE}/${CMS}/${path}" 

    private def query(url: String, jsonPath: String): Future[String] = { 
    val key = s"${url}?${jsonPath}" 
    val payload = findInCache(key) 

    if (payload.isDefined) { 
     log.debug("found payload in cache") 
     Future.successful(payload.get) 
    } else { 
     val queried = parse(fetch(url)) map { json => 
     JSONPath.query(jsonPath, json).as[String] 
     } 
     queried.onComplete(value => saveInCache(key, value.get)) 
     queried 
    } 
    } 

    private def parse(fetched: Future[String]): Future[JsValue] = { 
    fetched map { jsonString => 
     Json.parse(jsonString) 
    } 
    } 

    //retrieve the requested value from the cache or from ws 
    private def fetch(url: String): Future[String] = { 
    val body = findInCache(url) 

    if (body.isDefined) { 
     log.debug("found body in cache") 
     Future.successful(body.get) 
    } else { 
     cmsCaller.call(url) 
    } 
    } 

    private def findInCache(key: String): Option[String] = cache.get(key) 

    private def saveInCache(key: String, value: String, duration: FiniteDuration = 5.minutes) = cache.set(key, value, 5.minutes) 

} 

И, наконец, вызов WebService:

trait CmsCaller { 
    def call(url: String): Future[String] 
} 

@Singleton 
class DefaultCmsCaller @Inject()(wsClient: WSClient) extends CmsCaller { 
    import scala.concurrent.ExecutionContext.Implicits.global 
    //keep those futures which are currently requested 
    private val calls: Map[String, Future[String]] = TrieMap() 

    private val log = Logger("application") 

    override def call(url: String): Future[String] = { 
    if(calls.contains(url)) { 
     Future.successful("ok") 
    }else { 
     val f = doCall(url) 
     calls put(url, f) 
     f 
    } 
    } 

    //do the final call 
    private def doCall(url: String): Future[String] = { 
    val request = ws(url) 
    val response = request.get() 
    val mapped = mapResponse(response) 
    mapped.onComplete(_ => cmsCalls.remove(url)) 
    mapped 
    } 

    private def ws(url: String): WSRequest = wsClient.url(url) 

    //currently executed with every request 
    private def mapResponse(f: Future[WSResponse]): Future[String] = { 
    f.onComplete(_ => log.debug("call completed")) 
    f map {res => 
     val status = res.status 
     log.debug(s"ws called, response status: ${status}") 
     if (status == 200) { 
     res.body 
     } else { 
     "" 
     } 
    } 
    } 
} 

Мой вопрос: как только может один вызов веб-сервиса beeing выполнен? Даже если есть несколько запросов к одной и той же цели. Я не хочу его блокировать, другой запрос (не уверен, что я использую правильное слово здесь) должен только быть проинформирован о том, что на этом пути уже существует вызов webservice.

Запрос главы и тизера, см. Контроллер, должен выполнять только один вызов веб-службы.

ответ

0

Простого ответ с помощью Scala Lazy ключевого слова

def requestPayload(): String = ??? //do something 

@Singleton 
class SimpleCache @Inject()() { 
    lazy val result: Future[String] = requestPayload() 
} 

//Usage 

@Singleton 
class SomeController @Inject() (simpleCache: SimpleCache) { 
    def action = Action { req => 
    simpleCache.result.map { result => 
    Ok("success") 
    } 
    } 
} 

Первого запроса будет инициировать вызов покоя и все другие запросы будут использовать кэшированные результаты. Используйте map и flatMap, чтобы связать запросы.

сложный ответ, используя Актеры

Использование Actor очереди запросов и кэшировать результат первого успешного результата запроса JSon. Все остальные запросы будут считывать результат первого запроса.

case class Request(value: String) 

class RequestManager extends Actor { 

    var mayBeResult: Option[String] = None 
    var reqs = List.empty[(ActorRef, Request)] 

    def receive = { 
    case req: Request => 
    context become firstReq 
    self ! req 
    } 

    def firstReq = { 
    case req: Request => 
    process(req).onSuccess { value => 
     mayBeResult = Some(value) 
     context become done 
     self ! "clear_pending_reqs" 
    } 
    context become processing 
    } 

    def processing = { 
    case req: Request => 
    //queue requests 
    reqs = reqs ++ List(sender -> req) 
    } 

    def done = { 
    case "clear_pending_reqs" => 
    reqs.foreach { case (sender, _) => 
     //send value to the sender 
     sender ! value. 
    } 
    } 
} 

обрабатывает случай, когда первый запрос не выполнен. В приведенном выше блоке кода, если первый запрос не выполняется, актер никогда не перейдет в состояние завершения.

+0

Большое спасибо за ваш ответ. Однако я все еще испытываю трудности с пониманием. Откуда происходит процесс (req) .onSuccess? Это фактический вызов моей службы, который мне нужно реализовать? – Spindizzy

+0

@Spindizzy Предоставлено простое решение с использованием ленивого val. Посмотрите – pamu

+0

Я следил за актером и заводил его. Однако он все еще звонит несколько раз. Существует еще одна услуга спереди, в которой используется контроллер. Похоже, что количество вызовов (wsClient) связано с количеством запросов. Поэтому я думаю, что решения нет. Я просто должен это принять. – Spindizzy

0

Я решил проблему с синхронизацией кеша в службе. Я не уверен, что это элегантное решение, но оно работает для меня.

trait SyncCmsProxyService { 
    def head(path: String): String 
    def teaser(path: String): String 
} 


@Singleton 
class DefaultSyncCmsProxyService @Inject()(implicit cache: CacheApi, wsClient: WSClient) extends SyncCmsProxyService with UrlBuilder with CacheAccessor{ 

    private val log = Logger("application") 

    override def head(path: String) = { 
    log.debug("looking for head ...") 
    query(url(path), "$.payload[0].title") 
    } 

    override def teaser(path: String) = { 
    log.debug("looking for teaser ...") 
    query(url(path), "$.payload[0].content.teaserText") 
    } 

    private def query(url: String, jsonPath: String) = { 
    val key = s"${url}?${jsonPath}" 
    val payload = findInCache(key) 

    if (payload.isDefined) { 
     payload.get 
    }else{ 
     val json = Json.parse(body(url)) 
     val queried = JSONPath.query(jsonPath, json).as[String] 
     saveInCache(key, queried) 
    } 
    } 

    private def body(url: String) = { 
    cache.synchronized { 
     val body = findInCache(url) 
     if (body.isDefined) { 
     log.debug("found body in cache") 
     body.get 
     } else { 
     saveInCache(url, doCall(url)) 
     } 
    } 
} 

private def doCall(url : String): String = { 
    import scala.concurrent.ExecutionContext.Implicits.global 
    log.debug("calling...") 
    val req = wsClient.url(url).get() 
    val f = req map { res => 
    val status = res.status 
    log.debug(s"endpoint called! response status: ${status}") 
    if (status == 200) { 
     res.body 
    } else { 
     "" 
    } 
    } 
    Await.result(f, 15.seconds) 
    } 
} 

Обратите внимание, что я опустил черты UrlBuilder и CacheAccessor здесь, потому что они тривиальны.

Смежные вопросы