2016-10-25 4 views
1

Я использую celerybeat для отправки периодических заданий в очередь rabbitmq. Он работает как ожидалось в течение некоторого времени, а затем отправляет celery.backend_cleanup с ошибкой SSL.celerybeat send task failed with ssl error

[2016-10-24 04:00:02,309: DEBUG/MainProcess] Channel open 
[2016-10-24 04:00:02,443: ERROR/MainProcess] Message Error: Couldn't apply scheduled task celery.backend_cleanup: [Errno 1] _ssl.c:1309: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry 
[' File "/local/mnt/apps/ipcat/venvs/django16/bin/celery", line 9, in <module>\n load_entry_point(\'celery-ipcat==3.1.23\', \'console_scripts\', \'celery\')()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/__main__.py", line 30, in main\n main()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/celery.py", line 81, in main\n cmd.execute_from_commandline(argv)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/celery.py", line 793, in execute_from_commandline\n super(CeleryCommand, self).execute_from_commandline(argv)))\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/base.py", line 311, in execute_from_commandline\n return self.handle_argv(self.prog_name, argv[1:])\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/celery.py", line 785, in handle_argv\n return self.execute(command, argv)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/celery.py", line 717, in execute\n ).run_from_argv(self.prog_name, argv[1:], command=argv[0])\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/base.py", line 315, in run_from_argv\n sys.argv if argv is None else argv, command)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/base.py", line 377, in handle_argv\n return self(*args, **options)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/base.py", line 274, in __call__\n ret = self.run(*args, **kwargs)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/bin/beat.py", line 79, in run\n return beat().run()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/apps/beat.py", line 83, in run\n self.start_scheduler()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/apps/beat.py", line 112, in start_scheduler\n beat.start()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 479, in start\n interval = self.scheduler.tick()\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 221, in tick\n next_time_to_run = self.maybe_due(entry, self.publisher)\n', ' File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 207, in maybe_due\n exc, traceback.format_stack(), exc_info=True)\n'] 
Traceback (most recent call last): 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 204, in maybe_due 
    result = self.apply_async(entry, publisher=publisher) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 260, in apply_async 
    entry, exc=exc)), sys.exc_info()[2]) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/beat.py", line 252, in apply_async 
    **entry.options) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/app/task.py", line 565, in apply_async 
    **dict(self._get_exec_options(), **options) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/app/base.py", line 354, in send_task 
    reply_to=reply_to or self.oid, **options 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/celery/app/amqp.py", line 310, in publish_task 
    **kwargs 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/messaging.py", line 172, in publish 
    routing_key, mandatory, immediate, exchange, declare) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/connection.py", line 436, in _ensured 
    return fun(*args, **kwargs) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/messaging.py", line 184, in _publish 
    [maybe_declare(entity) for entity in declare] 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/messaging.py", line 111, in maybe_declare 
    return maybe_declare(entity, self.channel, retry, **retry_policy) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/common.py", line 113, in maybe_declare 
    return _maybe_declare(entity, declared, ident, channel) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/common.py", line 120, in _maybe_declare 
    entity.declare() 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare 
    self.exchange.declare(nowait) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare 
    nowait=nowait, passive=passive, 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare 
    self._send_method((40, 10), args) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method 
    self.channel_id, method_sig, args, content, 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method 
    write_frame(1, channel, payload) 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame 
    frame_type, channel, size, payload, 0xce, 
    File "/local/mnt/apps/ipcat/venvs/django16/lib/python2.7/site-packages/amqp/transport.py", line 254, in _write 
    n = write(s) 
    File "/usr/lib64/python2.7/ssl.py", line 172, in write 
    return self._sslobj.write(data) 
SchedulingError: Couldn't apply scheduled task celery.backend_cleanup: [Errno 1] _ssl.c:1309: error:1409F07F:SSL routines:SSL3_WRITE_PENDING:bad write retry 
[2016-10-24 04:00:02,743: DEBUG/MainProcess] beat: Waking up in 5.00 minutes. 

После этой неудачи все последующие задачи посыла с ошибкой, как,

[2016-10-24 12:30:03,026: DEBUG/MainProcess] beat: Synchronizing schedule... 
[2016-10-24 12:30:03,253: ERROR/MainProcess] Message Error: Couldn't apply scheduled task aggregate_hw_errata_sign_off_metrics_schedule: 'NoneType' object has no attribute 'do_handshake' 

Я мог бы отключить celery.backend_cleanjob не используя CELERY_TASK_RESULT_EXPIRES = None, но я боюсь, это может не работает на других задач, настроенных.

Любая помощь или руководство оценены.

ответ

0

Думаю, я нашел проблему. Я создавал соединение и передавал его для обмена и очереди экземпляра. Я изменил код, чтобы позволить Queue и Exchange создавать соединение с брокером. до сих пор никаких проблем после этого изменения.

Благодаря

Старый код:

from kombu import Connection, Queue, Exchange 
conn = Connection(
       hostname=settings.BROKER_HOST, 
       port=settings.BROKER_PORT, 
       userid=settings.BROKER_USER, 
       password=settings.BROKER_PASSWORD, 
       virtual_host=settings.BROKER_VHOST, 
       connect_timeout=BROKER_CONNECTION_TIMEOUT, 
       ssl=BROKER_USE_SSL, 
       transport='pyamqp') 
conn.connect() 
channel = conn.channel() 

exchange = Exchange('my.exchange', type='direct', passive=True, channel=channel) 

CELERY_QUEUES = (
    Queue('my.tasks', exchange=exchange, routing_key='my.metrics', channel=conn, passive=True), 
) 

Новый код (исправление):

from kombu import Queue, Exchange 
exchange = Exchange('my.exchange', type='direct', passive=True) 

CELERY_QUEUES = (
    Queue('my.tasks', exchange=exchange, routing_key='my.metrics', passive=True), 
)