2014-12-30 2 views
0

Предположим, у нас есть актер, который делит задачи на подзадачи и делегирует их дочерним актерам. Этот актер выполняет свою собственную работу после получения ответов от всех субактеров.Как обрабатывать ответы от детей-актеров?

Какова наилучшая практика для управления ответами субакторов?

Поместите свои результаты в, например, Map<ActorRef, Object> и проверьте каждый раз, если все ответы были получены?

ответ

0

Helo, как правило, я реализую его с помощью List, или если вы хотите Map, типа объектов. Либо: Left и Right (для сохранения информации об итогах каждого актера - успех или неудача) и проверка длины карты каждый раз, когда новое сообщение из принимается ребенок, и когда все полученные сообщения отправляются на самообслуживание WorkDone или sth с этим значением.

Где-то в сети я видел модель с отдельным актером, созданной для сбора всех сообщений образуют Чайлдс

superactor:

val n = 1000 
val collector = system.actorOf(Props(classOf[Collector],n) 
for(i<-1 to n){ 
    system.actorOf(Child.props()).!(DoSomeWork)(collector) 
} 

коллектор

class Collector(expectedMsgs: Int) extends Actor { 
    var results = someCollection 

    def endCollecting() = { 
     // You can add here some logic before returning data 
     context.parent ! results.toList //or whatever you want 
     context.become(afterWork) 
    } 

    def receive = { 
     case r: DataDone => 
      results += Right(r) 
      if(results.length>=expectedMsgs){ 
       endCollecting() 
      } 
     case r: DataFail => 
      results += Left(r) 
      if(results.length>=expectedMsgs){ 
       endCollecting() 
      } 
    } 

    def afterWork = { 
     case _ => 
      // for catch some late or mutiple messages 
    } 

} 

Также обратите внимание на http://doc.akka.io/docs/akka/snapshot/contrib/aggregator.html

+0

Что точку отправки A 'WorkDone' сообщение обратно к себе, а не просто делать, что работа инлайн прямо там. Похоже на ненужный дополнительный удар по почтовому ящику. Если бы вы были в обратном вызове 'Future', тогда это мог бы быть правильный подход, но вы этого не сделали, поэтому это кажется ненужным. – cmbaxter

+0

В этом точном коде Да, вы правы, но это хорошее место, чтобы сделать некоторую логику после того, как все сообщения собраны, но перед их возвратом, но, может быть, лучше будет impement с использованием метода, я отредактирую этот ответ – Morgaroth

1

Yo u можете использовать Composing Futures. Лучше использовать его в сочетании с Routing для детей. Что-то вроде:

import scala.concurrent._ 
import akka.pattern._ 
import akka.routing._ 
import akka.actor._ 
import akka.util._ 
import scala.concurrent.duration._ 


case class Req(i: Int) 
case class Response(i: Int) 

class Worker extends Actor { def receive = {case Req(i) => sender ! Response(i) }} 
class Parent extends Actor { 
    import context.dispatcher 
    implicit val timeout = Timeout(10, MINUTES) //timeout for response from worker 
    val router: ActorRef = 
     context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router") 
    def processResults(rs: Seq[Response]) = {println(rs); rs} 
    def receive = { 
     case t: List[Req] => 
      Future.sequence(t.map(router ? _).map(_.mapTo[Response])).map(processResults) 
    } 
} 

Результаты:

scala> ActorSystem().actorOf(Props(classOf[Parent])) ! List(Req(1), Req(2), Req(3)) 

List(Response(1), Response(2), Response(3)) 
+0

Этот шаблон достаточно обычен с аккой, я бы хотел, чтобы был способ избежать накладных расходов. – tariksbl

+0

Вы имеете в виду, что спросить создает новый актер и обещает? я не думаю, что это не так много. Более того, это решение является асинхронным, поэтому родительский актер не блокирует. Если тайм-аут является проблемой - вы можете просто реализовать его с обещанием: 'class Asker (to: ActorRef, p: Promise) = {... case resp: Resp => prom.com (resp); case r: Req => to! REQ} '. Если вы хотите избежать создания актера - просто переместите '(to, p)' на карту внутреннего айзера и отправьте их с сообщением. – dk14

Смежные вопросы