2015-03-30 3 views
18

Я надеялся, что код вроде следующего будет ждать обоих фьючерсов, но это не так.Скала ждет последовательность фьючерсов

object Fiddle { 
    val f1 = Future { 
    throw new Throwable("baaa") // emulating a future that bumped into an exception 
    } 

    val f2 = Future { 
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete 
    2 
    } 

    val lf = List(f1, f2) // in the general case, this would be a dynamically sized list 

    val seq = Future.sequence(lf) 

    seq.onComplete { 
    _ => lf.foreach(f => println(f.isCompleted)) 
    } 
} 

val a = FuturesSequence 

Я предположил, seq.onComplete будет ждать их всех, чтобы завершить до завершения себя, но не так; это приводит:

true 
false 

.sequence было немного трудно следовать в источнике scala.concurrent.Future, интересно, как я бы реализовать параллельно, что ждет всех первоначальных фьючерсов а (динамически размера) последовательности, или что может быть проблемой здесь.

Edit: Смежный вопрос: https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)

ответ

22

Один общий подход к ожиданию всех результатов (неудачный или неудачный) - это «поднять» неудачи в новое представление в будущем, чтобы все фьючерсы заканчивались некоторым результатом (хотя они могут завершиться результатом, который представляет собой сбой). Один естественный способ добиться этого - подняться до Try.

Twitter's implementation of futures обеспечивает liftToTry метод, который делает это тривиально, но вы можете сделать что-то подобное с реализацией стандартной библиотеки:

import scala.util.{ Failure, Success, Try } 

val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
    _.map(Success(_)).recover { case t => Failure(t) } 
) 

Теперь Future.sequence(lifted) будет завершен, когда каждое будущее будет завершено, и будет представлять успехи и неудачи используя Try.

Итак, общее решение для ожидания всех исходных фьючерсов последовательности фьючерсов может выглядеть следующим образом, предполагая, что контекст исполнения, конечно, неявно доступен.

import scala.util.{ Failure, Success, Try } 

    private def lift[T](futures: Seq[Future[T]]) = 
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) }) 

    def waitAll[T](futures: Seq[Future[T]]) = 
    Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used 

    waitAll(SeqOfFutures).map { 
    // do whatever with the completed futures 
    } 
+0

Большое спасибо, я, вероятно, буду использовать и адаптировать эту функцию! Не могли бы вы найти точное сообщение о том, что использование 'Try' по существу отображает результат результата результата' возвращаемого значения ' исключение управления потоком' в 'Успех | Ошибка возврата? – matanster

+0

Кстати, я мог бы очень точно реализовать одно и то же преобразование, используя обещания, а не 'Try'. Я уже склоняюсь к обещаниям, когда мне нужно расширить свойство будущего (сутенерство с неявными классами). – matanster

+1

Вам почти наверняка не нужны обещания здесь. «Попробуй» - это, по сути, синхронное «Будущее», где вместо трех состояний (неудовлетворенности, успеха, неудачи) у вас есть только два, что точно соответствует этой проблеме. –

14

Future производства Future.sequence завершается, когда:

  • все фьючерсы завершили успешно, или
  • один из фьючерсов не удалось

Второй момент - это то, что в вашем случае, и имеет смысл закончить, как только один из завернутых Future потерпел неудачу, потому что упаковка Future может содержать только номер Throwable в случае сбоя. Нет смысла ждать других фьючерсов, потому что результат будет тем же самым провалом.

+0

Спасибо, правильно, я должен был собрать это поведение из унарной природы типа '.sequence'. Но дело в том, что я хотел бы дождаться, когда все они закончатся; использование '.sequence' было всего лишь (неправильным) средством. Как я могу выполнить это наиболее прямолинейно? – matanster

+0

Это не отвечает на вторую часть вопроса. Совершенно разумно хотеть собрать все результаты, потерпеть неудачу или нет, и стыдно, что реализация фьючерсов стандартной библиотеки не имеет ничего подобного [TwitterTryToTry] (https://twitter.github.io/util/ docs/index.html # com.twitter.util.Future), чтобы облегчить это. –

+0

@TravisBrown Необязательно использовать этот метод в stdlib Фьючерсы: f.transform (Success (_)) –

3

Это пример, который поддерживает предыдущий ответ. Существует простой способ сделать это, используя только стандартные API Scala.

В примере я создаю 3 фьючерса. Они пройдут через 5, 7 и 9 секунд соответственно. Вызов Await.result будет заблокирован до тех пор, пока все фьючерсы не будут разрешены. После того, как все 3 фьючерса завершены, a будет установлен в List(5,7,9), и исполнение будет продолжено.

Кроме того, если в любом из фьючерсов выбрано исключение, Await.result немедленно разблокирует и выбросит исключение. Раскомментируйте строку Exception(...), чтобы увидеть это в действии.

try { 
    val a = Await.result(Future.sequence(Seq(
     Future({ 
     blocking { 
      Thread.sleep(5000) 
     } 
     System.err.println("A") 
     5 
     }), 
     Future({ 
     blocking { 
      Thread.sleep(7000) 
     } 
     System.err.println("B") 
     7 
     //throw new Exception("Ha!") 
     }), 
     Future({ 
     blocking { 
      Thread.sleep(9000) 
     } 
     System.err.println("C") 
     9 
     }))), 
     Duration("100 sec")) 

    System.err.println(a) 
    } catch { 
    case e: Exception ⇒ 
     e.printStackTrace() 
    } 
1

Мы можем обогатить Seq[Future[T]] со своим собственным onComplete методом с помощью неявного класса:

def lift[T](f: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] = 
    f map { Success(_) } recover { case e => Failure(e) } 

    def lift[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Seq[Future[Try[T]]] = 
    fs map { lift(_) } 

    implicit class RichSeqFuture[+T](val fs: Seq[Future[T]]) extends AnyVal { 
    def onComplete[U](f: Seq[Try[T]] => U)(implicit ec: ExecutionContext) = { 
     Future.sequence(lift(fs)) onComplete { 
     case Success(s) => f(s) 
     case Failure(e) => throw e // will never happen, because of the Try lifting 
     } 
    } 
    } 

Затем, в вашем конкретном MWE, вы можете сделать:

val f1 = Future { 
    throw new Throwable("baaa") // emulating a future that bumped into an exception 
    } 

    val f2 = Future { 
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete 
    2 
    } 

    val lf = List(f1, f2) 

    lf onComplete { _ map { 
    case Success(v) => ??? 
    case Failure(e) => ??? 
    }} 

Это решение имеет преимущество, позволяющее вам называть onComplete в последовательности фьючерсов, как в одном будущем.

+0

Надеюсь, нам не понадобится эта обманка в новой библиотеке коллекций – matanster

+0

@matanster, есть ли что-то особенное в разработке новой библиотеки коллекций, которая заставляет вас надеяться, что новая библиотека коллекций может позволить нам достичь такого же поведения с меньшим обман? – Bruno

Смежные вопросы