2015-12-16 4 views
1

мне нужно конвертировать видео в 4 нитиЗапуск скриптов параллельно в рубине

Например, у меня есть активные модели Запись видео с названиями: video1, video2, Video3, Video4, Video5

Итак, мне нужно выполнить что-то вроде этого

bundle exec script/video_converter start 

Где скрипт будет обрабатывать непрореагировавших видео для 4 потоков, например

Video.where(state: 'unconverted').first.process 

Но если одно из 4 видеороликов преобразовано, следующее видео должно автоматически добавляться в поток

Какое это лучшее решение? Sidekiq gem? Демоны gem + Ruby Threads вручную?

Сейчас я использую этот скрипт:

THREAD_COUNT = 4 
SLEEP_TIME = 5 
logger = CONVERTATION_LOG 
spawns = [] 
loop do 
    videos = Video.where(state:'unconverted').limit(THREAD_COUNT).reorder("ID DESC") 
    videos.each do |video| 
    spawns << Spawnling.new do 
     result = video.process 
     if result.nil? 
     video.create_thumbnail! 
     else 
     video.failured! 
     end 
    end 
    end 
    Spawnling.wait(spawns) 
    sleep(SLEEP_TIME) 
end 

Но этот сценарий ждет 4 видео, и после того, как он занимает еще 4 видео. Я хочу, чтобы после преобразования одного из 4-го видео он будет автоматически добавлен в новый поток, который пуст.

+2

Резьбы не параллельны в некоторых режимах работы, например. МРТ. Что ты используешь? – sschmeck

+0

Даже с МРТ это не так просто, например. если вы вызываете собственный код, который освобождает GVL или порождает процесс ffmpeg –

+0

Я использую [sprown gem] (https://github.com/tra/spawnling), добавлен дополнительный пример к первому сообщению – rs41

ответ

1

Если ваша цель состоит в том, чтобы продолжать обрабатывать видео, используя только 4 потока (или что-то, что Спаполинг настроен для использования - поскольку он поддерживает fork и thread), тогда вы можете использовать Queue для очереди всех ваших видеозаписей, которые будут обработаны , порождают 4 потока и позволяют им обрабатывать записи один за другим, пока очередь не будет пустой.

require "rails" 
require "spawnling" 

# In your case, videos are read from DB, below array is for illustration 
videos = ["v1", "v2", "v3", "v4", "v5", "v6", "..."] 


THREAD_COUNT = 4 

spawns = [] 

q = Queue.new 

videos.each {|i| q.push(i) } 

THREAD_COUNT.times do 
    spawns << Spawnling.new do 
    until q.empty? do 
     v = q.pop 

     # simulate processing 
     puts "Processing video #{v}" 

     # simulate processing time 
     sleep(rand(10)) 
    end 
    end 
end 

Spawnling.wait(spawns) 

Этот ответ вдохновлен this answer

PS: Я добавил несколько указаний требуется и определенный videos массив, чтобы сделать выше кода самодостаточным работает, например.

+1

И если вы Не хотите сразу загружать все необработанные видеоролики, настройте пятый поток, который засыпает, пока очередь не окажется ниже порога, а затем заправьте ее. –

+0

@Wand Maker, спасибо за предложение! 2 замечает: 1) Для вашей логике, если мы имеем этот массив '[ "v1", "v2", "v3", "v4", "v5", "v6"]' Сценарий добавить '[" v1 "," v2 "," v3 "," v4 "," v5 "," v6 "]' для первой нити, и ни к какому-либо второму, третьему и четвертому потоку, поскольку до этого нет цикла с условием Но мне нужно, чтобы в первый поток добавили «v1», на второй «v2», на 3-й «v3», на 4-й «v4», на первый «v5», на второй «v6» и т. Д. – rs41

+0

2) Похоже, что ваш пример добавил видео сразу, а после этого процесса весь массив Но иногда менеджер проекта периодически добавляет новые видео, и я хочу добавить это видео в существующую очередь, а не в новую очередь (So videos should async добавлено в очередь, и скрипт должен добавить это видео в текущие 4 thre Объявления). Что вы думаете об этом? – rs41

Смежные вопросы