2016-06-02 2 views
3

Я смотрю, как писать сценарии типа «сокращение карты» непосредственно в erlang. В качестве примера игрушек, представьте, что я хочу решить, какой из нескольких файлов является самым большим. Эти файлы могут быть в любом месте в Интернете, поэтому получение каждого из них может занять некоторое время; поэтому я хотел бы собрать их параллельно. Как только у меня есть все, я могу сравнить их размеры.идиоматическая синхронизация процесса в Erlang

Мой принятый подход заключается в следующем: «» основной процесс

  • А до координировать работу и определять, который является самым большим;
  • «Рабочий» процесс для каждого файла, который извлекает файл и возвращает размер основного процесса.

Вот неуклюжим, но функционирующий пример (используя только локальные файлы, но это показывает намерение):

-module(cmp). 
-export([cmp/2]). 

cmp(Fname1, Fname2) -> 
    Pid1 = fsize(Fname1), 
    Pid2 = fsize(Fname2), 
    {Size1, Size2} = collect(Pid1, Pid2), 

    if 
     Size1 > Size2 -> 
      io:format("The first file is bigger~n"); 
     Size2 > Size1 -> 
      io:format("The second file is bigger~n"); 
     true -> 
      io:format("The files are the same size~n") 
    end. 

fsize(Fname) -> 
    Pid = spawn(?MODULE, fsize, [self(), Fname]), 
    Pid. 

fsize(Sender, Fname) -> 
    Size = filelib:file_size(Fname), 
    Sender ! {self(), Fname, Size}. 

collect(Pid1, Pid2) -> 
    receive 
     {Pida, Fnamea, Sizea} -> 
      io:format("Pid: ~p, Fname: ~p, Size: ~p~n", [Pida, Fnamea, Sizea]) 
    end, 
    receive 
     {Pidb, Fnameb, Sizeb} -> 
      io:format("Pid: ~p, Fname: ~p, Size: ~p~n", [Pidb, Fnameb, Sizeb]) 
    end, 
    if 
     Pida =:= Pid1 -> {Sizea, Sizeb}; 
     Pida =:= Pid2 -> {Sizeb, Sizea} 
    end. 

Конкретные вопросы

  1. ли подход идиоматических? т. е. отнимать каждую «длинную» задачу в отдельный процесс, а затем собирать результаты обратно в «мастер»?
  2. Есть ли библиотека для обработки механики синхронизации? В частности, функция collect в примере выше?

Спасибо.

- Примечание: Я знаю, что функция collect неудобна; его можно обобщить, например, сохранение pids в списке и цикл до тех пор, пока все не завершится.

ответ

3

По моему мнению, лучше всего извлечь уроки из примера, поэтому я посмотрел, как они это делают в otp/rpc, и на основе этого я реализовал немного более короткую/более простую версию parallel eval call.

call(M, F, ArgL, Timeout) -> 
    ReplyTo = self(), 
    Keys = [spawn(fun() -> ReplyTo ! {self(), promise_reply, M:F(A)} end) || A <- ArgL], 

    Yield = fun(Key) -> 
        receive 
         {Key, promise_reply, {error, _R} = E}   -> E; 
         {Key, promise_reply, {'EXIT', {error, _R} = E}} -> E; 
         {Key, promise_reply, {'EXIT', R}}    -> {error, R}; 
         {Key, promise_reply, R}       -> R 
        after Timeout          -> {error, timeout} 
        end 
      end, 
    [Yield(Key) || Key <- Keys]. 
+2

спасибо - намного более элегантная реализация, а также проверяет идиому «многих работников». – sfinnie

2

Я не MapReduce эксперт, но я имел некоторый опыт использования this 3rd party mapreduce module. Поэтому я постараюсь ответить на ваш вопрос, исходя из моих текущих знаний.

  1. Во-первых, ваш вход должен быть организован как пары ключей и значений, чтобы правильно использовать модель mapreduce. В общем, процесс master должен сначала запускать рабочие процессы (или узлы). Каждый работник получает карту и пару ключ и значение, давайте назовите его {K1,V1}. Затем он выполняет команду с ключом и значением и испускает новую пару ключ и значение{K2,V2}. Процесс мастер собирает результаты и ждет, пока все работники закончат работу. После того, как все работники сделаны, мастер начинает уменьшить часть на парах {K2,List[V2]}, которые были выбраны рабочими. Эта часть может выполняться параллельно или нет, она используется для объединения всех результатов в один вывод.Обратите внимание, что List[V2] состоит в том, что может быть больше одного значения, которое было выбрано рабочими для одного ключа K2.

От модуля 3 партии я уже упоминал выше:

%% Input = [{K1, V1}] 
%% Map(K1, V1, Emit) -> Emit a stream of {K2,V2} tuples 
%% Reduce(K2, List[V2], Emit) -> Emit a stream of {K2,V2} tuples 
%% Returns a Map[K2,List[V2]] 

Если мы посмотрим на lists функции эрлангов, то карты часть фактически равна для выполнения lists:map/2 и уменьшить часть в некотором роде аналогичные lists:foldl/3 или lists:foldr/3 и следующие комбинации: lists:mapfoldl/3, lists:mapfoldr/3.

  1. Если вы используете этот шаблон mapreduce, используя наборы ключей и значений, нет необходимости в специальной синхронизации, если это то, что вы имеете в виду. Вам просто нужно ждать, пока все работники закончат свою работу.

Предлагаю вам перейти на сторонний модуль, упомянутый выше. Взгляните также на this example. Как вы можете видеть, единственными вещами, которые вам нужно определить, являются функции Map и Reduce.

+1

спасибо - я не нашел модуль mapreduce. Не совсем то, что я ищу, но хорошая рекомендация. – sfinnie

+0

Вы всегда можете создать свой собственный модуль mapreduce. Например, вы можете изменить часть сокращения, чтобы быть последовательной, и просто использовать какой-то 'foldl'. Я использовал этот сторонний модуль для [общего алгоритма mapreduce друзей] (например, http://stevekrenzel.com/finding-friends-with-mapreduce). –

Смежные вопросы