2016-08-30 3 views
1

У меня есть экземпляры дела класса Thing, и у меня есть куча запросов для запуска, которые возвращают коллекцию Thing с так:Scala - Пакетный поток из Futures

def queries: Seq[Future[Seq[Thing]]] 

мне нужно собрать все Thing s из всех фьючерсов (как и выше) и группировать их в коллекцию одинакового размера в 10 000, чтобы они могли быть сериализованы в файлы размером 10 000 Thing.

def serializeThings(Seq[Thing]): Future[Unit] 

Я хочу, чтобы быть реализован таким образом, что я не ждать всех запросов скороходов сериализации. Как только 10 000 Thing s возвращаются после завершения фьючерсов первых запросов, я хочу начать сериализацию.

Если я что-то вроде:

Future.sequence(queries) 

Он будет собирать результаты всех запросов, но я понимаю, что такие операции, как map не будет вызываться, пока все запросы полные и все Thing s должны сразу вписывается в память.

Каков наилучший способ реализации объединенного потокового конвейера с использованием коллекций Scala и параллельных библиотек?

+0

Если вы хотите обработать N фьючерсов в партиях 10 000, это может помочь (это http://stackoverflow.com/a/38255964/4496364). Кроме того, у Джеймса Уорда есть сообщение с болгом (здесь) (https://www.jamesward.com/2016/06/16/reactive-web-request-batching-with-scala-and-play-framework/). –

+0

@ insan-e Я не хочу покупать фьючерсы. Я хочу выставить результаты фьючерсов, которые каждое будущее возвращает произвольное количество 'Thing' – Samuel

ответ

1

Я думаю, что мне удалось сделать что-то. Решение основано на моем предыдущем answer. Он собирает результаты от Future[List[Thing]] результатов, пока не достигнет уровня BatchSize. Затем он вызывает serializeThings будущее, когда он заканчивается, цикл продолжается с остальными.

object BatchFutures extends App { 

    case class Thing(id: Int) 

    def getFuture(id: Int): Future[List[Thing]] = { 
    Future.successful { 
     List.fill(3)(Thing(id)) 
    } 
    } 

    def serializeThings(things: Seq[Thing]): Future[Unit] = Future.successful { 
    //Thread.sleep(2000) 
    println("processing: " + things) 
    } 

    val ids = (1 to 4).toList 
    val BatchSize = 5 

    val future = ids.foldLeft(Future.successful[List[Thing]](Nil)) { 
    case (acc, id) => 
     acc flatMap { processed => 
     getFuture(id) flatMap { res => 
      val all = processed ++ res 
      val (batch, rest) = all.splitAt(5) 

      if (batch.length == BatchSize) { // if futures filled the batch with needed amount 
      serializeThings(batch) map { _ => 
       rest // process the rest 
      } 
      } else { 
      Future.successful(all) //if we need more Things for a batch 
      } 
     } 
     } 
    }.flatMap { rest => 
    serializeThings(rest) 
    } 

    Await.result(future, Duration.Inf) 

} 

В результате печатает:

обработка: Список (Вещь (1), Вещь (1), Вещь (1), Вещь (2), Вещь (2))
обработка: Список (Вещь (2), Вещь (3), Вещь (3), Вещь (3), Вещь (4))
обработки: Список (Вещь (4), Вещь (4))

При число Thing s не делится на BatchSize, мы должны ll serializeThings еще раз (последний flatMap). Я надеюсь, что это помогает! :)

0

Прежде чем вы сделаете Future.sequence, сделайте то, что вы хотите сделать с индивидуальным будущим, а затем используйте Future.sequence.

//this can be used for serializing 
def doSomething(): Unit = ??? 

//do something with the failed future 
def doSomethingElse(): Unit = ??? 

def doSomething(list: List[_]) = ??? 

val list: List[Future[_]] = List.fill(10000)(Future(doSomething())) 

val newList = 
list.par.map { f => 
    f.map { result => 
    doSomething() 
    }.recover { case throwable => 
    doSomethingElse() 
    } 
} 

Future.sequence(newList).map (list => doSomething(list)) //wait till all are complete 

вместо newList поколения вы можете использовать Future.traverse

Future.traverse(list)(f => f.map(x => doSomething()).recover {case th => doSomethingElse() }).map (completeListOfValues => doSomething(completeListOfValues)) 
Смежные вопросы