2015-06-07 4 views
0

RabbiMQ RPC генерирует исключение, когда я создаю шаблон издателя-потребителя. Это мой код: sender.pyrabbitmq (pika) выдает исключение при использовании RPC

# coding:utf-8 
import pika 
import uuid 


class MessageSender(object): 

    def __init__(self): 
     self.connention = pika.BlockingConnection(
      pika.ConnectionParameters(host='localhost')) 
     self.channel = self.connention.channel() 

     result = self.channel.queue_declare(exclusive=True) 
     self.callback_queue = result.method.queue 
     print 'MessageSender callback queue is ------', self.callback_queue 
     self.channel.basic_consume(
      self.on_response, no_ack=True, queue=self.callback_queue) 

    def on_response(self, ch, method, props, body): 
     if self.corr_id == props.correlation_id: 
      self.respose = body 

    def send(self): 
     self.respose = None 
     self.corr_id = str(uuid.uuid4()) 
     self.channel.basic_publish(exchange='', 
            routing_key='rpc_queue', 
            properties=pika.BasicProperties(
             reply_to=self.callback_queue, 
             correlation_id=self.corr_id,), 
            body='MESSAGE') 
     print '[x]Send sucessful' 
     while self.respose is None: 
      self.connention.process_data_events() 
     print 'MessageSender get callback ------', self.respose 


if __name__ == '__main__': 
    sender = MessageSender() 
    sender.send() 

worker.py

# coding:utf-8 
import pika 
import time 


class MessageWorker(object): 

    def __init__(self): 
     self.connection = pika.BlockingConnection(
      pika.ConnectionParameters(host='localhost')) 
     self.channel = self.connection.channel() 
     self.channel.queue_declare(queue='rpc_queue') 

    def on_request(self, ch, method, props, body): 
     print 'MessageWorker get message ------', body 
     response = task(body) 
     print 'MessageWorker callback on queue ------', props.reply_to 
     ch.basic_publish(exchange='', 
         routing_key=props.reply_to, 
         properties=pika.BasicProperties(
          correlation_id=props.correlation_id), 
         body=str(response)) 
     print 'MessageWorker send message to MessageSender' 
     ch.basic_ask(delivery_tag=method.delivery_tag) 

    def work(self): 
     self.channel.basic_qos(prefetch_count=2) 
     self.channel.basic_consume(self.on_request, queue='rpc_queue') 
     print '[x]Waiting message...' 
     self.channel.start_consuming() 


def task(body): 
    time.sleep(3) 
    return body 


if __name__ == '__main__': 
    worker = MessageWorker() 
    worker.work() 

Отправитель работает отлично, но worker.py всегда дают ошибку:

[x]Waiting message... 
MessageWorker get message ------ MESSAGE 
MessageWorker callback on queue ------ amq.gen-6l5oJapbiKIqG3ZYTRwqpA 
MessageWorker send message to MessageSender 
Traceback (most recent call last): 
    File "python/workers/new_worker.py", line 40, in <module> 
    worker.work() 
    File "python/workers/new_worker.py", line 30, in work 
    self.channel.start_consuming() 
    File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming 
    self.connection.process_data_events() 
    File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events 
    raise exceptions.ConnectionClosed() 
pika.exceptions.ConnectionClosed 

это беспокоит мне почти неделю! Кто-нибудь может помочь? Спасибо :)

+0

О, antoher вопрос, то routing_key пункт должен быть равен 'rpc_queue', когда я хочу использовать RPC? Потому что я установил его только на «rpc», рабочий не может получить то, что отправляет отправитель! – Hugo

ответ

1

Хорошо, я знал, что происходит. В worker.py я орфографическую ошибку

ch.basic_ack(delivery_tag=method.delivery_tag) 

в

ch.basic_ask(delivery_tag=method.delivery_tag) 

Боже мой! T.T

Смежные вопросы