Я использую Akka Cluster (версия 2.4.10) с несколькими узлами, предназначенными для роли «front-end», а некоторые другие - «рабочими». Рабочие находятся на удаленных машинах. Входящая работа распределяется передним игроком для работников круговой маршрутизацией. Проблема заключается в отправке ответа от «рабочих» обратно на стороннего актера. Я вижу, что работа завершается рабочими. Но сообщение, отправленное рабочими в front-end, не достигает и заканчивается как мертвые буквы. Я вижу ошибку ниже в журнале.Akka round-robin: Отправка ответа от удаленных маршрутов отправителю
[Cluster-akka.actor.default-dispatcher-21] [akka://Cluster/deadLetters] Message [scala.collection.immutable.$colon$colon] from Actor[akka://Cluster/user] to Actor[akka://Cluster/deadLetters] was not delivered. [6] dead letters encountered.
Я видел this и я следую за то же самое в моем коде. Я также видел this, но предлагаемое решение не применяется в этом случае, потому что я не знаю маршруты вверх. Он приходит через конфигурацию и может меняться. Конфигурация маршрутизатора с круглым маршрутизатором приведена ниже.
akka.actor.deployment {
/frontEnd/hm = {
router = round-robin-group
nr-of-instances = 5
routees.paths = ["/user/hmWorker"]
cluster {
enabled = on
use-role = backend
allow-local-routees = on
}
}
}
Маршрутизатор создается в лицевом лице, как показано ниже.
val router = context.actorOf(FromConfig.props(), name = "hm")
val controller = context.actorOf(Props(classOf[Controller], router))
Контроллер и рабочие коды указаны ниже.
// Node 1 : Controller routes requests using round-robin
class Controller(router: ActorRef) extends Actor {
val list = List("a", "b") // Assume this is a big list
val groups = list.grouped(500)
override def receive: Actor.Receive = {
val futures = groups.map(grp => (router ? Message(grp)).mapTo[List[String]]))
val future = Future.sequence(futures).map(_.flatten)
val result = Await.result(future, 50 seconds)
println(s"Result is $result")
}
}
// Node 2
class Worker extends Actor {
override def receive: Actor.Receive = {
case Message(lst) =>
val future: Future[List[String]] = // Do Something asynchronous
future onComplete {
case Success(r) => sender.!(r)(context.parent) // This message is not delivered to Controller actor.
case Failure(th) => // Error handling
}
}
}
Пожалуйста, дайте мне знать, что я делаю неправильно здесь. Ценю вашу помощь.
Да. Он работает с «pipeTo». Благодарю. – Jegan