2014-06-06 6 views
4

У меня возникает проблема, когда я помещаю задачу в очередь, и она запускается несколько раз. Из бревен сельдерея я могу видеть, что тот же работник работает задание ...Celery/Django Single Tasks запускается несколько раз

[2014-06-06 15:12:20,731: INFO/MainProcess] Received task: input.tasks.add_queue 
[2014-06-06 15:12:20,750: INFO/Worker-2] starting runner.. 
[2014-06-06 15:12:20,759: INFO/Worker-2] collection started 
[2014-06-06 15:13:32,828: INFO/Worker-2] collection complete 
[2014-06-06 15:13:32,836: INFO/Worker-2] generation of steps complete 
[2014-06-06 15:13:32,836: INFO/Worker-2] update created 
[2014-06-06 15:13:33,655: INFO/Worker-2] email sent 
[2014-06-06 15:13:33,656: INFO/Worker-2] update created 
[2014-06-06 15:13:34,420: INFO/Worker-2] email sent 
[2014-06-06 15:13:34,421: INFO/Worker-2] FINISH - Success 

Однако при просмотре фактических журналов приложения она показывает 5-6 строк журнала для каждого шага (??).

Im, использующий Django 1.6 с RabbitMQ. Способ размещения в очереди - это установка задержки на функцию.

Эта функция (задача декоратора добавлена ​​(то называет класс, который запускается

Кто-нибудь какие-либо идеи по лучшим способом для устранения этой

Edit:.? В соответствии с просьбой Херес код,

views.py

на мой взгляд, им посылать свои данные в т он в очереди с помощью ...

from input.tasks import add_queue_project 

add_queue_project.delay(data) 

tasks.py

from celery.decorators import task 

@task() 
def add_queue_project(data): 
    """ run project """ 
    logger = logging_setup(app="project") 

    logger.info("starting project runner..") 
    f = project_runner(data) 
    f.main() 

class project_runner(): 
    """ main project runner """ 

    def __init__(self,data): 
     self.data = data 
     self.logger = logging_setup(app="project") 

    def self.main(self): 
     .... Code 

settings.py

THIRD_PARTY_APPS = (
    'south', # Database migration helpers: 
    'crispy_forms', # Form layouts 
    'rest_framework', 
    'djcelery', 
) 

import djcelery 
djcelery.setup_loader() 

BROKER_HOST = "127.0.0.1" 
BROKER_PORT = 5672 # default RabbitMQ listening port 
BROKER_USER = "test" 
BROKER_PASSWORD = "test" 
BROKER_VHOST = "test" 
CELERY_BACKEND = "amqp" # telling Celery to report the results back to RabbitMQ 
CELERY_RESULT_DBURI = "" 

CELERY_IMPORTS = ("input.tasks",) 

celeryd

л ине им ход, чтобы начать сельдерей,

python2.7 manage.py celeryd -l info 

Спасибо,

+0

Это не рабочие места, созданные celerybeat, правильно? – JeffS

+0

Создаете ли вы задачу в обработчике сигнала django? Если да, убедитесь, что сигнал не вызывается несколько раз. –

+0

, пожалуйста, напишите свой код – dm03514

ответ

1

У меня нет точного ответа для вас, но есть несколько вещей, которые вы должны искать в:

  • djcelery устарел, поэтому, если вы используете новую версию celery, может возникнуть конфликт.

  • Если input приложения отображается в INSTALLED_APPS сельдерее обнаружит это, так что вам не нужно, чтобы добавить его в CELERY_IMPORTS = ("input.tasks",), которые, может быть причина вашей проблемы, так как может быть загружена задача несколько раз

  • попробуйте дать своей задаче имя @task(name='input.tasks.add'), он будет знать, что это та же задача, независимо от того, как вы ее импортируете.

Глядя на вашу настройку, похоже, что вы используете старую версию сельдерея или используете старую конфигурацию для новой версии сельдерея.В любом случае, убедитесь, что у вас есть последняя версия и попробовать эту конфигурацию вместо того, что у вас есть:

BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>' 
CELERY_RESULT_BACKEND = 'amqp' 
CELERY_ACCEPT_CONTENT = ['json'] 
CELERY_TASK_SERIALIZER = 'json' 
CELERY_RESULT_SERIALIZER = 'json' 

Теперь, вы также должны настроить сельдерей по-разному:

Избавьтесь от djcelery вещи полностью.

Создание proj/celery.py внутри Джанго проекта:

from __future__ import absolute_import 

import os 

from celery import Celery 

from django.conf import settings 

# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') 

app = Celery('proj') 

# Using a string here means the worker will not have to 
# pickle the object when using Windows. 
app.config_from_object('django.conf:settings') 
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) 

@app.task(bind=True) 
def debug_task(self): 
    print('Request: {0!r}'.format(self.request)) 

В вашем proj/__init__.py:

from __future__ import absolute_import 

from proj.celery import app as celery_app 

Тогда, если ваши input приложения является многоразовым приложением и не является частью вашего использования проекта @shared_task вместо @task декоратор.

Затем запустите сельдерей:

celery -A proj worker -l info 

Надеется, что это помогает.

Смежные вопросы