2013-03-11 3 views
1

Я использую sidekiq для работы с async-заданиями, и после некоторой сложности добавлено, у меня возникают трудности с пониманием состояния заданий.Изменение статуса объекта после завершения набора асинхронных заданий с помощью sidekiq

Вот сделка:

У меня есть модель Batch, которая вызывает метод асинхронного после его поручен:

# app/models/batch.rb 
class Batch < ActiveRecord::Base 

    after_commit :calculate, on: :create 

    def calculate 
    job_id = BatchWorker.perform_async(self.id) 

    # update_column skips callbacks and validations! 
    self.update_column(:job_id, job_id) 
    end 
end 

работника считывает данные из модели и называет работу асинхронной для каждого типа данных, а следующим образом:

# app/workers/batch_worker.rb 
class BatchWorker 

    def perform(batch_id) 
    batch = Batch.find(batch_id) 

    ## read data to 'tab'  

    tab.each do |ts| 
     obj = batch.item.create(name: ts[0], data: ts[1]) 
     job_id = ItemWorker.perform_async(obj.id) 
     obj.update_attribute(:job_id, job_id) 
    end 
    end 

end 

проблема заключается в: Эти асинхронных заданиях выполнять вычисления, и я не могу позволить ссылке результатов загрузки доступна до его завершения, так что я п чтобы узнать, когда все «детские задания» выполнены, поэтому я могу изменить атрибут status из модели Batch. Другими словами, мне не нужно знать, были ли все задания поставлены в очередь, но вместо этого, если все асинхронные задания, созданные ItemWorker, были выполнены и теперь завершены.

  • Какой был бы лучший способ достичь этого? Это имеет смысл в «мире параллельных вычислений»?

Обряд: Я не уверен в сохранении job_id в db, поскольку он кажется изменчивым.

ответ

4

Возможно, с помощью Redis для этого может быть хорошо подходит, видя, как у вас уже есть в вашей инфраструктуре и настроить в приложении Rails (из-за Sidekiq)

Redis имеет встроенные публикации/подписки двигатель, а также как атомные операции с ключами, что делает его подходящим для управления типом параллелизма, который вы ищете.

Может быть что-то примерно так:

class BatchWorker 

    def perform(batch_id) 
    batch = Batch.find(batch_id) 

    redis = Redis.new 
    redis.set "jobs_remaining_#{batch_id}", tab.count 
    redis.subscribe("batch_task_complete.#{batch_id}") do |on| 
     on.message do |event, data| 
     if redis.decr("jobs_remaining_#{batch_id}") < 1 
      #UPDATE STATUS HERE 
      redis.del "jobs_remaining_#{batch_id}" 
     end 
     end 
    end 

    tab.each do |ts| 
     obj = batch.item.create(name: ts[0], data: ts[1]) 
     job_id = ItemWorker.perform_async(obj.id, batch_id) 
    end 
    end 
end 

class ItemWorker 
    def perform item_id, batch_id=nil 
    #DO STUFF 
    if batch_id 
     Redis.new.publish "batch_task_complete.#{batch_id}" 
    end 
    end 
end 
+0

Да! Это здорово, это действительно может решить мою проблему. Однако кажется, что как только переводчик достигнет блока подписки, он никогда не выйдет и продолжит прослушивать сообщения. Таким образом, блок tab.each, который выполняет ItemWorker, никогда не достигается, и задачи никогда не выполняются. Как я могу это сделать? –

+0

Переместите создание ItemWorkers внутри блока подписки, после того как on.message сделает. Когда все задания выполняются, используйте throw/catch для выхода из блока. –

+0

Я фактически перевел 'redis.decr' в ItemWorker, поэтому каждый раз, когда задания завершены, он уменьшает счетчик и проверяет, является ли это' <1'. Если это так, ItemWorker сам изменяет родительский пакетный статус, избегая использования блока подписки. Благодаря! ;) –