Предполагая, что вам необходимо собрать как минимум частичную статистику каждые 10 секунд - решение состоит в том, чтобы преобразовать «не реагирующий» на фактический сбой.
Для этого просто увеличьте Await
тайм-аут в сравнении с implicit val t:Timeout
для запроса. После этого ваши будущие фьючерсы (возвращенные с ?
) потерпят неудачу раньше. Таким образом, вы можете recover их:
// Works only when AskTimeout >> AwaitTimeout
val qfiltered = q.map(_.map(Some(_)).recover{case _ => None}) //it's better to match TimeoutException here instead of `_`
val allInverted = Future.sequence(q).map(_.flatten)
Пример:
scala> class MyActor extends Actor{ def receive = {case 1 => sender ! 2; case _ =>}}
defined class MyActor
scala> val a = sys.actorOf(Props[MyActor])
a: akka.actor.ActorRef = Actor[akka://1/user/$c#1361310022]
scala> implicit val t: Timeout = Timeout(1 seconds)
t: akka.util.Timeout = Timeout(1 second)
scala> val l = List(a ? 1, a ? 100500).map(_.map(Some(_)).recover{case _ => None})
l: List[scala.concurrent.Future[Option[Any]]] = List([email protected], [email protected])
scala> Await.result(Future.sequence(l).map(_.flatten), 3 seconds)
warning: there were 1 feature warning(s); re-run with -feature for details
res29: List[Any] = List(2)
Если вы хотите знать, какие в будущем не ответил - удалить flatten
.
Получение частичного ответа должно быть достаточным для непрерывного сбора статистики, как если бы какой-либо рабочий-актер не отвечал вовремя - он будет отвечать в следующий раз фактическими данными и без каких-либо потерянных данных. Но вы должны соответствовать жизненному циклу рабочего процесса и не потерять (если это имеет значение) любые данные внутри самого актера.
Если причина таймаутов только высокое давление в системе - вы можете рассмотреть следующие вопросы:
- отдельный бассейн для работников
- противодавления
- кэширования для входных запросов (когда система перегружена).
Если причиной таких тайм-аутов является какое-то удаленное хранилище, то частичный ответ является правильным способом его обработки, если клиент готов к этому. Например, WebUI может предупредить пользователя о том, что показанные данные могут быть неполными, используя некоторую прялку. Но не забудьте не блокировать актеров с запросами на хранение (фьючерсы могут помочь) или хотя бы переместить их в разделительный пул потоков.
Если работник не ответил из-за сбоя (например, исключение) - вы все равно можете отправить уведомление отправителю с вашего preRestart
- так что вы также можете получить причину, по которой у работника нет статистики. Единственное: здесь вы проверяете наличие отправителя (it may not be)
P.S. Надеюсь, вы не сделаете Await.result
внутри какого-то актера - блокировка актера не рекомендуется, по крайней мере, для производительности вашего приложения. В некоторых случаях это может вызвать даже взаимные блокировки или утечки памяти. Поэтому ожидание должно быть размещено где-то на фасаде вашей системы (если базовая структура не поддерживает фьючерсы).
Таким образом, может иметь смысл обрабатывать ваши ответы асинхронно (вы все равно должны восстановить их от провала, если какой-то актер не отвечает):
//actor:
val parent = sender
for(list <- Future.sequence(qfiltered)) {
parent ! process(list)
}
//in sender (outside of the actors):
Await(actor ? Get, 10 seconds)
Почему вы не можете спросить их руководителей? –