2014-12-04 2 views
0

Я изучаю Erlang с рубинового фона и с некоторым затруднением захватываю мыслительный процесс. Проблема, которую я пытаюсь решить, заключается в следующем: мне нужно сделать один и тот же запрос на api, каждый раз, когда я получаю уникальный идентификатор в ответе, который мне нужно передать в следующий запрос, пока не будет возвращен ID. Из каждого ответа мне нужно извлечь определенные данные и использовать их для других вещей.erlang, извлекать значения из списка кортежей непрерывно

Сначала получите итератор:

ShardIteratorResponse = kinetic:get_shard_iterator(GetShardIteratorPayload). 
{ok,[{<<"ShardIterator">>, 
     <<"AAAAAAAAAAGU+v0fDvpmu/02z5Q5OJZhPo/tU7fjftFF/H9M7J9niRJB8MIZiB9E1ntZGL90dIj3TW6MUWMUX67NEj4GO89D"...>>}]} 

PARSE вне shard_iterator ..

{_, [{_, ShardIterator}]} = ShardIteratorResponse. 

Сделать запрос KINESIS для потоков записей ...

GetRecordsPayload = [{<<"ShardIterator">>, <<ShardIterator/binary>>}]. 
[{<<"ShardIterator">>, 
    <<"AAAAAAAAAAGU+v0fDvpmu/02z5Q5OJZhPo/tU7fjftFF/H9M7J9niRJB8MIZiB9E1ntZGL90dIj3TW6MUWMUX67NEj4GO89DETABlwVV"...>>}] 
14> RecordsResponse = kinetic:get_records(GetRecordsPayload). 
{ok,[{<<"NextShardIterator">>, 
     <<"AAAAAAAAAAFy3dnTJYkWr3gq0CGo3hkj1t47ccUS10f5nADQXWkBZaJvVgTMcY+nZ9p4AZCdUYVmr3dmygWjcMdugHLQEg6x"...>>}, 
    {<<"Records">>, 
     [{[{<<"Data">>,<<"Zmlyc3QgcmVjb3JkISEh">>}, 
     {<<"PartitionKey">>,<<"BlanePartitionKey">>}, 
     {<<"SequenceNumber">>, 
      <<"49545722516689138064543799042897648239478878787235479554">>}]}]}]} 

Я я изо всех сил пытаюсь написать цикл, который продолжает удалять конечную точку кинезита для этого потока, пока не будет e shard iterators ... иначе я хочу все записи. Поскольку я не могу перечислить переменные, как в рубине ... Любое руководство поможет оценить!

ответ

1

ВНИМАНИЕ: Мое кодирование может быть искажено, но оно «близко» ». Я никогда не запускал его и не видел, как выглядит последний итератор.

Я вижу, вы пытаетесь полностью выполнить свою работу в оболочке. Это возможно, но трудно. Вы можете использовать функцию с именем и рекурсию (since release 17.0 it's easier), например:

F = fun (ShardIteratorPayload) -> 
    {_, [{_, ShardIterator}]} = kinetic:get_shard_iterator(ShardIteratorPayload), 
    FunLoop = 
     fun Loop(<<>>, Accumulator) -> % no clue how last iterator can look like 
       lists:reverse(Accumulator); 
      Loop(ShardIterator, Accumulator) -> 
       {ok, [{_, NextShardIterator}, {<<"Records">>, Records}]} = 
        kinetic:get_records([{<<"ShardIterator">>, <<ShardIterator/binary>>}]), 
       Loop(NextShardIterator, [Records | Accumulator]) 
     end, 
    FunLoop(ShardIterator, []) 
end. 
AllRecords = F(GetShardIteratorPayload). 

Но это слишком сложно набрать в раковину ...

Это гораздо проще кодировать его в модулях. Общим образцом в erlang является создание другого процесса или процессов для извлечения ваших данных. Чтобы это было просто, вы можете запустить другой процесс, позвонив по телефону spawn or spawn_link, но не связывайтесь со ссылками и используйте только spawn/3. Скомпилируема простого модуля потребительского:

-module(kinetic_simple_consumer). 

-export([start/1]). 

start(GetShardIteratorPayload) -> 
    Pid = spawn(kinetic_simple_fetcher, start, [self(), GetShardIteratorPayload]), 
    consumer_loop(Pid). 

consumer_loop(FetcherPid) -> 
    receive 
     {FetcherPid, finished} -> 
      ok; 
     {FetcherPid, {records, Records}} -> 
      consume(Records), 
      consumer_loop(FetcherPid); 
     UnexpectedMsg -> 
      io:format("DROPPING:~n~p~n", [UnexpectedMsg]), 
      consumer_loop(FetcherPid) 
    end. 

consume(Records) -> 
    io:format("RECEIVED:~n~p~n",[Records]). 

И Fetcher:

-module(kinetic_simple_fetcher). 

-export([start/2]). 

start(ConsumerPid, GetShardIteratorPayload) -> 
    {ok, [ShardIterator]} = kinetic:get_shard_iterator(GetShardIteratorPayload), 
    fetcher_loop(ConsumerPid, ShardIterator). 

fetcher_loop(ConsumerPid, {_, <<>>}) -> % no clue how last iterator can look like 
    ConsumerPid ! {self(), finished}; 

fetcher_loop(ConsumerPid, ShardIterator) -> 
    {ok, [NextShardIterator, {<<"Records">>, Records}]} = 
     kinetic:get_records(shard_iterator(ShardIterator)), 
    ConsumerPid ! {self(), {records, Records}}, 
    fetcher_loop(ConsumerPid, NextShardIterator). 

shard_iterator({_, ShardIterator}) -> 
    [{<<"ShardIterator">>, <<ShardIterator/binary>>}]. 

Как вы можете видеть оба процесс может выполнять свою работу одновременно. Попробуйте из вашей оболочки:

kinetic_simple_consumer:start(GetShardIteratorPayload). 

Теперь ваши видят, что ваш процесс оболочки обращается к потребителю, и вы будете иметь свою раковину назад после того, как Сборщике пошлет {ItsPid, finished}.

В следующий раз вместо

kinetic_simple_consumer:start(GetShardIteratorPayload). 

пробег:

spawn(kinetic_simple_consumer, start, [GetShardIteratorPayload]). 

Вы должны играть с нерестовых процессов - это Erlang главная сила.

0

В Erlang вы можете написать цикл, используя хвостовые рекурсивные функции. Я не знаю кинетический API, поэтому для простоты я просто предполагаю, что kinetic:next_iterator/1 возвращает {ok, NextIterator} или {error, Reason}, когда больше нет осколков.

loop({error, Reason}) -> 
    ok; 
loop({ok, Iterator}) -> 
    do_something_with(Iterator), 
    Result = kinetic:next_iterator(Iterator), 
    loop(Result). 

Вы заменяете цикл итерацией. Первое предложение касается случая, когда больше нет осколков (всегда начинайте рекурсию с конечным условием). Второе предложение касается случая, когда у нас есть итератор, мы что-то делаем с ним и звоним дальше.

Рекурсивный вызов является последней инструкцией в теле функции, которая называется хвостом рекурсии. Erlang оптимизирует такие вызовы - они не используют стек вызовов, поэтому они могут работать бесконечно в постоянной памяти (вы не получите ничего подобного «Уровень стека слишком глубокий»)

Смежные вопросы