2014-12-04 2 views
1

У меня проблема. Мой клиент pika постоянно выходит из строя с сообщением об ошибке.Pika сбой без видимой причины

Это то, что происходит:

  1. RabbitMQ работает, а производитель уже толкнул сообщения в очередь
  2. я начинаю мой питон скрипт и обрабатывает все пакеты, буферизованные в очереди
  3. Мой сценарий периодически бросает Exeption: ConnectionClosed, однако я никогда не закрывать что-нибудь где-нибудь

Это мой код:

import pika 
import traceback 

class RPCServer(object): 


    def __init__(self, callback, cfg): 
     self.cfg = cfg 
     self.callback = callback 
     self.credentials = None 
     self.parameters = None 
     self.connection = None 
     self.channel = None 
     self.counter = 0 
     self.initalize_me() 


    def initalize_me(self): 
     self.credentials = pika.PlainCredentials(self.cfg.USER, self.cfg.PASSWORD) 
     self.parameters = pika.ConnectionParameters(host=self.cfg.AMQP_HOST, credentials=self.credentials) 
     self.connection = pika.BlockingConnection(self.parameters) 
     self.channel = self.connection.channel() 
     self.channel.exchange_declare(exchange=self.cfg.RPC_EXCHANGE_NAME, type="direct") 
     self.channel.queue_declare(queue=self.cfg.RPC_QUEUE_NAME) 
     self.channel.queue_bind(exchange=self.cfg.RPC_EXCHANGE_NAME, queue=self.cfg.RPC_QUEUE_NAME, routing_key=self.cfg.RPC_ROUTING_KEY) 
     self.channel.basic_consume(self.rpc_callback, queue=self.cfg.RPC_QUEUE_NAME) 
     print "init= " + str(self.cfg.RPC_EXCHANGE_NAME) + " -> " + str(self.cfg.RPC_QUEUE_NAME) + " -> " + str(self.cfg.RPC_ROUTING_KEY) 


    def start_rpc_server(self): 
     print "Server: Start listening for RPC requests..." 
     try: 
      self.channel.start_consuming() 
     except: 
      print "Exception: " + str(traceback.format_exc()) 
      self.initalize_me() 
      self.start_rpc_server() 


    def rpc_callback(self, ch, method, props, body): 
     self.counter += 1 
     if self.counter == 100: 
      print "100 package processed..." 
      self.counter = 0 
     result = self.callback(body) 
     properties = pika.BasicProperties(correlation_id=props.correlation_id) 
     ch.basic_publish(exchange="", routing_key=props.reply_to, properties=properties, body=result) 
     ch.basic_ack(delivery_tag=method.delivery_tag) 

И это выход, когда я запускаю его:

python RunPacer.py 
Initialize Configuration 
Start Pacer 
100 package processed... 
100 package processed... 
init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug 
Server: Start listening for RPC requests... 
Exception: Traceback (most recent call last): 
    File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server 
    self.channel.start_consuming() 
    File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming 
    self.connection.process_data_events() 
    File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events 
    raise exceptions.ConnectionClosed() 
ConnectionClosed 

100 package processed... 
100 package processed... 
init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug 
Server: Start listening for RPC requests... 
Exception: Traceback (most recent call last): 
    File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server 
    self.channel.start_consuming() 
    File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming 
    self.connection.process_data_events() 
    File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events 
    raise exceptions.ConnectionClosed() 
ConnectionClosed 

init= pacing_exchange_debug -> pacing_queue_debug -> pacing_routing_key_debug 
Server: Start listening for RPC requests... 
100 package processed... 
100 package processed... 
Exception: Traceback (most recent call last): 
    File "/home/Tom/Pacer/amqp/RPCServer.py", line 46, in start_rpc_server 
    self.channel.start_consuming() 
    File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 955, in start_consuming 
    self.connection.process_data_events() 
    File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 243, in process_data_events 
    raise exceptions.ConnectionClosed() 
ConnectionClosed 

К сожалению, мое описание очень unprecise, однако, это благодаря тому, что я абсолютно не знаю, почему мой сценарий аварий. Поэтому любой, действительно любой, совет будет полезен. Благодаря!

EDIT: Журналы ошибок из RabbitMQ добавил:

=INFO REPORT==== 4-Dec-2014::12:55:42 === 
accepting AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672) 

=ERROR REPORT==== 4-Dec-2014::12:55:42 === 
Error on AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672, vhost: '/', user: 'username', state: running), channel 1: 
{amqp_error,unexpected_frame, 
      "expected content body, got non content body frame instead", 
      'basic.publish'} 

=INFO REPORT==== 4-Dec-2014::12:55:43 === 
closing AMQP connection <0.8947.0> (183.13.20.123:61598 -> 183.13.20.123:5672) 

Дополнительно: У меня есть программа Java, работающих на одной и той же очереди (на самом деле копию сценария питона в Java), который работает без каких-либо проблем.

+0

Является ли какое-либо указание проблемы в журналах RabbitMQ? – HAL

+0

Ваша программа Java изящно пытается подключиться к ошибкам соединения? Любые другие журналы (dmesg/rabbitmq/etc) имеют события во время отключения? –

+0

Нет, моя программа Java не пытается повторно подключиться. Я не обнаружил никаких других проблем в файлах журналов. – toom

ответ

2

Это, очевидно, ошибка в Pika (см. Здесь: https://github.com/pika/pika/issues/349). Поскольку этот вопрос не исправлен уже более года (сообщение об ошибке 12/2013 это сообщение: 12/2014) Я больше не буду полагаться на пику.

Однако, большой (и быстро) альтернативы (что приводит к еще меньшим количеством кода) является librabbitmq: https://pypi.python.org/pypi/librabbitmq

Вот пример кода, который реализует вызов RPC с librabbitmq:

# -*- coding: utf-8 -*- 

from librabbitmq import Connection 
import uuid 

class RPCClient(object): 

    def __init__(self, cfg): 
     self.cfg = cfg 
     self.connection = Connection(host=cfg.AMQP_HOST, userid=cfg.USER, password=cfg.PASSWORD) 
     self.channel = self.connection.channel() 
     result = self.channel.queue_declare(exclusive=True) 
     self.callback_queue = result.queue 
     self.response = None 
     self.corr_id = None 
     self.channel.basic_consume(self.callback_queue, callback=self.process_response) 


    def process_response(self, msg): 
     if self.corr_id == msg.properties['correlation_id']: 
      self.response = str(msg.body) 


    def rpc_call(self, msg): 
     self.response = None 
     self.corr_id = str(uuid.uuid4()) 
     props = {'reply_to' : str(self.callback_queue), 'correlation_id' : str(self.corr_id)} 

     self.channel.basic_publish(msg, exchange=self.cfg.RPC_EXCHANGE_NAME, 
            routing_key=self.cfg.RPC_ROUTING_KEY, **props) 
     while self.response is None: 
      self.connection.drain_events() 
     return str(self.response) 

а также соответствующий RPCServer:

# -*- coding: utf-8 -*- 

from librabbitmq import Connection 

class RPCServer(object): 

    def __init__(self, callback, cfg): 
     self.cfg = cfg 
     self.callback = callback 
     self.connection = Connection(host=cfg.AMQP_HOST, userid=cfg.USER, password=cfg.PASSWORD) 
     self.channel = self.connection.channel() 
     self.channel.exchange_declare(cfg.RPC_EXCHANGE_NAME, "direct") 
     self.channel.queue_declare(cfg.RPC_QUEUE_NAME) 
     self.channel.queue_bind(cfg.RPC_QUEUE_NAME, cfg.RPC_EXCHANGE_NAME, cfg.RPC_ROUTING_KEY) 
     self.channel.basic_consume(cfg.RPC_QUEUE_NAME, callback=self.rpc_callback) 


    def start_rpc_server(self): 
     while True: self.connection.drain_events() 


    def rpc_callback(self, msg): 
     resp = self.callback(msg.body) 
     self.channel.basic_publish(resp, exchange="", routing_key=msg.properties['reply_to'], **msg.properties) 
     self.channel.basic_ack(msg.delivery_info['delivery_tag']) 
Смежные вопросы