2015-07-07 3 views
1

Я пытаюсь использовать функцию post_save для Django Signals в сочетании с задачами Celery. После того, как новый объект сообщения будет сохранен в базе данных, я хочу оценить, имеет ли экземпляр один из двух атрибутов, и если это так, вызовите функцию send_sms_function, которая является зарегистрированной сельдерием.Selery + Django Signals

tasks.py

from my_project.celery import app 

@app.task 
def send_sms_message(message): 
    # Do something 

signals.py

from django.db.models.signals import post_save 
from django.dispatch import receiver 

import rollbar 
rollbar.init('234...0932', 'production') 

from dispatch.models import Message 
from comm.tasks import send_sms_message 


@receiver(post_save, sender=Message) 
def send_outgoing_messages(sender, instance, **kwargs): 

if instance.some_attribute == 'A' or instance.some_attribute == 'B': 
    try: 
     send_sms_message.delay(instance) 
    except: 
     rollbar.report_exc_info() 
else: 
    pass 

Я проверяю это локально, запустив сельдерей работника. Когда я нахожусь в оболочке Django и вызываю функцию Сельдерея, он работает так, как ожидалось. Однако, когда я сохраняю экземпляр Message в базе данных, функция делает не работает должным образом: в очередь задач ничего не отправлено, и я не вижу сообщений об ошибках.

Что я делаю неправильно?

ответ

1

Выражение if instance.some_attribute == 'A' or 'B', вероятно, ваша проблема.

То, что вы, вероятно, имеете в виду:

if instance.some_attribute == 'A' or instance.some_attribute == 'B'

Или, как я бы писать:

if instance.some_attribute in ('A', 'B') 
1

вы вызываете функцию синхронно вместо очередей его:

send_sms_message.delay(instance)

shou л.д. сообщение в очередь

http://celery.readthedocs.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.delay

http://celery.readthedocs.org/en/latest/userguide/calling.html#basics

@dgel также указывает на логическую ошибку

+0

Thanks @ dm03514 Я обновил свой код здесь и запускал его локально, но функция 'send_sms_message' все еще не попадает в очередь задач и не запускает. Любые другие идеи о том, что я делаю неправильно? –

+0

Мне показалось, что я где-то читал, что вы не должны создавать запросы из экземпляра attr, но не знаете, почему это так, или если это правда. Просто мысль. –

1

Это выглядит как проблема с сериализации и/или ваши настройки. Когда сельдерей передает сообщение вашему брокеру, он должен иметь некоторое представление данных. Сельдерей сериализует аргументы, которые вы даете задаче, но если вы не настроили их последовательно с тем, что вы проходите (т. Е. У вас есть несоответствие, когда ваш брокер ожидает JSON, но вы отправляете его маринованным объектом python), задачи могут сбой просто потому что рабочий не может легко декодировать то, что вы его отправляете. Если вы запустите функцию в своей оболочке (без вызова на задержку), она вызывается синхронно, поэтому нет сериализации или передачи сообщений.

В ваших настройках вы должны использовать сериализацию JSON (если у вас нет действительно веской причины), но если нет, возможно, что-то не так с вашим травлением. Вы всегда можете увеличить уровень протоколирования для отладки при запуске сельдерея, чтобы увидеть больше о сериализации ошибки, связанной с:

celery -A yourapp worker -l debug

Если вы сомневаетесь, используйте этот оператор печати/функцию, убедитесь, что ваш приемник сигнала работает.Если нет, вы можете создать класс AppConfig, который импортирует ваши приемники в его метод ready или какой-либо другой разумный метод для того, чтобы ваши регистраторы регистрировались.

[мнение] я предложил бы делать что-то вроде этого:

@receiver(post_save, sender=Message) 
def send_outgoing_messages(sender, instance, **kwargs): 
    enqueue_message.delay(instance.id) 

в yourmodule/tasks.py

@app.task 
def enqueue_message(message_id): 
    msg = Message.object.get(id=message_id) 
    if msg.some_attribute in ('A', 'B'): # slick or 
     send_sms_message.delay(message_id) 

Вы всегда можете использовать композиционный сельдерей, но здесь у вас есть что-то, что не добавляет сложность вашего цикла запроса/ответа. [/ opinion]

Смежные вопросы