2015-09-11 3 views
1

Я пытаюсь поймать «Terminate Signal» от дочернего к родительскому актеру, однако среди пула сообщений о недопустимых сообщениях сигнал не попадает в очередь родительского актера. Каков наилучший способ решить эту проблему?Как определить приоритеты сообщений, отправленных между актерами в Scala?

Вот фрагмент кода, я работаю над:

class MinerActor extends Actor { 
    var count:Int = 0 
    def receive = { 
     case Mine => 
      //some task here  
      //if success 
       count = count + 1 
      // 
      if (count >= 100) 
      { 
       context.stop(self) 
      } 
} 

class MasterActor extends Actor { 
    val miner = context.actorOf(Props(new MinerActor,name = "miner") 
    context.watch(miner) 

    def receive = { 
     case Foo => 
      while (true) { 
       miner ! Mine 
      } 

     case Terminated(miner) => 
      println("Miner Terminated!!") 
      context.stop(self) 
      context.system.shutdown 
    } 
} 

Здесь «Отменено (шахтер)» дело никогда не вызывается. Вместо этого на stdout я вижу много сообщений с мертвой буквой, отправленных от Master to Miner (что вроде бы ожидаемо, когда актер-минер останавливается). Однако как определить приоритет сигнала Terminate на шине Event, чтобы достичь Master Actor?

Если я ограничиваю цикл while до 200 вместо бесконечности, после 100 сообщений о недоставке я получаю сигнал Terminate, который печатает «Miner Terminated !!». Но как достичь этого, когда цикл находится в бесконечности?

Я новичок в программировании Scala/Akka, моя главная цель - запустить «// some task» за 100 успешных времен, а затем выйти из всей программы. Это хороший способ достичь этой задачи?

ответ

4

Заменить:

case Foo => 
    while (true) { 
    miner ! Mine 
    } 

с

case Foo => 
    miner ! Mine 
    self forward Foo 
+0

это сработало! спасибо за отображение «вперед». – snackbar

2

Проблема в том, что вы бесконечны, а цикл блокирует поток актера. Как следствие, ваш мастер-актер всегда застревает в обработке первого полученного сообщения Foo и никогда не будет обрабатывать какие-либо другие сообщения в почтовом ящике. Причина этого в том, что существует только один поток, который отвечает за прием сообщений. Это имеет некоторые действительно хорошие последствия, потому что вам в основном не нужно беспокоиться о параллелизме внутри одного актера, если ваши изменения состояния происходят только в этом потоке.

Существует несколько способов решения этой проблемы. Я бы рекомендовал использовать планировщик для планирования повторной задачи.

class MasterActor extends Actor { 
    var minerOption: Option[ActorRef] = None 
    var mineMessageOption: Option[Cancellable] = None 

    override def preStart: Unit = { 
    minerOption = Some(context.actorOf(Props(new MinerActor,name = "miner"))) 

    minerOption.foreach(context.watch(_)) 

    import context.dispatcher 

    mineMessageOption = Some(context.system.scheduler.schedule(0 seconds, 1 seconds, self, Foo)) 
    } 

    def receive = { 
    case Foo => 
     minerOption.foreach { 
     _ ! Mine 
     } 

    case Terminated(miner) => 
     println("Miner Terminated!!") 

     mineMessageOption.foreach(_.cancel()) 

     context.stop(self) 
     context.system.shutdown 
    } 
} 

В schedule звонка можно определить интервал сообщения Foo и, таким образом, сколько сообщений будет отправлено шахтера.

+0

Планирование, что это хороший способ. Будучи новичком в scala, я не мог получить большую часть кода, интуитивно я понял, что это решает проблему, планируя сообщение каждую секунду. Однако я понял, что могу изменить код, переместив цикл в «MinerActor» на 1000 итераторов и отправив сообщение «Мастер», который снова запустит «шахтер», если count! = 100 и завершает работу программы. Спасибо за альтернативный подход! – snackbar

Смежные вопросы