2015-09-15 4 views
11

Я хочу сделать параллельную карту по большому списку. Код выглядит примерно так:Task.async в Elixir Stream

big_list 
|> Stream.map(&Task.async(Module, :do_something, [&1])) 
|> Stream.map(&Task.await(&1)) 
|> Enum.filter filter_fun 

Но я проверял реализацию потока и, насколько я понимаю, Stream.map сочетает в себе функцию и применяет комбинированную функцию к элементам в потоке, который будет означать, что последовательность выглядит так:

  1. Возьмите первый элемент
  2. Создать ASync задачу
  3. Дождитесь ее завершения
  4. Возьмите вторую elelemnt ...

В этом случае он не делает это параллельно. Я прав, или я чего-то не хватает?

Если я прав, как насчет этого кода?

Stream.map Task.async ... 
|> Enum.map Task.await ... 

Это будет работать параллельно?

+2

читать это - http://www.theerlangelist.com/2015/07/beyond-taskasync.html – emaillenin

ответ

9

Второй также не делает то, что вы хотите. Вы можете ясно видеть это с помощью этого кода:

defmodule Test do 
    def test do 
    [1,2,3] 
    |> Stream.map(&Task.async(Test, :job, [&1])) 
    |> Enum.map(&Task.await(&1)) 
    end 

    def job(number) do 
    :timer.sleep 1000 
    IO.inspect(number) 
    end 
end 

Test.test 

Вы увидите номер, затем 1 секунду ожидания, другое число и так далее. Ключевым моментом здесь является то, что вы хотите как можно быстрее создать задачи, так что вы не должны использовать ленивый Stream.map. Вместо того, чтобы использовать жадный Enum.map в этой точке:

|> Enum.map(&Task.async(Test, :job, [&1])) 
|> Enum.map(&Task.await(&1)) 

С другой стороны, вы можете использовать Stream.map при ожидании, пока вы делаете некоторые нетерпеливый операцию позже, как ваш filter. Таким образом, ожидания будут чередоваться с любой обработкой, которую вы можете делать по результатам.

4

Elixir 1.4 предоставляет новую функцию Task.async_stream/5, которая будет возвращать поток, который выполняет заданную функцию одновременно по каждому элементу в перечислимом.

Есть также варианты указания максимального числа рабочих и тайм-аута, используя параметры параметров и :timeout.


Это сделает ваш пример работать одновременно:

big_list 
|> Task.async_stream(Module, :do_something, [&1]) 
|> Enum.filter(filter_fun) 
0

Вы можете попробовать Parallel Stream.

stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end) 
stream |> Enum.into([]) 
[2,4,6,8,10,12,14,16,18,20] 

UPD Или лучше использовать Flow

Смежные вопросы