2013-03-08 2 views
1

У меня есть интересная ситуация, которую мне нужно выполнить. Мне нужно иметь цикл EventMachine, который сидит и ждет сообщений в очереди AMQP, но затем прерывает этот цикл, чтобы отправить сообщение в отдельную очередь AMQP на регулярном интервале. Я новичок в EventMachine, и это то, что у меня есть до сих пор, за исключением того, что цикл EventMachine не отправляет необходимое сообщение.AMQP Многозадачность

Прямо сейчас я сделал два прока:

listen_loop = Proc.new { 
     AMQP.start(connection_config) do |connection| 
      AMQP::Channel.new(connection) do |channel| 
       channel.queue("queue1", :exclusive => false, :durable => true) do |requests_queue| 
        requests_queue.once_declared do 
         consumer = AMQP::Consumer.new(channel, requests_queue).consume 
         consumer.on_delivery do |metadata, payload| 
          puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..." 
          response = "responding" 
          channel.default_exchange.publish(response, 
           :routing_key => metadata.reply_to, 
           :correlation_id => metadata.message_id, 
           :mandatory  => true) 
          metadata.ack 
         end 
        end 
       end 
      end 
     end 
     Signal.trap("INT") { AMQP.stop { EM.stop } } 
     Signal.trap("TERM") { AMQP.stop { EM.stop } } 
    } 

    send_message = Proc.new { 
     AMQP.start(connection_config) do |connection| 
      channel = AMQP::Channel.new(connection) 
      queue = channel.queue('queue2') 

      channel.default_exchange.publish("hello world", :routing_key => queue.name) 
      EM.add_timer(0.5) do 
       connection.close do 
        EM.stop{ exit } 
       end 
      end 
     end 
    } 

И тогда у меня есть EventMachine Loop:

EM.run do 
     EM.add_periodic_timer(5) { send_message.call } 
     listen_loop.call 
    end 

Я могу получать сообщения в цикле, но слушать я не могу отсылайте любое из сообщений на регулярном интервале.

ответ

0

Выяснено, что я делаю неправильно. Цикл сообщений не смог открыть новое соединение с сервером RabbitMQ, поскольку он уже подключен. Сконфигурировали все в один цикл EventMachine и повторно использовали соединение, и оно работает.

Для тех, кому интересно это выглядит следующим образом:

EM.run do 

    AMQP.start(connection_config) do |connection| 
     channel = AMQP::Channel.new(connection) 

     EM.add_periodic_timer(5) { channel.default_exchange.publish("foo", :routing_key => 'queue2') } 

     queue = channel.queue("queue1", :exclusive => false, :durable => true) 
     channel.prefetch(1) 
     queue.subscribe(:ack => true) do |metadata, payload| 
      puts "[requests] Got a request #{metadata.message_id}. Sending a reply to #{metadata.reply_to}..." 
      response = "bar" 
      channel.default_exchange.publish(response, 
       :routing_key => metadata.reply_to, 
       :correlation_id => metadata.message_id, 
       :mandatory  => true) 
      metadata.ack 
     end 
    end 
    Signal.trap("INT") { AMQP.stop { EM.stop } } 
    Signal.trap("TERM") { AMQP.stop { EM.stop } } 

end 
Смежные вопросы