2015-06-15 2 views
2

Я пытаюсь выяснить, как я могу настроить Master Actor, который вызывает соответствующих детей, в поддержку некоторых маршрутов распыления, где я пытаюсь подражать вызовам db. Я новичок в akka/spray, поэтому просто пытаюсь лучше понять, как правильно настроить спрей -> актеры -> вызовы db (и т. Д.). Я могу получить ответ от актера верхнего уровня, но когда я пытаюсь вернуть его с одного уровня актера ниже родителя, я не могу заставить ничего работать.Spray route получить ответ от дочернего актера

При взгляде на пути актеров кажется, что из-за того, что я делаю звонок от моего маршрута распыления, я перехожу от временного актера. Ниже приводится то, что я имею до сих пор для этого. Это должно быть просто ошибкой пользователя/незнанием, просто не уверен, как действовать. Мы ценим любые предложения.

Demo Spray Service и Redis Ниже перечислены фрагменты кода Actor, где я вызываю актера с моего маршрута и нескольких участников, у которых возникает проблема (хочу, чтобы мой маршрут получил ответ от SummaryActor). Благодаря!

загрузки:

object Boot extends App { 

    // we need an ActorSystem to host our application in 
    implicit val system = ActorSystem("on-spray-can") 

    // create and start our service actor 
    val service = system.actorOf(Props[DemoServiceActor], "demo-service") 

    implicit val timeout = Timeout(5.seconds) 
    // start a new HTTP server on port 8080 with our service actor as the handler 
    IO(Http) ? Http.Bind(service, interface = "localhost", port = 8080) 
} 

Demo Сервис Актер (для спрея)

class DemoServiceActor extends Actor with Api { 

    // the HttpService trait defines only one abstract member, which 
    // connects the services environment to the enclosing actor or test 
    def actorRefFactory = context 

    // this actor only runs our route, but you could add 
    // other things here, like request stream processing 
    // or timeout handling 
    def receive = handleTimeouts orElse runRoute(route) 

    //Used to watch for request timeouts 
    //http://spray.io/documentation/1.1.2/spray-routing/key-concepts/timeout-handling/ 
    def handleTimeouts: Receive = { 
    case Timedout(x: HttpRequest) => 
     sender ! HttpResponse(StatusCodes.InternalServerError, "Too late") 
    } 


} 

//Master trait for handling large APIs 
//http://stackoverflow.com/questions/14653526/can-spray-io-routes-be-split-into-multiple-controllers 
trait Api extends DemoService { 
    val route = { 
    messageApiRouting 
    } 
} 

Demo Spray Service (Route):

trait DemoService extends HttpService with Actor { 
    implicit val timeout = Timeout(5 seconds) // needed for `?` below 
    val redisActor = context.actorOf(Props[RedisActor], "redisactor") 

    val messageApiRouting = 
     path("summary"/Segment/Segment) { (dataset, timeslice) => 
      onComplete(getSummary(redisActor, dataset, timeslice)) { 
      case Success(value) => complete(s"The result was $value") 
      case Failure(ex) => complete(s"An error occurred: ${ex.getMessage}") 
      } 
     } 

    def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = Future { 

    val dbMessage = DbMessage("summary", dataset + timeslice) 
    val future = redisActor ? dbMessage 
    val result = Await.result(future, timeout.duration).asInstanceOf[String] 
    result 
    } 

} 

Redis Актер (Mock никакого фактического Redis клиент еще)

class RedisActor extends Actor with ActorLogging { 
    // val pool = REDIS 
    implicit val timeout = Timeout(5 seconds) // needed for `?` below 
    val summaryActor = context.actorOf(Props[SummaryActor], "summaryactor") 


    def receive = { 

    case msg: DbMessage => { 
     msg.query match { 
     case "summary" => { 
      log.debug("Summary Query Request") 
      log.debug(sender.path.toString) 
      summaryActor ! msg 
     } 
     } 
    } 

    //If not match log an error 
    case _ => log.error("Received unknown message: {} ") 
    } 
} 

class SummaryActor extends Actor with ActorLogging{ 

    def receive = { 
    case msg: DbMessage =>{ 
     log.debug("Summary Actor Received Message") 
     //Send back to Spray Route 

    } 
    } 
} 

ответ

3

Первая проблема с вашим кодом является то, что вам нужно передать от ведущего актера ребенка так, чтобы sender должным образом распространяются и доступен для ребенка, чтобы ответить. Таким образом, это изменить (в RedisActor):

summaryActor ! msg 

To:

summaryActor forward msg 

Это основная проблема. Исправьте это, и ваш код должен начать работать. Однако есть еще что-то, что требует внимания. Ваш метод getSummary в настоящее время определяется как:

def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = 
    Future { 
    val dbMessage = DbMessage("summary", dataset + timeslice) 
    val future = redisActor ? dbMessage 
    val result = Await.result(future, timeout.duration).asInstanceOf[String] 
    result 
    } 

Проблема здесь в том, что ask операция (?) уже возвращает Future, так что и вы блокируете на него, чтобы получить результат, обертывание, что в другом Future так что вы можете вернуть Future для работы с onComplete. Вы должны быть в состоянии упростить с помощью Future вернулся из ask непосредственно следующим образом:

def getSummary(redisActor: ActorRef, dataset: String, timeslice: String): Future[String] = { 
    val dbMessage = DbMessage("summary", dataset + timeslice) 
    (redisActor ? dbMessage).mapTo[String]  
} 
+0

спасибо тонну. Я рассмотрю предлагаемые обновления. Благодаря! – scarpacci

+0

Работали как обаяние на обоих счетах. Еще раз спасибо. – scarpacci

1

Просто важный комментарий на вышеуказанных подходов.

Поскольку функция getSummary (...) возвращает объект Future [String], и вы называете его в OnComplete (...) функции вы должны импортировать:

импорт ExecutionContext.Implicits.global

Таким образом, у вас будет ExecutionContext в области видимости, позволяя Future объявить неявный параметр ExecutionContext.

** Если вы этого не сделаете, вы будете в конечном итоге получить несовпадение ошибку так OnComplete (...) ожидает OnComplete будущего магнит объекта, но вы дали будущее [String] Объект.

Смежные вопросы