3

У меня есть задания sidekiq, обрабатывающие много разных ресурсов. Однако для определенного типа ресурса, например: Resource X, мне нужно убедиться, что только одно действие sidekiq может обрабатывать этот конкретный ресурс в любой момент времени.Заблокировать/переустановить другие задания sidekiq от обработки, когда существующее задание sidekiq обрабатывает определенный ресурс

Например, если у меня есть 3 задания sidekiq, которые одновременно ставятся в очередь и хотят взаимодействовать с ресурсом X, то только одно действие sidekiq может обрабатывать ресурс X, в то время как 2 оставшихся задания sidekiq придется ждать (или переупорядочиваться), пока работа sidekiq, которая в настоящее время не обрабатывает ресурсы, не заканчивается.

В настоящее время я пытаюсь добавить запись в таблицу базы данных, когда задание sidekiq обрабатывает ресурс и использует это, чтобы остановить другие задания sidekiq от обработки ресурса до тех пор, пока эта запись не будет удалена из базы данных с помощью sidekiq job который добавляет его (когда он завершает обработку ресурса X) или по прошествии определенного прошедшего времени (например: если запись была создана более 5 минут назад, то считается, что она больше не имеет эксклюзивного доступа к ресурсу X и следующей сторонеkiqq работа, которая хочет обработать ресурс X, может изменить эту запись и потребовать эксклюзивный доступ к ресурсу X).

псевдокод моей текущей реализации:

def perform(res_id, res_type) 

    # Only applies to "RESOURCE_X" 
    if res_type == RESOURCE_X 
    if ResourceProcessor.where(res_id).empty? || ((Time.now-ResourceProcessor.where(res_id).first.created_at) > 5.minutes) 
     ResourceProcessor.create(res_id: res_id).save 
     process_resource_x(res_id) 
    else 
     SidekiqWorker.delayed(res_id, res_type, 5.minutes) #Try again later 
     return 
    end 

    #Letting other sidekiq jobs know they can now fight over who gets to process resource X 
    ResourceProcessor.where(res_id).destroy 

    else 
    process_other_resource(res_id) 
    end 

end 

К сожалению, мое решение не работает. Он отлично работает, если есть задержка между заданиями sidekiq, которые хотят обработать ресурс X. Однако, если задания, которые хотят обрабатывать ресурс X, поступают одновременно, тогда мое решение разваливается.

Можно ли каким-либо образом обеспечить синхронизацию только при обработке ресурса X?

Btw, мои задания sidekiq могут быть распределены между несколькими машинами (но они получают доступ к тому же серверу redis на выделенной машине).

+2

Возможно, вы ищете ** блокировку **, чтобы иметь ресурс, измененный только одним потоком/процессом за раз. Здесь хорошо читается, что предлагает использовать базу данных для координации блокировок: https://makandracards.com/makandra/31937-differences-between-transactions-and -locking. Некоторые стратегии блокировки реализованы в [with_advisory_lock gem] (https://github.com/mceachen/with_advisory_lock). –

ответ

0

Я сделал больше исследований на основе комментариев, предоставленных Томасом.

Ссылка, которую он предоставил, была чрезвычайно полезна. Они реализовали свой собственный класс Lock для достижения желаемых результатов. Однако я не использовал их собственный код блокировки, потому что мне нужно другое поведение.

Конкретное поведение, которое я искал для реализации, это «Повторная очередь, если заблокирована», а не «Подождите, если заблокирован».

Есть альтернативные инструменты, которые я мог бы использовать, например redis-semaphore и with_advisory_gem. Я тестировал redis-семафор и нашел его багги. Он не возвращал состояние блокировки и количество ресурсов правильно. Кроме того, после проверки проблем в Github, в некоторых ситуациях redis-semaphore может оказаться в своем тупике, поэтому я решил отказаться от его использования. В результате я также решил не использовать параметр with_advisory_gem из-за его более низкого количества звезд, чем redis-семафор.

В конце концов я нашел способ реализовать шаблон блокировки, описанный в моем вопросе, который должен блокировать задания sidekiq на основе значения в моей базе данных. Я имел дело с проблемой параллелизма нескольких заданий sidekiq, считывая устаревшие значения, блокируя всю строку базы данных с собственным классом Locking-pessimistic рельса. Это гарантировало, что только 1 сторонний рабочий может получить доступ к строке базы данных, которая имеет значение блокировки в любой момент времени. Период блокировки поддерживается минимальным, поскольку только чтение и, когда это применимо, выполняется операция записи при блокировке строки базы данных. Последующие операции, такие как выполнение запроса и очистка, выполняются после.

Смежные вопросы