2012-11-12 2 views
0

Мне нужно обрабатывать задания из очереди в процессе, при этом IO выполняется асинхронно. Это довольно просто. Получено, что эти задания могут добавлять дополнительные предметы в очередь.Создание асинхронной очереди в Ruby

Я думаю, что я слишком долго ловил эту проблему, поэтому мой мозг облачно - это не должно быть слишком сложно. Я продолжаю придумывать либо-либо сценарий:

  1. Очередь может выполняться асинхронно, и результаты могут быть объединены впоследствии.
  2. Очередь может выполнять синхронно выполнение заданий до тех пор, пока последняя не завершится, а очередь пуста.

Я возился со всем от EventMachine и Голиафа (оба из которых может использовать EM::HttpRequest) целлулоид (никогда не удосужился построить что-то с ним, хотя), и писать счетчики с использованием волокон. Мой мозг жарится.

То, что я хотел бы, просто, чтобы быть в состоянии сделать это:

items = [1,2,3] 
items.each do |item| 
    if item.has_particular_condition? 
    items << item.process_one_way 
    elsif item.other_condition? 
    items << item.process_another_way 
    # ... 
    end 
end 

#=> [1,2,3,4,5,6] 

... где 4, 5 и 6 были все результаты обработки исходных элементов в наборе, и 7, 8 и 9 являются результатом обработки 4, 5 и 6. Мне не нужно беспокоиться о бесконечной обработке очереди, потому что обработанные мной данные заканчиваются после нескольких итераций.

Руководства, комментарии, ссылки на другие библиотеки и т. Д. Приветствуются, а также примеры кода реализации более низкого уровня.

+0

Да, асинхронные системы могут быть сложными. У вас есть моральная поддержка :) –

+1

Мое первое предложение: спать на нем. Когда вы начинаете ударять головой о стену, и все начинает казаться невероятно сложным. Из опыта я знаю, что пришло время назвать это днем. На следующий день проблема обычно решается сама по себе удивительно легко.Я люблю решать проблемы во сне - это не требует «усилий», и на следующий день вы обычно чувствуете себя гением. – Casper

+0

@ Каспер Я думаю, я не указал, что «слишком долго» влечет за собой работу над этим несколько недель, кое-где. :/ – coreyward

ответ

0

Я закончил реализацию чего-то немного менее идеального - в основном просто обертывание Итератора EM Fiber в цикле, который заканчивается, когда новые результаты не поставлены в очередь.

require 'set' 

class SetRunner 
    def initialize(seed_queue) 
    @results = seed_queue.to_set 
    end 

    def run 
    begin 
     yield last_loop_results, result_bucket 
    end until new_loop_results.empty? 

    return @results 
    end 

    def last_loop_results 
    result_bucket.shift(result_bucket.count) 
    end 

    def result_bucket 
    @result_bucket ||= @results.to_a 
    end 

    def new_loop_results 
    # .add? returns nil if already in the set 
    result_bucket.each { |item| @results.add? item }.compact 
    end 
end 

Затем, чтобы использовать его с EventMachine:

queue = [1,2,3] 
results = SetRunner.new(queue).run do |set, output| 
    EM::Synchrony::FiberIterator.new(set, 3).each do |item| 
    output.push(item + 3) if item <= 6 
    end 
end 
# => [1,2,3,4,5,6,7,8,9] 

Затем каждый набор будет получить работать с уровнем параллелизма, переданного FiberIterator, но результаты из каждого набора будут получать работать в следующей итерации внешнего цикла SetRunner.

0

У меня были аналогичные требования в прошлом, и вам нужна прочная высокопроизводительная рабочая очередь из ее звуков. Я рекомендую вам проверить beanstalkd, который я обнаружил более года назад и с тех пор использовал для надежной обработки тысяч и тысяч рабочих мест в рубине.

В частности, я начал разработку твердых рубиновых библиотек вокруг beanstalkd. В частности, обязательно проверьте backburner, который является готовой рабочей очередью в рубине с использованием beanstalkd. Синтаксис и настройка легки, определяя, как быстро выполняется процесс работы, обрабатываются неудачи работы и повторные попытки, а также планирование работы и многое другое.

Дайте мне знать, если у вас есть вопросы, но я думаю, beanstalkd и backburner будут соответствовать вашим требованиям достаточно хорошо.

+0

Это может сработать, но я выполняю работу во время жизненного цикла HTTP-запроса, и мне нужно разобрать и вернуть результат клиенту, когда все задания будут завершены. Любые советы о том, как справиться с Backburner? – coreyward

Смежные вопросы