2016-02-25 2 views
1

Мне нужно найти заданные и/или рабочие задания и/или неудачные задания для объекта модели, например, например, когда объект модели разрушен, мы хотим найти все и либо решить не удалять, либо уничтожать задания (условно).Как найти связанные задания Resque для объекта модели ActiveRecord?

Есть ли рекомендуемый способ сделать это, прежде чем изобретать велосипед?

Пример:

Если вы хотите создать before_destroy функцию обратного вызова, которая уничтожает все задания, когда объект будет уничтожен (в очереди и неудачные работы), и только уничтожить, если нет рабочих заданий

Некоторые псевдо код что я имею в виду, чтобы сделать для этого примера случая использования:

Отчет модели работы обработка

class Report < ActiveRecord::Base 
    before_destroy :check_if_working_jobs, :destroy_queued_and_failed_jobs 

    def check_if_working_jobs 
    # find all working jobs related to this report object 
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id) 
    return false unless working_jobs.empty? 
    end 

    def destroy_queued_and_failed_jobs 
    # find all jobs related to this report object 
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id) 
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id) 

    # destroy/remove all jobs found 
    (queued_jobs + failed_jobs).each do |job| 
     # destroy the job here ... commands? 
    end 

    end 
end 

Report er для рабочих мест resque/redis

class ProcessReportWorker 

    # find the jobs by report id which is one of the arguments for the job? 
    # envisioned as separate methods so they can be used independently as needed 

    def self.find_queued_jobs_by_report_id(id) 
    # parse all jobs in all queues to find based on the report id argument? 
    end 

    def self.find_working_jobs_by_report_id(id) 
    # parse all jobs in working queues to find based on the report id argument? 
    end 

    def self.find_failed_jobs_by_report_id(id) 
    # parse all jobs in failed queue to find based on the report id argument? 
    end 
end 

Этот подход подходит к тому, что должно произойти?

Каковы недостающие части выше, чтобы найти очереди или рабочие задания по идентификатору объекта модели, а затем уничтожить его?

Есть ли уже существующие методы для поиска и/или уничтожения связанного идентификатора объекта модели, который я пропустил в документации или моем поиске?

Обновление: Пересмотрено с использованием примера использования, чтобы использовать только рабочие_jobs, чтобы проверить, следует ли нам удалять или нет, или предлагая нам также попытаться удалить work_jobs. (потому что удаление рабочих заданий более сложное, чем просто удаление записей ключа redis)

ответ

1

Здесь было тихо, без ответов, поэтому мне удалось провести день, занявшись этим, следуя пути, который я указывал в своем вопросе. Там может быть лучшее решение или другие доступные методы, но это, похоже, до сих пор работает. Не стесняйтесь комментировать, есть ли лучшие варианты для используемых методов или если их можно улучшить.

Общий подход здесь - вам нужно выполнить поиск по всем заданиям (поставленным в очередь, работать, не удалось) и отфильтровать только задания для class и queue, которые актуальны и соответствуют идентификатору объекта, который вы ищете в правильном индексная позиция массива args. Например (после подтверждения соответствия class и queue), если позиция аргумента 0 является идентификатором объекта, то вы можете проверить, соответствует ли args[0] идентификатору объекта.

По сути, работа связана с ИД объекта, если: job_class == class.name && job_queue == @queue && job_args[OBJECT_ID_ARGS_INDEX].to_i == object_id

  • Queued Работа: Для того, чтобы найти все рабочие места в очереди, нужно собрать все REDIS записи с ключами имени queue:#{@queue} где @queue является имя очереди вашего рабочего класса. Измените соответственно , перейдя через несколько очередей, если вы используете несколько очередей для определенного рабочего класса.Resque.redis.lrange("queue:#{@queue}",0,-1)
  • Failed Работа: Чтобы найти все очереди заданий, нужно собрать все REDIS записи с ключами имени failed (если вы не используете несколько очередей отказа или какой-то другой, чем настройка по умолчанию). Resque.redis.lrange("failed",0,-1)
  • Рабочие вакансии: Чтобы найти все рабочие задания, которые можно использовать Resque.workers , который содержит массив всех рабочих и рабочих мест, на которых выполняется. Resque.workers.map(&:job)
  • Работа: Каждое задание в каждом из перечисленных выше списков будет закодированным хэшем. Вы можете декодировать задание в рубиновый хэш с помощью Resque.decode(job).
  • Класс и арг: Для очереди заданий, в class и args ключи job["class"] и job["args"]. Для не удалось и рабочие места это job["payload"]["class"] и job["payload"]["args"].
  • Очередь: Для каждого из не удалось и рабочих заданий найдено, очередь будет job["queue"]. Перед тестированием списка аргументов для идентификатора объекта вам нужны только задания, соответствующие class и queue. Ваши поставленные в очередь задания список уже будет ограничен вашей очередью.

Ниже приведены примеры рабочего класса и методы модели для поиска (и удаления) заданий, связанных с примером модельного объекта (отчета).

Report класс обработки работника для Resque/Redis поддержке рабочих мест

class ProcessReportWorker 
    # queue name 
    @queue = :report_processing 
    # tell the worker class where the report id is in the arguments list 
    REPORT_ID_ARGS_INDEX = 0 

    # <snip> rest of class, not needed here for this answer 

    # find jobs methods - find by report id (report is the 'associated' object) 

    def self.find_queued_jobs_by_report_id report_id 
    queued_jobs(@queue).select do |job| 
     is_job_for_report? :queued, job, report_id 
    end 
    end 

    def self.find_failed_jobs_by_report_id report_id 
    failed_jobs.select do |job| 
     is_job_for_report? :failed, job, report_id 
    end 
    end 

    def self.find_working_jobs_by_report_id report_id 
    working_jobs.select do |worker,job| 
     is_job_for_report? :working, job, report_id 
    end 
    end 

    # association test method - determine if this job is associated  

    def self.is_job_for_report? state, job, report_id 
    attributes = job_attributes(state, job) 
    attributes[:klass] == self.name && 
     attributes[:queue] == @queue && 
     attributes[:args][REPORT_ID_ARGS_INDEX].to_i == report_id 
    end 

    # remove jobs methods 

    def self.remove_failed_jobs_by_report_id report_id 
    find_failed_jobs_by_report_id(report_id).each do |job| 
     Resque::Failure.remove(job["index"]) 
    end 
    end 

    def self.remove_queued_jobs_by_report_id report_id 
    find_queued_jobs_by_report_id(report_id).each do |job| 
     Resque::Job.destroy(@queue,job["class"],*job["args"]) 
    end 
    end 

    # reusable methods - these methods could go elsewhere and be reusable across worker classes 

    # job attributes method 

    def self.job_attributes(state, job) 
    if state == :queued && job["args"].present? 
     args = job["args"] 
     klass = job["class"] 
    elsif job["payload"] && job["payload"]["args"].present? 
     args = job["payload"]["args"] 
     klass = job["payload"]["class"] 
    else 
     return {args: nil, klass: nil, queue: nil} 
    end 
    {args: args, klass: klass, queue: job["queue"]} 
    end 

    # jobs list methods 

    def self.queued_jobs queue 
    Resque.redis.lrange("queue:#{queue}", 0, -1) 
     .collect do |job| 
     job = Resque.decode(job) 
     job["queue"] = queue # for consistency only 
     job 
     end 
    end 

    def self.failed_jobs 
    Resque.redis.lrange("failed", 0, -1) 
     .each_with_index.collect do |job,index| 
     job = Resque.decode(job) 
     job["index"] = index # required if removing 
     job 
     end 
    end 

    def self.working_jobs 
    Resque.workers.zip(Resque.workers.map(&:job)) 
     .reject { |w, j| w.idle? || j['queue'].nil? } 
    end 

end 

Итак пример использования для отчетов модели становится

class Report < ActiveRecord::Base 
    before_destroy :check_if_working_jobs, :remove_queued_and_failed_jobs 

    def check_if_working_jobs 
    # find all working jobs related to this report object 
    working_jobs = ProcessReportWorker.find_working_jobs_by_report_id(self.id) 
    return false unless working_jobs.empty? 
    end 

    def remove_queued_and_failed_jobs 
    # find all jobs related to this report object 
    queued_jobs = ProcessReportWorker.find_queued_jobs_by_report_id(self.id) 
    failed_jobs = ProcessReportWorker.find_failed_jobs_by_report_id(self.id) 

    # extra code and conditionals here for example only as all that is really 
    # needed is to call the remove methods without first finding or checking 

    unless queued_jobs.empty? 
     ProcessReportWorker.remove_queued_jobs_by_report_id(self.id) 
    end 

    unless failed_jobs.empty? 
     ProcessReportWorker.remove_failed_jobs_by_report_id(self.id) 
    end 

    end 
end 

Решение должно быть изменено, если вы используете несколько очередей для рабочий класс или если у вас несколько очередей сбоев. Также использовался redis сбойный бэкэнд. Если используется другой резервный сервер, могут потребоваться изменения.

Смежные вопросы