2016-06-07 3 views
0

Следующий пример кода (который вы можете копировать и запускать) показывает MyParentActor, который создает MyChildActor.Akka: Заказ сообщений после перезагрузки Akka

MyChildActor создает исключение для своего первого сообщения, которое заставляет его перезапускать.

Однако я хочу, чтобы «Message 1» все еще обрабатывался до «Message 2» при перезапуске MyChildActor.

Вместо этого происходит то, что сообщение 1 добавляется в хвост очереди почтовых ящиков, и поэтому сначала обрабатывается сообщение 2.

Как добиться упорядочения исходных сообщений при перезапуске актера, не создавая собственный почтовый ящик и т. Д.?

object TestApp extends App { 
    var count = 0 
    val actorSystem = ActorSystem() 


    val parentActor = actorSystem.actorOf(Props(classOf[MyParentActor])) 
    parentActor ! "Message 1" 
    parentActor ! "Message 2" 

    class MyParentActor extends Actor with ActorLogging{ 
    var childActor: ActorRef = null 

    @throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
     childActor = context.actorOf(Props(classOf[MyChildActor])) 
    } 

    override def receive = { 
     case message: Any => { 
     childActor ! message 
     } 
    } 

    override def supervisorStrategy: SupervisorStrategy = { 
     OneForOneStrategy() { 
      case _: CustomException => Restart 
      case _: Exception   => Restart 
     } 
    } 
    } 

    class MyChildActor extends Actor with ActorLogging{ 


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
     message match { 
     case Some(e) => self ! e 
     } 
    } 

    override def receive = { 
     case message: String => { 
     if (count == 0) { 
      count += 1 
      throw new CustomException("Exception occurred") 
     } 
     log.info("Received message {}", message) 
     } 
    } 
    } 

    class CustomException(message: String) extends RuntimeException(message) 
} 

ответ

1

Вы можете отметить сообщение об ошибке специальным конвертом и прикрыть все до получения этого сообщения (см. Реализацию дочернего актера). Просто определите поведение, в котором актер закроет каждое сообщение, за исключением конкретного конверта, обработает его полезную нагрузку, а затем разблокирует все другие сообщения и вернется к нормальному поведению.

Это дает мне:

INFO TestApp$MyChildActor - Received message Message 1 
INFO TestApp$MyChildActor - Received message Message 2 

object TestApp extends App { 
    var count = 0 
    val actorSystem = ActorSystem() 


    val parentActor = actorSystem.actorOf(Props(classOf[MyParentActor])) 
    parentActor ! "Message 1" 
    parentActor ! "Message 2" 

    class MyParentActor extends Actor with ActorLogging{ 
    var childActor: ActorRef = null 

    @throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
     childActor = context.actorOf(Props(classOf[MyChildActor])) 
    } 

    override def receive = { 
     case message: Any => { 
      childActor ! message 
     } 
    } 

    override def supervisorStrategy: SupervisorStrategy = { 
     OneForOneStrategy() { 
      case e: CustomException => Restart 
      case _: Exception => Restart 
     } 
    } 
    } 

    class MyChildActor extends Actor with Stash with ActorLogging{ 


    override def preRestart(reason: Throwable, message: Option[Any]): Unit = { 
     message match { 
      case Some(e) => 
       self ! Unstash(e) 
     } 
    } 

    override def postRestart(reason: Throwable): Unit = { 
     context.become(stashing) 
     preStart() 
    } 

    override def receive = { 
     case message: String => { 
      if (count == 0) { 
       count += 1 
       throw new CustomException("Exception occurred") 
      } 
      log.info("Received message {}", message) 
     } 
    } 

    private def stashing: Receive = { 
     case Unstash(payload) => 
      receive(payload) 
      unstashAll() 
      context.unbecome() 
     case m => 
      stash() 
    } 
    } 

    case class Unstash(payload: Any) 
    class CustomException(message: String) extends RuntimeException(message) 
} 
Смежные вопросы