2013-06-12 2 views
15

У меня есть актер, который создает дочерний актер для выполнения длительных вычислений.Как справиться с длительной инициализацией актера Акки?

Проблема заключается в том, что инициализация дочернего актера занимает несколько секунд, и все сообщения, которые родительский актер посылает ребенку между ним, создаются и полностью инициализируются, отбрасываются.

Это логика кода, который я использую:

class ChildActor extends Actor { 
    val tagger = IntializeTagger(...) // this takes a few seconds to complete 

    def receive = { 
    case Tag(text) => sender ! tagger.tag(text) 
    case "hello" => println("Hello") 
    case _ => println("Unknown message") 
    } 
} 

class ParentActor extends Actor { 
    val child = context.ActorOf(Props[ChildActor], name = "childactor") 

    // the below two messages seem to get lost 
    child ! "hello" 
    child ! Tag("This is my sample text") 

    def receive = { 
    ... 
    } 
} 

Как я могу обойти эту проблему? Можно ли заставить родительского актера ждать, пока ребенок полностью инициализируется? Я буду использовать дочернего актера с маршрутизацией и, возможно, на удаленных актерских системах.

EDIT

Следуя совету drexin, я есть изменить свой код в:

class ChildActor extends Actor { 
    var tagger: Tagger = _ 

    override def preStart() = { 
    tagger = IntializeTagger(...) // this takes a few seconds to complete 
    } 

    def receive = { 
    case Tag(text) => sender ! tagger.tag(text) 
    case "hello" => println("Hello") 
    case _ => println("Unknown message") 
    } 
} 

class ParentActor extends Actor { 
    var child: ActorRef = _ 

    override def preStart() = { 
    child = context.ActorOf(Props[ChildActor], name = "childactor") 

    // When I add 
    // Thread.sleep(5000) 
    // here messages are processed without problems 

    // wihout hardcoding the 5 seconds waiting 
    // the below two messages seem to get lost 
    child ! "hello" 
    child ! Tag("This is my sample text") 
    } 

    def receive = { 
    ... 
    } 
} 

, но проблема остается. Что мне не хватает?

+0

Какую версию Акку вы используете. У меня проблемы с имитацией проблемы. Кажется, все отлично работает для меня. – cmbaxter

+0

@cmbaxter: Я использую akka 2.2-M2. – twowo

ответ

17

Не инициализируйте tagger в конструкторе, но в крюке preStart, таким образом, сообщения будут собраны в окне сообщения и доставлены, когда актер будет готов.

редактировать:

Вы должны сделать то же самое для создания актера в вашем ParentActor классе, потому что вы бы с той же проблемой, если ChildActor ответит, до ParentActor инициализации.

edit2:

Я создал простой пример, но я не был в состоянии воспроизвести ваши проблемы. Следующий код работает прекрасно:

import akka.actor._ 

case class Tag(x: String) 

class ChildActor extends Actor { 
    type Tagger = String => String 
    var tagger: Tagger = _ 

    override def preStart() = { 
    tagger = (x: String) => x+"@tagged" // this takes a few seconds to complete 
    Thread.sleep(2000) // simulate time taken to initialize Tagger 
    } 

    def receive = { 
    case Tag(text) => sender ! tagger(text) 
    case "hello" => println("Hello") 
    case _ => println("Unknown message") 
    } 
} 

class ParentActor extends Actor { 
    var child: ActorRef = _ 

    override def preStart() = { 
    child = context.actorOf(Props[ChildActor], name = "childactor") 

    // When I add 
    // Thread.sleep(5000) 
    // here messages are processed without problems 

    // wihout hardcoding the 5 seconds waiting 
    // the below two messages seem to get lost 
    child ! "hello" 
    child ! Tag("This is my sample text") 
    } 

    def receive = { 
    case x => println(x) 
    } 
} 

object Main extends App { 

    val system = ActorSystem("MySystem") 

    system.actorOf(Props[ParentActor]) 
} 

Выход:

[info] Running Main 
Hello 
This is my sample [email protected] 
+0

Я боюсь, что это не решит мою проблему. Я обновил сообщение, чтобы объяснить его дальше. – twowo

+0

обновил мой пост – drexin

+0

Спасибо за ваш ответ, благодаря этому я понял, в чем была моя проблема: я делал system.shutdown() в своем основном приложении, прежде чем актеры имели возможность ответить! Это было глупо, но мне пришлось долго размышлять. Спасибо за ваши усилия, я отмечаю ваш ответ как принятый. – twowo

0

Я хотел бы предложить, чтобы послать «готовый» сообщение от детского актера к родителю и отправлять сообщения на ребенка актера только после того, как это сообщение будет получено. Вы можете сделать это только в методе для простых случаев использования или вы можете использовать become или FSM, чтобы изменить поведение родительского актера после того, как ребенок будет инициализирован (например, сохраните сообщения для ребенка в промежуточном хранилище и отправьте их все, когда он будь готов).

+0

Я понимаю ваш вопрос, но что, если актер, который сейчас является ребенком, был запущен не из другого актера, а только из основного потока? Должен быть более простой способ справиться с ситуацией, не так ли? – twowo

+0

Тогда сообщения могут быть спрятаны в «дочернем» акторе (как предположил cmbaxter). –

7

Я думаю, что вы, возможно, ищете комбо Stash и become. Идея будет заключаться в том, что дочерний актер установит исходное состояние в неинициализированное, и в то время как в этом состоянии он будет закрывать все входящие сообщения до тех пор, пока он не будет полностью инициализирован. Когда он полностью инициализирован, вы можете разблокировать все сообщения перед тем, как переключить поведение в состояние инициализации. Простой пример выглядит следующим образом:

class ChildActor2 extends Actor with Stash{ 
    import context._ 
    var dep:SlowDependency = _ 

    override def preStart = { 
    val me = context.self 
    Future{ 
     dep = new SlowDependency 
     me ! "done" 
    } 
    } 

    def uninitialized:Receive = { 
    case "done" => 
     unstashAll 
     become(initialized) 
    case other => stash() 
    } 

    def initialized:Receive = { 
    case "a" => println("received the 'a' message") 
    case "b" => println("received the 'b' message") 
    } 

    def receive = uninitialized 
} 

Обратите внимание на preStart, что я делаю свою инициализацию асинхронно, чтобы не остановить запуск актера. Теперь это немного уродливо, с закрытием над изменчивым dep var.Вы могли бы с уверенностью справиться с этим, отправив сообщение другому актеру, который обрабатывает экземпляр медленной зависимости и отправляет его этому актеру. После получения зависимости он будет звонить become для состояния initialized.

Теперь есть один нюанс с Stash и я вставить его в сторону от Документов Akka:

Please note that the Stash can only be used together with actors that 
have a deque-based mailbox. For this, configure the mailbox-type of the 
dispatcher to be a deque-based mailbox, such as 
akka.dispatch.UnboundedDequeBasedMailbox (see Dispatchers (Scala)). 

Теперь, если это не номера, вы можете попробовать более DI типа подход, и пусть медленная зависимость вводится в дочерний актер через его конструктор. Таким образом, вы бы определить ребенка актера, как так:

class ChildActor(dep:SlowDependency) extends Actor{ 
    ... 
} 

Тогда при запуске этого актера, вы могли бы сделать это следующим образом:

context.actorOf(new Props().withCreator(new ChildActor(slowDep)), name = "child-actor") 
+0

Во-первых, вы никогда не должны закрываться над изменчивым состоянием, а во-вторых, вы никогда не должны закрывать «ActorContext». Призыв стать/невоспитанным извне контекста участников может привести к серьезным проблемам. – drexin

+0

@drexin, спасибо за головы. Я немного изменил свой ответ. Я знаю, что это уродливое закрытие над 'dep', но оно смягчается тем фактом, что ничто не должно использоваться' dep', пока оно не перейдет в состояние 'initialized' – cmbaxter

+0

. Тем не менее, я бы не рекомендовал это делать. Мое предлагаемое решение работает, и preStart уже называется асинхронно. Ключ 'preStart' предназначен для использования кода инициализации. Нет необходимости втягивать его в другое будущее. – drexin

Смежные вопросы