2014-12-02 4 views
0

У меня есть фьючерсный пул, и каждое будущее работает с той же аккой System Actor - некоторые Актеры в системе должны быть глобальными, некоторые из них используются только в одном будущем.akka Выбор актера без условий гонки

val longFutures = for (i <- 0 until 2) yield Future { 
    val p:Page = PhantomExecutor(isDebug=true) 
    Await.result(p.open("http://www.stackoverflow.com/") ,timeout = 10.seconds) 
    } 

PhantomExecutor tryes использовать один общий глобальный актер (простое приращение счетчика) с помощью system.actorSelection

def selectActor[T <: Actor : ClassTag](system:ActorSystem,name:String) = { 
    val timeout = Timeout(0.1 seconds) 
    val myFutureStuff = system.actorSelection("akka://"+system.name+"/user/"+name) 
    val aid:ActorIdentity = Await.result(myFutureStuff.ask(Identify(1))(timeout).mapTo[ActorIdentity], 
     0.1 seconds) 

    aid.ref match { 
     case Some(cacher) => 
     cacher 
     case None => 
     system.actorOf(Props[T],name) 
    } 
    } 

Но в параллельной среде этот подход не работает из-за состояния гонки.

Я знаю только одно решение этой проблемы - создайте глобальных участников перед расщеплением на фьючерсы. Но это означает, что я не могу инкапсулировать много скрытой работы от пользователя верхней библиотеки.

ответ

1

Вы правы в том, что убедитесь, что глобальные участники инициализированы вначале - это правильный подход. Не можете ли вы связать их сопутствующим объектом и ссылаться на них оттуда, чтобы вы знали, что они будут только когда-либо инициализированы? Если вы действительно не можете пойти с таким подходом, тогда вы можете попробовать что-то подобное для поиска или создания актера. Он похож на код, но включать в себя логику, чтобы вернуться через поиск/создание логики (рекурсивно), если условие гонки удара (только до максимального количества раз):

def findOrCreateActor[T <: Actor : ClassTag](system:ActorSystem, name:String, maxAttempts:Int = 5):ActorRef = { 
    import system.dispatcher 
    val timeout = 0.1 seconds 

    def doFindOrCreate(depth:Int = 0):ActorRef = { 
     if (depth >= maxAttempts) 
     throw new RuntimeException(s"Can not create actor with name $name and reached max attempts of $maxAttempts") 

     val selection = system.actorSelection(s"/user/$name") 
     val fut = selection.resolveOne(timeout).map(Some(_)).recover{ 
     case ex:ActorNotFound => None 
     } 
     val refOpt = Await.result(fut, timeout) 

     refOpt match { 
     case Some(ref) => ref 
     case None => util.Try(system.actorOf(Props[T],name)).getOrElse(doFindOrCreate(depth + 1)) 
     } 
    } 

    doFindOrCreate() 
    } 

Теперь повторная попытка логического будет срабатывать для любого исключения при создании актера, поэтому вы можете дополнительно указать, что (возможно, с помощью другого комбинатора recover), чтобы рекоммендовать только когда он получает InvalidActorNameException, но вы получите эту идею.

0

Возможно, вам захочется создать менеджера-менеджера, который позаботится о создании «счетчиков». Таким образом, вы обеспечили бы сериализацию запросов создания счетчика.

object CounterManagerActor { 
    case class SelectActorRequest(name : String) 
    case class SelectActorResponse(name : String, actorRef : ActorRef) 
} 

class CounterManagerActor extends Actor { 
    def receive = { 
    case SelectActorRequest(name) => { 
     sender() ! SelectActorResponse(name, selectActor(name)) 
    } 
    } 

    private def selectActor(name : String) = { 
    // a slightly modified version of the original selectActor() method 
    ??? 
    } 
} 
+0

Но следует избегать, чтобы запустить этот CounterManager актер дважды, или запустить этот актер за пределами будущего {...} называют - на самом деле это был мой вопрос, как избежать этой конструкции без уловок, как игнорирование исключения для двойного актера или непрерывного выбор. – Oleg

Смежные вопросы