2015-07-21 3 views
7

Я получаю следующее сообщение об ошибке в одном из моих рабочих Сельдерея:Weird ошибка с Redis и сельдерея

2015-07-21T15:02:04.010066+00:00 app[worker.1]: Traceback (most recent call last): 
2015-07-21T15:02:04.010069+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/app/trace.py", line 296, in trace_task 
2015-07-21T15:02:04.010070+00:00 app[worker.1]:  on_chord_part_return(task, state, R) 
2015-07-21T15:02:04.010073+00:00 app[worker.1]:  deps.delete() 
2015-07-21T15:02:04.010074+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/result.py", line 773, in delete 
2015-07-21T15:02:04.010071+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 587, in on_chord_part_return 
2015-07-21T15:02:04.010078+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 329, in delete_group 
2015-07-21T15:02:04.010076+00:00 app[worker.1]:  (backend or self.app.backend).delete_group(self.id) 
2015-07-21T15:02:04.010079+00:00 app[worker.1]:  return self._delete_group(group_id) 
2015-07-21T15:02:04.010081+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/base.py", line 499, in _delete_group 
2015-07-21T15:02:04.010082+00:00 app[worker.1]:  self.delete(self.get_key_for_group(group_id)) 
2015-07-21T15:02:04.010083+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/celery/backends/redis.py", line 172, in delete 
2015-07-21T15:02:04.010084+00:00 app[worker.1]:  self.client.delete(key) 
2015-07-21T15:02:04.010085+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 824, in delete 
2015-07-21T15:02:04.010087+00:00 app[worker.1]:  return self.execute_command('DEL', *names) 
2015-07-21T15:02:04.010088+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 565, in execute_command 
2015-07-21T15:02:04.010089+00:00 app[worker.1]:  return self.parse_response(connection, command_name, **options) 
2015-07-21T15:02:04.010090+00:00 app[worker.1]: File "/app/.heroku/python/lib/python2.7/site-packages/redis/client.py", line 579, in parse_response 
2015-07-21T15:02:04.010091+00:00 app[worker.1]:  return self.response_callbacks[command_name](response, **options) 
2015-07-21T15:02:04.010093+00:00 app[worker.1]: ValueError: invalid literal for int() with base 10: 'QUEUED' 

Что я нахожу странным, что я не вижу вызов int в последней строке трассировки стека , QUEUED, вероятно, пришел как статус работника. Я использую его как пользовательский статус, например:

@before_task_publish.connect 
def update_sent_state(sender=None, body=None, **kwargs): 
    # the task may not exist if sent using `send_task` which 
    # sends tasks by name, so fall back to the default result backend 
    # if that is the case. 
    task = current_app.tasks.get(sender) 
    backend = task.backend if task else current_app.backend 
    logging.debug("Setting status for %s" % body["id"]) 

    backend.store_result(body['id'], None, "QUEUED") 

В чем проблема?


В случае, если это необходимо, вот код для моей задачи. Я напрямую вызываю fetch: fetch.

@app.task 
def fetch(url_or_urls, subscribe=None): 
    """This fetches a (list of) podcast(s) and stores it in the db. It assumes that it only gets called 
    by Podcast.get_by_url, or some other method that knows whether a given podcast has 
    already been fetched. 

    If *subscribe* is given, it should be a User instance to be subscribed to the given podcasts.""" 
    if isinstance(url_or_urls, basestring): 
     url_or_urls = [url_or_urls] 
    body = _store_podcasts.s() 
    if subscribe: 
     body.link(_subscribe_user.s(user=subscribe)) 
    return chord([_fetch_podcast_data.s(url) for url in url_or_urls])(body) 

@app.task 
def _fetch_podcast_data(url): 
    return do_fetch(url) # This function returns a dict of podcast data. 

@app.task 
def _store_podcasts(podcasts_data): 
    """Given a list of dictionaries representing podcasts, store them all in the database.""" 
    podcasts = [Podcast(**pdata) for pdata in podcasts_data] 
    return Podcast.objects.insert(podcasts) 

@app.task 
def _subscribe_user(podcasts, user): 
    """Subscribe the given users to all the podcasts in the list.""" 
    return user.subscribe_multi(podcasts) 

Есть ли что-нибудь еще, что могут иметь значение здесь?


версия библиотеки, как показано на pip freeze:

redis==2.10.3 
celery==3.1.18 
+0

Можете ли вы предоставить версию сельдерея и redis-py? У меня есть какой-то момент для исследования, но ваша строка трассировки ошибок не такая же, как у меня. – mrorno

+0

@mrorno Версии, показанные 'pip freeze':' redis == 2.10.3', 'celery == 3.1.18' – bigblind

ответ

2

В redis python пакетах ожидает ответ от DEL действий, чтобы всегда быть целым числом, который я предполагаю, это количество удаленных строк.

Звонок на int происходит в последней строке (return self.response_callbacks[command_name](response, **options)), где self.response_callbacks['DEL'] равен int.

В качестве обходного пути, вы можете подкласс redis.client.StrictRedis и установить функцию обратного вызова DEL ответа на что-то другое, чем int, просто убедитесь, что вы знакомы с последствиями.

+0

И почему он не получает int? Насколько мне известно, я не имею дело с Редисом, а только с сельдереем. – bigblind

+0

Если вы вызываете его асинхронно, он собирается вернуть http://celery.readthedocs.org/en/latest/reference/celery.result.html. После завершения задачи фактический результат (возможно, целое число) будет доступен в атрибуте результата этого результата. – garnertb

+0

Несомненно, и этот результат сохраняется в redis. Похоже, что сельдерей выполняет некоторую работу по очистке, удаляет промежуточные результаты группы и где-то в этом прокси, redis получает статус моей задачи вместо количества делеций. Как это происходит? В случае, если это имеет значение, я добавлю код для задачи, которую я вызываю на вопрос. – bigblind

3

Трудно отладить такую ​​ошибку без рабочего кода. Вот что я думаю, что это может быть. Начнём здесь:

http://celery.readthedocs.org/en/latest/_modules/celery/backends/base.html#BaseBackend.store_result

def store_result(self, task_id, result, status, 
       traceback=None, request=None, **kwargs): 
    """Update task state and result.""" 
    result = self.encode_result(result, status) 
    self._store_result(task_id, result, status, traceback, 
         request=request, **kwargs) 
    return result 

Он называет ecnode_result. Позволяет проверить, что из

def encode_result(self, result, status): 
     if status in self.EXCEPTION_STATES and isinstance(result, Exception): 
      return self.prepare_exception(result) 
     else: 
      return self.prepare_value(result) 

Это выглядит как «состояние», как ожидается, будет что-то из предопределенных констант ГОСУДАРСТВА.

Его код здесь

http://celery.readthedocs.org/en/latest/_modules/celery/states.html#state

И документы здесь

http://celery.readthedocs.org/en/latest/reference/celery.states.html

Это не выглядит, как они ожидают увидеть что-то вроде «QUEU ED ". Попробуйте один из предопределенных.

+0

Задачи сельдерея имеют метод 'update_state()', где вы можете предоставить любую строку, я изучаю это. Документация: http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states – bigblind

+0

hmm 'task.update_state' похоже делает то же самое, что и я:' self.backend.store_result (task_id, meta, state) ', где мета по умолчанию - None. – bigblind

+0

Не могли бы вы попробовать что-то из предопределенных состояний, поэтому мы можем гарантировать, что проблема в том, на что мы смотрим. – singer

Смежные вопросы