2015-02-14 3 views
11

Я обнаружил, что построение большого количества фьючерсов для одного пользовательского запроса, как правило, является плохой практикой. Эти фьючерсы могут заполнить контекст выполнения, который повлияет на другие запросы. Это маловероятно, чего вы действительно хотите. Хранение фьючерсов число малое - создать новые фьючерсы только для понятий, используя flatMap и т. Д. Но иногда может потребоваться создать Будущее для каждого элемента Seq. Использование проблемы Future.sequence или Future.traverse, описанной выше. Так что я в конечном итоге с этим решением, которое не создает Фьючерс для каждого элемента коллекции одновременно:Есть ли сборка в «медленной» версии Future.traverse?

def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = { 
    if(xs.isEmpty) Future successful Seq.empty[B] 
    else f(xs.head) flatMap { fh => ftraverse(xs.tail)(f) map (r => fh +: r) } 
    } 

Интересно, может быть, я изобретать колесо и на самом деле такая функция уже существует где-то в библиотеке стандартной в Scala? Также я хотел бы знать, вы столкнулись с описанной проблемой и как вы ее разрешили? Может быть, если это хорошо известная проблема с Futures, я должен создать запрос на перенос в Future.scala, чтобы эта функция (или более обобщенная версия) была включена в стандартную библиотеку?

UPD: Более общая версия, ти ограниченного параллелизма:

def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = { 
    val xss = xs.grouped(chunkSize).toList 
    val chunks = xss.take(maxChunks-1) :+ xss.drop(maxChunks-1).flatten 
    Future.sequence{ chunks.map(chunk => ftraverse(chunk)(f)) } map { _.flatten } 
    } 
+1

Посмотрите на «Задачу» в скалясе. Он реализует 'traverse', как определено в классе Haskell Traversable, который ведет себя таким образом (по одной операции за раз). –

ответ

10

Нет, нет ничего подобного в стандартной библиотеке. Есть ли должно быть быть или нет, я не могу сказать. Я не думаю, что очень часто хочется выполнить команду Future s в строгой последовательности. Но когда вы этого хотите, очень просто реализовать свой собственный метод, чтобы сделать это, как и у вас. Я лично просто держу метод в своих собственных библиотеках для этой цели. Однако было бы удобно иметь способ сделать это со стандартной библиотекой. Если там были, он должен быть более общим.

На самом деле очень просто изменить текущий traverse для обработки Future s последовательно, а не параллельно. Здесь current version, который использует foldLeft вместо рекурсии:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = 
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) => 
     val fb = fn(a) 
     for (r <- fr; b <- fb) yield (r += b) 
    }.map(_.result()) 

В Future ы созданы до flatMap путем присвоения val fb = fn(a) (и, таким образом, выполняется перед). Все, что нужно сделать, это переместить fn(a) внутри flatMap, чтобы задержать создание последующих Future s в коллекции.

def traverseSeq[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] = 
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) => 
     for (r <- fr; b <- fn(a)) yield (r += b) 
    }.map(_.result()) 

Другим способом вы можете ограничить влияние выполнения большого количества Future с это с помощью другого ExecutionContext для них. Например, в веб-приложении я могу оставить один ExecutionContext для вызовов по базе данных, один для вызовов Amazon S3 и один для медленных вызовов базы данных.

Очень простая реализация может использовать фиксированные пулы потоков:

import java.util.concurrent.Executors 
import scala.concurrent.ExecutionContext 
val executorService = Executors.newFixedThreadPool(4) 
val executionContext = ExecutionContext.fromExecutorService(executorService) 

Большое количество Future с исполняющими здесь заполнит ExecutionContext, но было бы предотвратить их от заполнения других контекстов.

Если вы используете Акку, вы можете легко создавать ExecutionContext S из конфигурации с помощью Dispatchers в пределах ActorSystem:

my-dispatcher { 
    type = Dispatcher 
    executor = "fork-join-executor" 
    fork-join-executor { 
    parallelism-min = 2 
    parallelism-factor = 2.0 
    parallelism-max = 10 
    } 
    throughput = 100 
} 

Если у вас есть ActorSystem под названием system вы могли бы получить доступ к нему через:

implicit val executionContext = system.dispatchers.lookup("my-dispatcher") 

Все это зависит от вашего варианта использования. Хотя я отделяю свои асинхронные вычисления в разных контекстах, бывают случаи, когда я все же хочу, чтобы traverse последовательно сглаживал использование этих контекстов.

4

Кажется, что ваша проблема не связана с количеством созданных вами фьючерсов, а с той честностью, с которой они выполнены. Рассмотрим, как обрабатываются обратные вызовы по фьючерсам (map, flatMap, onComplete, fold и т. Д.). Они помещаются в очередь исполнителя и выполняются, когда завершаются результаты их родительских фьючерсов.

Если все ваши фьючерсы имеют один и тот же исполнитель (т. Е. Очередь), они будут действительно сжиматься друг с другом, как вы говорите. Общим способом решения этой проблемы справедливости является использование аккских актеров. Для каждого запроса запустите новый актер (со своей собственной очередью) и у всех участников этого типа поделитесь ExecutionContext. Вы можете ограничить максимальное количество сообщений, которые должен выполнить актер, прежде чем переходить к другому участнику, разделяющему это значение ExecutionContext, используя свойство конфигурации throughput.

0

Разве это не параллельные коллекции?

val parArray = (1 to 1000000).toArray.par 
sum = parArray.map(_ + _) 
res0: Int = 1784293664 

выглядит как обычный синхронный вызов метода, но параллельный сбор будет использовать ThreadPool для вычисления отображения в параллельных (условиях гонки!). Вы можете найти более подробную информацию: http://docs.scala-lang.org/overviews/parallel-collections/overview.html

+1

Спасибо за комментарий, я попытался очистить свой ответ и поставить самую важную информацию там –

+1

Обратите внимание, что вопрос был не в том, как делать что-то параллельно, а наоборот. –

0

Предполагая, что создание фьючерсов не настолько тонко изложено, что накладные расходы будут чрезмерными (в этом случае ответ, предполагающий использование параллельных коллекций, вероятно, наиболее полезен), вы мог бы просто создать другой, неявно определенный контекст исполнения для будущих фьючерсов, под которым работает другой исполнитель с его собственными потоками.

Для этого можно позвонить ExecutionContext.fromExecutorService или ExecutionContext.fromExecutor.

Смежные вопросы