2011-12-14 3 views
33

Я использую Celery для управления асинхронными задачами. Иногда, однако, процесс сельдерея снижается, что не приводит к тому, что ни одна из задач не выполняется. Я хотел бы иметь возможность проверить статус сельдерея и убедиться, что все работает нормально, и если я обнаруживаю какие-либо проблемы, выдает сообщение об ошибке пользователю. Из документации Celery Worker видно, что я мог бы использовать ping или inspect, но ping чувствует себя взломанным, и неясно, как именно будет использоваться проверка (если проверка() зарегистрирована() пуста?).Определить, доступен ли сельдерей/работает

Любое руководство по этому вопросу будет оценено по достоинству. В основном то, что я ищу способ, как так:

def celery_is_alive(): 
    from celery.task.control import inspect 
    return bool(inspect().registered()) # is this right?? 

EDIT: Это даже не похоже на учет() доступно на сельдерее 2.3.3 (хотя бы перечислить его в 2,1 документах). Возможно, пинг - правильный ответ.

РЕДАКТИРОВАТЬ: Пинг также не выглядит так, как я думал, что он будет делать, поэтому не уверен, что ответ здесь.

+0

ли ответ ниже не работает для вас? Как кто-то, у кого есть аналогичная проблема для решения, я хотел бы получить некоторое подтверждение. – kojiro

ответ

44

Вот код, который я использовал. celery.task.control.Inspect.stats() возвращает dict, содержащий множество подробностей о доступных в настоящее время работниках, None, если нет работающих рабочих, или вызывает IOError, если он не может подключиться к брокеру сообщений. Я использую RabbitMQ - возможно, что другие системы обмена сообщениями могут вести себя несколько иначе. Это работало в Celery 2.3.x и 2.4.x; Я не уверен, как далеко он уходит.

def get_celery_worker_status(): 
    ERROR_KEY = "ERROR" 
    try: 
     from celery.task.control import inspect 
     insp = inspect() 
     d = insp.stats() 
     if not d: 
      d = { ERROR_KEY: 'No running Celery workers were found.' } 
    except IOError as e: 
     from errno import errorcode 
     msg = "Error connecting to the backend: " + str(e) 
     if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED': 
      msg += ' Check that the RabbitMQ server is running.' 
     d = { ERROR_KEY: msg } 
    except ImportError as e: 
     d = { ERROR_KEY: str(e)} 
    return d 
+0

Работал для меня :) – kojiro

+6

Я обнаружил, что вышеприведенное добавляет две очереди reply.celery.pidbox к rabbitmq каждый раз, когда он запускается. Это приводит к постепенному увеличению использования памяти rabbitmq. – kojiro

2

Следующие работал для меня:

import socket 
from kombu import Connection 

celery_broker_url = "amqp://localhost" 

try: 
    conn = Connection(celery_broker_url) 
    conn.ensure_connection(max_retries=3) 
except socket.error: 
    raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url)) 
+2

Я уверен, что это будет успешным, если rabbitmq работает независимо от статуса сельдерея. Но это хорошая проверка, если сельдерей не знает, сбой с кроликом или что-то еще. –

4

Для проверки же с помощью командной строки в случае сельдерея запущен как демон,

  • Активировать virtualenv и перейти к директории, где 'app' is
  • Теперь запустить: celery -A [app_name] status
  • Он покажет, если сельдерей вверх или нет плюс нет. узлов онлайн

Источник: http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

Смежные вопросы