2015-11-28 2 views
2

Что может быть самой элегантной реализацией (циклического) барьера для Elixir? Алгоритм, который будет реализован (раскраска вершин), имеет цикл с фазой ожидания для порожденных процессов («выполнить ... синхронно параллельно», а затем проверить условие завершения с использованием результатов всех процессов), это алгоритм 5 «6-цветный «из принципов распределенных вычислений», гл. 1.Синхронизационный барьер в Эликсире?

Большинство ссылок предназначено для .NET, pthreads и других связанных с потоком вычислений, поэтому я не уверен, что барьер является правильным шаблоном, которым я пользуюсь. Может быть, есть больше «эликсиризма».

У меня нет никакого кода пока (в поисках шаблона), но вот код, реализации «медленных» версии одной и той же проблемы: https://codereview.stackexchange.com/questions/111487/coloring-trees-in-elixir

Идея я должен иметь процесс верхнего уровня (тот, который порождает один процесс на узел графа) для отправки и получения сообщений, которые будут синхронизировать процессы узла. Следует отметить, что процессы узла также взаимодействуют друг с другом: родители отправляют сообщения детям во время одной итерации цикла верхнего уровня. Однако усложнение состоит в том, что ни один процесс не должен продолжаться после сообщения верхнего уровня полученного узла и до того, как все узлы выполнили свою итерацию (скорее всего, я буду использовать рекурсию хвоста). Вот почему я подумал о барьерном механизме.

ответ

3

Я не уверен, что это именно то, что вы ищете, но вот циклический барьер, основанный на java.util.concurrent.CyclicBarrier class in java и Concurrent::CyclicBarrier class in ruby.

defmodule CyclicBarrier do 

    require Record 
    Record.defrecordp :barrier, CyclicBarrier, 
    pid: nil 

    def start(parties, action \\ nil) 
     when (is_integer(parties) and parties > 0) 
     and (action === nil or is_function(action, 0)), 
    do: barrier(pid: spawn(CyclicBarrier.Server, :init, [parties, action])) 

    def stop(barrier(pid: pid)) do 
    call(pid, :stop) 
    true 
    end 

    def alive?(barrier(pid: pid)) do 
    Process.alive?(pid) 
    end 

    def broken?(barrier(pid: pid)) do 
    case call(pid, :status) do 
     :waiting -> 
     false 
     _ -> 
     true 
    end 
    end 

    def number_waiting(barrier(pid: pid)) do 
    case call(pid, :number_waiting) do 
     n when is_integer(n) -> 
     n 
     _ -> 
     false 
    end 
    end 

    def parties(barrier(pid: pid)) do 
    case call(pid, :parties) do 
     n when is_integer(n) -> 
     n 
     _ -> 
     false 
    end 
    end 

    def reset(barrier(pid: pid)) do 
    case call(pid, :reset) do 
     :reset -> 
     true 
     :broken -> 
     true 
     _ -> 
     false 
    end 
    end 

    def wait(barrier = barrier()), 
    do: wait(nil, barrier) 

    def wait(timeout, barrier = barrier(pid: pid)) when timeout === nil or is_integer(timeout) do 
    case call(pid, :wait, timeout) do 
     :fulfilled -> 
     true 
     :broken -> 
     false 
     :timeout -> 
     reset(barrier) 
     false 
     _ -> 
     false 
    end 
    end 

    defp call(pid, request, timeout \\ nil) do 
    case Process.alive?(pid) do 
     false -> 
     {:EXIT, pid, :normal} 
     true -> 
     trap_exit = Process.flag(:trap_exit, true) 
     Process.link(pid) 
     ref = make_ref() 
     send(pid, {ref, self(), request}) 
     case timeout do 
      nil -> 
      receive do 
       {^ref, reply} -> 
       Process.unlink(pid) 
       Process.flag(:trap_exit, trap_exit) 
       reply 
       exited = {:EXIT, ^pid, _} -> 
       Process.flag(:trap_exit, trap_exit) 
       exited 
      end 
      _ -> 
      receive do 
       {^ref, reply} -> 
       Process.unlink(pid) 
       Process.flag(:trap_exit, trap_exit) 
       reply 
       exited = {:EXIT, ^pid, _} -> 
       Process.flag(:trap_exit, trap_exit) 
       exited 
      after 
       timeout -> 
       Process.unlink(pid) 
       Process.flag(:trap_exit, trap_exit) 
       :timeout 
      end 
     end 

    end 
    end 

    defmodule Server do 

    require Record 
    Record.defrecordp :state_data, 
     waiting: 0, 
     parties: nil, 
     action: nil, 
     q:  :queue.new() 

    def init(parties, action), 
     do: loop(:waiting, state_data(parties: parties, action: action)) 

    defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)), 
     do: loop(done(:fulfilled, action, q), state_data(sd, waiting: 0, q: :queue.new())) 
    defp loop(state_name, sd) do 
     receive do 
     {ref, pid, request} when is_reference(ref) and is_pid(pid) and is_atom(request) -> 
      handle(state_name, request, {ref, pid}, sd) 
     end 
    end 

    defp handle(:waiting, :wait, from, sd = state_data(waiting: w, q: q)), 
     do: loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in(from, q))) 
    defp handle(:waiting, :reset, from, sd = state_data(waiting: 0, q: q)), 
     do: loop(done(:reset, nil, :queue.in(from, q)), sd) 
    defp handle(:waiting, :reset, from, sd = state_data(q: q)), 
     do: loop(done(:broken, nil, :queue.in(from, q), false), state_data(sd, waiting: 0, q: :queue.new())) 
    defp handle(:broken, :reset, from, sd = state_data(q: q)), 
     do: loop(done(:reset, nil, :queue.in(from, q)), sd) 
    defp handle(:broken, :wait, from, sd) do 
     cast(from, :broken) 
     loop(:broken, sd) 
    end 
    defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do 
     cast(from, number_waiting) 
     loop(state_name, sd) 
    end 
    defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do 
     cast(from, parties) 
     loop(state_name, sd) 
    end 
    defp handle(state_name, :status, from, sd) do 
     cast(from, state_name) 
     loop(state_name, sd) 
    end 
    defp handle(_state_name, :stop, _from, _sd) do 
     exit(:normal) 
    end 

    defp broadcast(q, message), 
     do: for from <- :queue.to_list(q), 
     do: cast(from, message) 

    defp cast({ref, pid}, message), 
     do: send(pid, {ref, message}) 

    defp done(state, action, q, continue \\ true) do 
     run(action) 
     broadcast(q, state) 
     case continue do 
     true -> 
      :waiting 
     false -> 
      state 
     end 
    end 

    defp run(nil), 
     do: nil 
    defp run(action), 
     do: action.() 

    end 

end 

Ниже приведен пример с использованием CyclicBarrier в оболочке IEX для эликсира:

iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end) 
{CyclicBarrier, #PID<0.281.0>} 
iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{barrier.wait}") end) 
done 
process 5: true 
process 1: true 
process 3: true 
process 2: true 
process 4: true 
[#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>] 

Точный порядок выполнения процесса не является детерминированным.

Другие примеры функций на CyclicBarrier ниже:

iex> barrier = CyclicBarrier.start(2) 
{CyclicBarrier, #PID<0.280.0>} 
iex> barrier.alive? 
true 
iex> barrier.broken? 
false 
iex> barrier.number_waiting 
0 
iex> barrier.parties 
2 
iex> # let's spawn another process which will wait on the barrier 
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end) 
#PID<0.288.0> 
iex> barrier.number_waiting 
1 
iex> # if we reset the barrier while another process is waiting 
iex> # on the barrier, it will break 
iex> barrier.reset 
barrier returned: false 
true 
iex> barrier.broken? 
true 
iex> # however, the barrier can be reset again to its initial state 
iex> barrier.reset 
true 
iex> barrier.broken? 
false 
iex> # if a timeout is exceeded while waiting for a barrier, it 
iex> # will also break the barrier 
iex> barrier.wait(100) 
false 
iex> barrier.broken? 
true 
iex> # let's reset the barrier, spawn another process to wait, 
iex> # and wait with a timeout in the current process 
iex> barrier.reset 
true 
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end) 
#PID<0.289.0> 
iex> barrier.wait(100) 
barrier returned: true 
true 
iex> # if stop is called on the barrier, the barrier process will 
iex> # exit and all future calls to the barrier will return false 
iex> barrier.stop 
true 
iex> barrier.alive? 
false 
iex> barrier.reset 
false 
iex> barrier.wait 
false 
+0

Спасибо! Я принял этот ответ. Тем временем я преобразовал код Erlang (http://www.cse.chalmers.se/edu/year/2014/course/TDA382_LP3/lecture8.html) в Elixir, который проще понять, но не циклический. Невозможно, возможно, разместить его здесь в качестве другого ответа, потому что лицензия не ясна. –