2016-10-06 1 views
0

Я использую Akka Cluster (версия 2.4.10) с несколькими узлами, предназначенными для роли «front-end», а некоторые другие - «рабочими». Рабочие находятся на удаленных машинах. Входящая работа распределяется передним игроком для работников круговой маршрутизацией. Проблема заключается в отправке ответа от «рабочих» обратно на стороннего актера. Я вижу, что работа завершается рабочими. Но сообщение, отправленное рабочими в front-end, не достигает и заканчивается как мертвые буквы. Я вижу ошибку ниже в журнале.Akka round-robin: Отправка ответа от удаленных маршрутов отправителю

[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered. 

Я видел this и я следую за то же самое в моем коде. Я также видел this, но предлагаемое решение не применяется в этом случае, потому что я не знаю маршруты вверх. Он приходит через конфигурацию и может меняться. Конфигурация маршрутизатора с круглым маршрутизатором приведена ниже.

akka.actor.deployment { 
    /frontEnd/hm = { 
    router = round-robin-group 
    nr-of-instances = 5 
    routees.paths = ["/user/hmWorker"] 
    cluster { 
     enabled = on 
     use-role = backend 
     allow-local-routees = on 
    } 
    } 
} 

Маршрутизатор создается в лицевом лице, как показано ниже.

val router = context.actorOf(FromConfig.props(), name = "hm") 
val controller = context.actorOf(Props(classOf[Controller], router)) 

Контроллер и рабочие коды указаны ниже.

// Node 1 : Controller routes requests using round-robin 
class Controller(router: ActorRef) extends Actor { 

    val list = List("a", "b") // Assume this is a big list 

    val groups = list.grouped(500) 

    override def receive: Actor.Receive = { 
     val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]])) 
     val future = Future.sequence(futures).map(_.flatten) 
     val result = Await.result(future, 50 seconds) 
     println(s"Result is $result") 
    } 
} 

// Node 2 
class Worker extends Actor { 

    override def receive: Actor.Receive = { 
     case Message(lst) => 
      val future: Future[List[String]] = // Do Something asynchronous 
      future onComplete { 
       case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor. 
       case Failure(th) => // Error handling 
      } 
    } 
} 

Пожалуйста, дайте мне знать, что я делаю неправильно здесь. Ценю вашу помощь.

ответ

2

Вы не должны использовать sender() в обратном вызове на Future. К моменту обработки обратного вызова sender(), вероятно, ссылается на нечто иное, чем это было, когда вы получили сообщение.

Рассмотрим либо сохранение ссылок за пределами обратного вызова первого типа:

override def receive: Actor.Receive = { 
    case Message(lst) => 
     val future: Future[List[String]] = // Do Something asynchronous 
     val replyTo: ActorRef = sender() 
     future onComplete { 
      case Success(r) => replyTo.!(r)(context.parent) // This message is not delivered to Controller actor. 
      case Failure(th) => // Error handling 
     } 
} 

Или еще лучше, использовать шаблон трубы:

import akka.pattern.pipe 
override def receive: Actor.Receive = { 
    case Message(lst) => 
    val future: Future[List[String]] = // Do Something asynchronous 
    future.pipeTo(sender()) 
} 
+0

Да. Он работает с «pipeTo». Благодарю. – Jegan

Смежные вопросы