2013-10-04 3 views
4

Мне было интересно, могу ли я остановить выполнение операции, которая была дефферирована.Ruby Event Machine останавливает или убивает операцию дефиса

require 'rubygems' 
require 'em-websocket' 

EM.run do 
    EM::WebSocket.start(:host => '0.0.0.0', :port => 8080) do |ws| 
    ws.onmessage do |msg| 
     op = proc do 
     sleep 5 # Thread safe IO here that is safely killed 
     true 
     end 

     callback = proc do |result| 
     puts "Done!" 
     end 

     EM.defer(op, callback) 
    end 
    end 
end 

Это пример сервера веб-сокетов. Иногда, когда я получаю сообщение, я хочу сделать несколько ввода-вывода, позже может появиться другое сообщение, которое должно читать одно и то же, следующее всегда имеет приоритет над предыдущей. Поэтому я хочу отменить первый op и сделать второй.

+0

Фактически, вы не используете только одну нить (кроме резьбы реактора) в любой точке? – Kashyap

+2

Я _think_ Я получил ответ, который вы ищете (это работает, кстати). Но я не знаю, есть ли у него какие-либо нежелательные побочные эффекты:/Вот мое решение: https://gist.github.com/kgrz/6826255 – Kashyap

+0

Вопрос не совсем ясен, почему я хочу это сделать. Да, в этом примере я бы использовал только один поток в любой заданной точке. Мой реальный пример мира состоит в том, что у меня есть два типа сообщений, которые входят. Тип 1 делает длинный IO, а тип 2 - множеством вещей. Когда приходит сообщение типа 1, он откладывает и занимает много времени, означает, что при приеме нескольких сообщений типа 2 выполняются их операционные и обратные вызовы. В основном я хочу, чтобы одна операция типа 1 продолжалась постоянно и не мешала никаким операциям типа 2. –

ответ

1

Вот мое решение. Он похож на решение EM.queue, но просто использует хэш.

require 'rubygems' 
require 'em-websocket' 
require 'json' 

EM.run do 
    EM::WebSocket.start(:host => '0.0.0.0', :port => 3333) do |ws| 
    mutex = Mutex.new # to make thread safe. See https://github.com/eventmachine/eventmachine/blob/master/lib/eventmachine.rb#L981 
    queue = EM::Queue.new 
    ws.onmessage do |msg| 
     message_type = JSON.parse(msg)["type"] 
     op = proc do 
     mutex.synchronize do 
      if message_type == "preferred" 
      puts "killing non preferred\n" 
      queue.size.times { queue.pop {|thread| thread.kill } } 
      end 
      queue << Thread.current 
     end 

     puts "doing the long running process" 
     sleep 15 # Thread safe IO here that is safely killed 
     true 
     end 

     callback = proc do |result| 
     puts "Finished #{message_type} #{msg}" 
     end 

     EM.defer(op, callback) 
    end 
    end 
end 
+0

Вам не нужно добавлять к нему мутексы? – Kashyap

+0

Я пробовал много раз, и он работал каждый раз. Документация здесь https://github.com/eventmachine/eventmachine/blob/master/lib/eventmachine.rb#L981, похоже, согласна с вами, поэтому я вложил некоторые из ваших предложений в свой ответ. Должен был опубликовать это в стеке вместо github, и вы получите кредит! – earlonrails

Смежные вопросы