2016-01-09 4 views
1

Мой вопрос прост, о методе Future.traverse. Итак, у меня есть список String-s. Каждая строка является URL-адресом веб-страницы. Затем у меня есть класс, который может принимать URL-адрес, загружать веб-страницу и анализировать некоторые данные. Все это завернуто в Future {}, поэтому результат обрабатывается асинхронно.Future.traverse, похоже, работает последовательно, а не параллельно. Это правда?

Класс упрощен вид:

class RatingRetriever(context:ExecutionContext) { 
    def resolveFilmToRating(url:String):Future[Option[Double]]={ 
    Future{ 
     //here it creates Selenium web driver, loads the url and parses it. 
    }(context) 
    } 
} 

Тогда в другом объекте у меня есть это:

implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2)) 
    ....... 
    val links:List[String] = films.map(film => film.asInstanceOf[WebElement].getAttribute("href")) 
    val ratings: Future[List[Option[Double]]] = Future.traverse(links)(link => new RatingRetriever(executionContext).resolveFilmToRating(link)) 

Когда он работает, я могу определенно видеть это проходит сбор последовательно. Если я изменяю контекст выполнения из пула фиксированного размера в пул одиночных потоков, то поведение остается прежним. Так что я действительно удивляюсь, как я могу сделать работу Future.traverse параллельной. Вы можете посоветовать?

+0

'Future.traverse' работает параллельно. Если контекст выполнения имеет два фиксированных потока, он берет первые две ссылки для загрузки и анализа, чем следующие два и так далее. –

+0

@PeterNeyens: Возможно. Если бы это было так, я бы не поставил этот вопрос :) Я отлаживал его довольно много. Итак, для каждой ссылки создается экземпляр веб-драйвера Selenium, который запускает браузер Firefox и загружает страницу. Если бы у меня было 2 ссылки, я бы увидел 2 браузера, но я вижу только одну открытую, загруженную страницу, затем браузер закрыт, затем открывается один и т. Д. Если я изменю размер пула потоков, скажем, 4 (у меня 4 ядра в моем процессоре), то ничего не меняется. Так что, вероятно, дело не в Future.traversal, а в чем-то другом, но я не могу понять это ... –

+0

Тогда, пожалуйста, предоставьте код, который компилирует, который может показать поведение, которое вы описываете. –

ответ

2

Посмотрите на источники траверс по:

in.foldLeft(successful(cbf(in))) { (fr, a) => //we sequentially traverse Collection 
    val fb = fn(a)      //Your function comes here 
    for (r <- fr; b <- fb) yield (r += b) //Just add elem to builder 
}.map(_.result())      //Getting the collection from builder 

Так сколько параллельно вы код зависит от вашей функции Fn, рассмотрим два примера:

1) Этот код:

import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global 
object FutureTraverse extends App{ 
    def log(s: String) = println(s"${Thread.currentThread.getName}: $s") 

    def withDelay(i: Int) = Future{ 
    log(s"withDelay($i)") 
    Thread.sleep(1000) 
    i 
    } 

    val seq = 0 to 10 

    Future { 
    for(i <- 0 to 5){ 
     log(".") 
     Thread.sleep(1000) 
    } 
    } 

    val resultSeq = Future.traverse(seq)(withDelay(_)) 

    Thread.sleep(6000) 
} 

Имеет такой вывод:

ForkJoinPool-1-worker-5: . 
ForkJoinPool-1-worker-3: withDelay(0) 
ForkJoinPool-1-worker-1: withDelay(1) 
ForkJoinPool-1-worker-7: withDelay(2) 
ForkJoinPool-1-worker-5: . 
ForkJoinPool-1-worker-3: withDelay(3) 
ForkJoinPool-1-worker-1: withDelay(4) 
ForkJoinPool-1-worker-7: withDelay(5) 
ForkJoinPool-1-worker-5: . 
ForkJoinPool-1-worker-3: withDelay(6) 
ForkJoinPool-1-worker-1: withDelay(7) 
ForkJoinPool-1-worker-7: withDelay(8) 
ForkJoinPool-1-worker-5: . 
ForkJoinPool-1-worker-3: withDelay(9) 
ForkJoinPool-1-worker-1: withDelay(10) 
ForkJoinPool-1-worker-5: . 
ForkJoinPool-1-worker-5: . 

2) Просто измените функцию withDelay:

def withDelay(i: Int) = { 
    Thread.sleep(1000) 
    Future { 
     log(s"withDelay($i)") 
     i 
    } 
    } 

и вы получите последовательный вывод:

ForkJoinPool-1-worker-7: . 
ForkJoinPool-1-worker-7: . 
ForkJoinPool-1-worker-5: withDelay(0) 
ForkJoinPool-1-worker-7: . 
ForkJoinPool-1-worker-1: withDelay(1) 
ForkJoinPool-1-worker-7: . 
ForkJoinPool-1-worker-1: withDelay(2) 
ForkJoinPool-1-worker-7: . 
ForkJoinPool-1-worker-1: withDelay(3) 
ForkJoinPool-1-worker-7: . 
ForkJoinPool-1-worker-1: withDelay(4) 
ForkJoinPool-1-worker-7: withDelay(5) 
ForkJoinPool-1-worker-1: withDelay(6) 
ForkJoinPool-1-worker-1: withDelay(7) 
ForkJoinPool-1-worker-7: withDelay(8) 
ForkJoinPool-1-worker-7: withDelay(9) 
ForkJoinPool-1-worker-7: withDelay(10) 

Так Future.traverse не должны быть параллельны одна, она просто подчиняется задач, он может сделать это последовательно, целая параллельная вещь в вашей поданной функции.

+0

спасибо. Я понимаю ваше объяснение и пример, но не совсем понимаю, почему в примере 2) вы не просто меняете строки журнала() и Trhead.sleep(), а перемещаете Thread.sleep() из оболочки Future {}? –

+0

Будущее отправляет задачу для параллельного выполнения. Больше вещей внутри будущего блока - больше parrallel ваш код будет, потому что вещи за пределами будущего блока исполняются последовательно - это главное, что я хотел сказать, а Thread.sleep символизирует длительную операцию – nikiforo

0

@nikiforo прозрачный, спасибо. Что касается моей конкретной проблемы, то было так, что веб-драйвер selenium хочет, чтобы каждый экземпляр был создан в отдельном потоке, если я хочу, чтобы несколько браузеров работали одновременно. Поэтому мне нужно было использовать реализацию пользовательских темы:

class FireFoxThread(r:Runnable) extends Thread(r:Runnable){ 
    val driver = new FirefoxDriver 

    override def interrupt()={ 
    driver.quit 
    super.interrupt 
    } 
} 

А затем создать его экземпляр из ThreadFactory:

val executorService:ExecutorService = Executors.newFixedThreadPool(3, new ThreadFactory { 
     override def newThread(r: Runnable): Thread = new FireFoxThread(r) 
}) 

Таким образом, я был в состоянии обрабатывать мои URL-адреса в разных браузерах.

1

Future.traverse действительно работает последовательно. Он принимает каждый из элементов в TraversableOnce, который вы передаете в качестве аргумента (links в этом случае), и создает будущее, используя вашу функцию сопоставления. Однако это только создает это будущее, когда предыдущее будущее закончило выполнение, что обеспечивает соблюдение последовательного поведения, которое вы видели.

Вы можете увидеть это ясно на простом примере кода:

import scala.util.Random 
import scala.concurrent.Future 
import scala.concurrent.ExecutionContext.Implicits.global 

def sleep = Thread.sleep(100 + Random.nextInt(5000)) 

Future.traverse((1 to 100)){n => sleep; println(n); Future.successful(n)} 

Это печатает числа от 1 до 100 в серии, и никогда не испорченный. Если бы фьючерсы выполнялись параллельно, случайный сон обеспечивал бы, чтобы некоторые предметы были заполнены раньше, чем отправленные перед ними, но этого не происходит.

Глядя на источник Future.traverse, мы можем понять, почему это так:

def traverse(in: M[A])(fn: A => Future[B]) = 
    in.foldLeft(successful(cbf(in))) { (fr, a) => 
    val fb = fn(a) 
    for (r <- fr; b <- fb) yield (r += b) 
    }.map(_.result()) 

for (r <- fr; b <- fb) часть является для понимания, что звонит flatMap о будущем вы предоставляете. После того, как ваш ответ будет создан (fb), он будет добавлен в список результатов. Этого не происходит до тех пор, пока предыдущее будущее (fr) не завершится и может привести к его результатам.

Если вы хотите, чтобы представить набор фьючерсов параллельно, вы можете использовать Future.sequential:

val retriver = new RatingRetriever(executionContext) 
Future.sequence(links.map(link => retriver.resolveFilmToRating(link)) 

В этом случае, вы создаете фьючерсы в links.map вызова и поэтому все они начинают немедленно выполняться. Future.sequence делает относительно простую работу по преобразованию списка фьючерсов в список их результатов.

+0

Я не могу видеть 'Future.sequential 'в 2.11.9 или 2.12.3 исходный код объекта-компаньона' Future' ... Не могли бы вы уточнить? Я вижу «Future.sequence». –

+0

спасибо @AlexanderArendar! Я отредактировал ответ, чтобы указать Future.sequence. – mcobrien

Смежные вопросы