2015-01-31 4 views
3

В Akka, я хочу отправить сообщение статуса для участников в кластере для их статуса. Этими субъектами могут быть различные состояния здоровья, в том числе мертвые/неспособные реагировать.Что хорошего в Акке, чтобы дождаться, когда группа Актеров ответит?

Я хочу подождать до некоторого времени, скажем 10 секунд, а затем приступить к любым результатам, которые я получил, чтобы получить обратно в этот срок. Я не хочу проваливать все это, потому что у 1 или 2 были проблемы, и они не ответили/не были на 10 секунд.

Я попытался это:

object GetStats { 
    def unapply(n: ActorRef)(implicit system: ActorSystem): Option[Future[Any]] = Try { 
    implicit val t: Timeout = Timeout(10 seconds) 
    n ? "A" 
    }.toOption 
} 
... 
val z = List(a,b,c,d) // where a-d are ActorRefs to nodes I want to status 
val q = z.collect { 
    case GetStats(s) => s 
} 
// OK, so here 'q' is a List[Future[Any]] 
val allInverted = Future.sequence(q) // now we have Future[List[Any]] 
val ok = Await.result(allInverted, 10 seconds).asInstanceOf[List[String]] 
println(ok) 

Проблема с этим кодом является то, что он, кажется, бросить TimeoutException, если один или более не отвечают. Тогда я не могу получить ответы, которые вернулись.

+0

Почему вы не можете спросить их руководителей? –

ответ

4

Предполагая, что вам необходимо собрать как минимум частичную статистику каждые 10 секунд - решение состоит в том, чтобы преобразовать «не реагирующий» на фактический сбой.

Для этого просто увеличьте Await тайм-аут в сравнении с implicit val t:Timeout для запроса. После этого ваши будущие фьючерсы (возвращенные с ?) потерпят неудачу раньше. Таким образом, вы можете recover их:

// Works only when AskTimeout >> AwaitTimeout 
val qfiltered = q.map(_.map(Some(_)).recover{case _ => None}) //it's better to match TimeoutException here instead of `_` 
val allInverted = Future.sequence(q).map(_.flatten) 

Пример:

scala> class MyActor extends Actor{ def receive = {case 1 => sender ! 2; case _ =>}} 
defined class MyActor 

scala> val a = sys.actorOf(Props[MyActor]) 
a: akka.actor.ActorRef = Actor[akka://1/user/$c#1361310022] 

scala> implicit val t: Timeout = Timeout(1 seconds) 
t: akka.util.Timeout = Timeout(1 second) 

scala> val l = List(a ? 1, a ? 100500).map(_.map(Some(_)).recover{case _ => None}) 
l: List[scala.concurrent.Future[Option[Any]]] = List([email protected], [email protected]) 

scala> Await.result(Future.sequence(l).map(_.flatten), 3 seconds) 
warning: there were 1 feature warning(s); re-run with -feature for details 
res29: List[Any] = List(2) 

Если вы хотите знать, какие в будущем не ответил - удалить flatten.

Получение частичного ответа должно быть достаточным для непрерывного сбора статистики, как если бы какой-либо рабочий-актер не отвечал вовремя - он будет отвечать в следующий раз фактическими данными и без каких-либо потерянных данных. Но вы должны соответствовать жизненному циклу рабочего процесса и не потерять (если это имеет значение) любые данные внутри самого актера.

Если причина таймаутов только высокое давление в системе - вы можете рассмотреть следующие вопросы:

  • отдельный бассейн для работников
  • противодавления
  • кэширования для входных запросов (когда система перегружена).

Если причиной таких тайм-аутов является какое-то удаленное хранилище, то частичный ответ является правильным способом его обработки, если клиент готов к этому. Например, WebUI может предупредить пользователя о том, что показанные данные могут быть неполными, используя некоторую прялку. Но не забудьте не блокировать актеров с запросами на хранение (фьючерсы могут помочь) или хотя бы переместить их в разделительный пул потоков.

Если работник не ответил из-за сбоя (например, исключение) - вы все равно можете отправить уведомление отправителю с вашего preRestart - так что вы также можете получить причину, по которой у работника нет статистики. Единственное: здесь вы проверяете наличие отправителя (it may not be)

P.S. Надеюсь, вы не сделаете Await.result внутри какого-то актера - блокировка актера не рекомендуется, по крайней мере, для производительности вашего приложения. В некоторых случаях это может вызвать даже взаимные блокировки или утечки памяти. Поэтому ожидание должно быть размещено где-то на фасаде вашей системы (если базовая структура не поддерживает фьючерсы).

Таким образом, может иметь смысл обрабатывать ваши ответы асинхронно (вы все равно должны восстановить их от провала, если какой-то актер не отвечает):

//actor: 
val parent = sender 
for(list <- Future.sequence(qfiltered)) { 
    parent ! process(list) 
} 

//in sender (outside of the actors): 
Await(actor ? Get, 10 seconds) 
+0

Многого здесь я не знал. Наиболее критически Ожидание внутри актера. Попытка последнего, но проблема. У моего актера у меня есть: for (list <- allInverted) { val resp = list.mkString (",") println ("ЗДЕСЬ:" + resp) Отправитель! resp } (извините, не редактирование кода в комментариях) Вне моего актера у меня есть: неявный val t: Timeout = Timeout (15 секунд) val answer = Await.result (m? "stats", 15 секунд) println (" Ответ: «+ ответ». Поэтому я вижу, что мой код выводит результат очень быстро, но ответ идет на треугольники, а затем, наконец, время ожидания. Почему это? – Greg

+0

1) забыл упомянуть - вы должны использовать постоянную ссылку на родителя в будущем (см. Мое обновление) - отправитель динамически меняется; 2) это не поможет вам с проблемой тайм-аута - так что вам все равно нужно преобразовать тайм-ауты в отказы, как описано выше, иначе код-код внутри 'for' не будет заполняться иногда – dk14

+0

3)' implicit val timeout 'должно быть меньше, чем' Await.timeout' для (2). О SO: вы можете использовать «' »для маркировки кода блока – dk14

Смежные вопросы