Программирование распределенных систем затруднено. Трудно понять. Трудно реализовать правильно.
Исходный код для riak_core может быть очень трудно понять вначале. Вот некоторые ресурсы, которые помогли мне лучше понять riak_core:
- Where to Start with Riak Core (в частности, Try Try Try Райан Zezeski)
- Любой из riak_core проектов в project-fifo. howl, вероятно, самый маленький проект, построенный на вершине riak_core, что довольно легко понять.
- Поймите, что в основе riak_core является последовательным алгоритм хеширования, что позволяет распространять данные и работать по всему кольцу с помощью перегородок единообразно: Why Riak Just Works
- Некоторое время назад я написал erlang-ryng, который является общим последовательным хэш обработчик алгоритма для колец. Это может быть полезно для понимания цели последовательного хэширования в контексте кольца.
- Понимание того, как работы riak_pipe также помогло мне лучше понять, как можно распределять работу единообразно.
В отношении к «Это трудно правильно реализовать», вы можете прочитать Jepsen posts by aphyr примеры и случаи, когда основные базы данных и распределенные системы хранения имеют или ранее имели проблемы в своих реализациях.
Тем не менее, здесь очень упрощенно реализация кольца в Erlang, однако она все еще имеет много отверстий, которые рассматриваются ниже:
-module(node_ring).
-behaviour(gen_server).
% Public API
-export([start_link/0]).
-export([erase/1]).
-export([find/1]).
-export([store/2]).
% Ring API
-export([join/1]).
-export([nodes/0]).
-export([read/1]).
-export([write/1]).
-export([write/2]).
% gen_server
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
-record(state, {
node = node() :: node(),
ring = ordsets:new() :: ordsets:ordset(node()),
data = dict:new() :: dict:dict(term(), term())
}).
% Public API
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
erase(Key) ->
write({erase, Key}).
find(Key) ->
read({find, Key}).
store(Key, Value) ->
write({store, Key, Value}).
% Ring API
join(Node) ->
gen_server:call(?MODULE, {join, Node}).
nodes() ->
gen_server:call(?MODULE, nodes).
read(Request) ->
gen_server:call(?MODULE, {read, Request}).
write(Request) ->
gen_server:call(?MODULE, {write, Request}).
write(Node, Request) ->
gen_server:call(?MODULE, {write, Node, Request}).
% gen_server
init([]) ->
State = #state{},
{ok, State}.
handle_call({join, Node}, _From, State=#state{node=Node}) ->
{reply, ok, State};
handle_call({join, Peer}, From, State=#state{node=Node, ring=Ring}) ->
case net_adm:ping(Peer) of
pong ->
case ordsets:is_element(Peer, Ring) of
true ->
{reply, ok, State};
false ->
monitor_node(Peer, true),
NewRing = ordsets:add_element(Peer, Ring),
spawn(fun() ->
rpc:multicall(Ring, ?MODULE, join, [Peer])
end),
spawn(fun() ->
Reply = rpc:call(Peer, ?MODULE, join, [Node]),
gen_server:reply(From, Reply)
end),
{noreply, State#state{ring=NewRing}}
end;
pang ->
{reply, {error, connection_failed}, State}
end;
handle_call(nodes, _From, State=#state{node=Node, ring=Ring}) ->
{reply, ordsets:add_element(Node, Ring), State};
handle_call({read, Request}, From, State) ->
handle_read(Request, From, State);
handle_call({write, Request}, From, State=#state{node=Node, ring=Ring}) ->
spawn(fun() ->
rpc:multicall(Ring, ?MODULE, write, [Node, Request])
end),
handle_write(Request, From, State);
handle_call({write, Node, _Request}, _From, State=#state{node=Node}) ->
{reply, ok, State};
handle_call({write, _Peer, Request}, From, State) ->
handle_write(Request, From, State);
handle_call(_Request, _From, State) ->
{reply, ignore, State}.
handle_cast(_Request, State) ->
{noreply, State}.
handle_info({nodedown, Peer}, State=#state{ring=Ring}) ->
NewRing = ordsets:del_element(Peer, Ring),
{noreply, State#state{ring=NewRing}};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% @private
handle_read({find, Key}, _From, State=#state{data=Data}) ->
{reply, dict:find(Key, Data), State}.
%% @private
handle_write({erase, Key}, _From, State=#state{data=Data}) ->
{reply, ok, State#state{data=dict:erase(Key, Data)}};
handle_write({store, Key, Value}, _From, State=#state{data=Data}) ->
{reply, ok, State#state{data=dict:store(Key, Value, Data)}}.
Если мы начнем 3 различных узлов с -sname
набором для node0
, node1
и node2
:
erl -sname node0 -setcookie cook -run node_ring start_link
erl -sname node1 -setcookie cook -run node_ring start_link
erl -sname node2 -setcookie cook -run node_ring start_link
Вот как мы присоединяемся узел к кольцу:
([email protected])1> node_ring:nodes().
['[email protected]']
([email protected])2> node_ring:join('[email protected]').
ok
([email protected])3> node_ring:nodes().
['[email protected]', '[email protected]']
Если мы запустим node_ring:nodes()
на node1
мы получаем:
([email protected])1> node_ring:nodes().
['[email protected]', '[email protected]']
Теперь давайте перейдем к node2
и присоединиться к одной из двух других узлов:
([email protected])1> node_ring:nodes().
['[email protected]']
([email protected])2> node_ring:join('node0localhost').
ok
([email protected])3> node_ring:nodes().
['[email protected]', '[email protected]',
'[email protected]']
Обратите внимание, как и node0
и node1
были добавлены node2
, хотя мы только указали node0
на соединение. Это означает, что если у нас было сотни узлов, нам нужно было бы присоединиться к одному из них, чтобы присоединиться ко всему кольцу.
Теперь мы можем использовать store(Key, Value)
на любом из узлов и будет повторен к двум другим:
([email protected])4> node_ring:store(mykey, myvalue).
ok
Давайте попробуем чтение mykey
от двух других, первый node1
:
([email protected])2> node_ring:find(mykey).
{ok,myvalue}
Затем node2
:
([email protected])4> node_ring:find(mykey).
{ok,myvalue}
Давайте использовать erase(Key)
на node2
и попробуйте еще раз прочитать ключ на других узлах:
([email protected])5> node_ring:erase(mykey).
ok
На node0
:
([email protected])5> node_ring:find(mykey).
error
На node1
:
([email protected])3> node_ring:find(mykey).
error
Отлично! У нас есть распределенное децентрализованное кольцо, которое может действовать как простой магазин ключей/ценностей! Это было легко, не сложно! До тех пор, пока у нас нет ни одного узла вниз, потери пакетов, сетевые разделы, узлы, добавленные в кольцо, или какая-либо другая форма хаоса, у нас есть почти идеальное решение здесь. В действительности, однако, вы должны учитывать все эти вещи, чтобы иметь систему, которая в конечном итоге не сведёт вас с ума.
Вот краткий пример того, что наш маленький node_ring
не может справиться:
node1
идет вниз
node0
хранит ключ a
и значение 1
node1
возвращается и присоединяется кольцо
node1
пытается найти ключ a
Во-первых, давайте убьем node1
. Если мы проверяем узлы на node0
:
([email protected])6> node_ring:nodes().
['[email protected]','[email protected]']
И node2
:
([email protected])6> node_ring:nodes().
['[email protected]','[email protected]']
Мы видим, что node1
был автоматически удален из кольца. Давайте хранить что-то на node0
:
([email protected])7> node_ring:store(a, 1).
ok
И читать его из node2
:
([email protected])7> node_ring:find(a).
{ok,1}
Давайте начнем вверх node1
снова и присоединиться кольцо:
([email protected])1> node_ring:join('[email protected]').
ok
([email protected])2> node_ring:nodes().
['[email protected]','[email protected]',
'[email protected]']
([email protected])3> node_ring:find(a).
error
Упс, мы имеем противоречивые данные по кольцо. Дальнейшее изучение других распределенных систем и CAP theorem необходимо, прежде чем мы сможем решить, как мы хотим, чтобы наш маленький node_ring
вел себя в этих разных ситуациях (например, хотим ли мы вести себя как AP или система CP).
Спасибо. Вы мне очень помогли, в основном, как начать. Этот отличный пост вдохновил меня. Я реализую подобное распределенное кольцо. На самом деле, я реализую последовательное хеширование, и я пробую протокол сплетни. – user4854508