2017-01-02 1 views
1

Я хотел бы настроить приложение Django для настройки электронной почты через Gmail с помощью очереди задач async. Я использую сельдерей, а Редис - моим брокером. Тем не менее, я не могу отправить электронную почту, когда я определяю сельдерей, как мой электронный бэкэндом - я получаю ошибку о том, что соединение не удалось:Настройка Django с Redis, Celery для отправки электронной почты через Gmail

...: 
--------------------------------------------------------------------------- 
AttributeError       Traceback (most recent call last) 
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in __call__(self) 
    35   try: 
---> 36    return self.__value__ 
    37   except AttributeError: 

AttributeError: 'ChannelPromise' object has no attribute '__value__' 

During handling of the above exception, another exception occurred: 

ConnectionRefusedError     Traceback (most recent call last) 
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs) 
    493      try: 
--> 494       return fun(*args, **kwargs) 
    495      except conn_errors as exc: 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare) 
    186     immediate, exchange, declare): 
--> 187   channel = self.channel 
    188   message = channel.prepare_message(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in _get_channel(self) 
    208   if isinstance(channel, ChannelPromise): 
--> 209    channel = self._channel = channel() 
    210    self.exchange.revive(channel) 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in __call__(self) 
    37   except AttributeError: 
---> 38    value = self.__value__ = self.__contract__() 
    39    return value 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in <lambda>() 
    223    self.__connection__ = connection 
--> 224    channel = ChannelPromise(lambda: connection.default_channel) 
    225   if isinstance(channel, ChannelPromise): 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in default_channel(self) 
    818   # make sure we're still connected, and if not refresh. 
--> 819   self.connection 
    820   if self._default_channel is None: 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self) 
    801     self._default_channel = None 
--> 802     self._connection = self._establish_connection() 
    803     self._closed = False 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self) 
    756   self._debug('establishing connection...') 
--> 757   conn = self.transport.establish_connection() 
    758   self._debug('connection established: %r', self) 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self) 
    129   conn.client = self.client 
--> 130   conn.connect() 
    131   return conn 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback) 
    293  ) 
--> 294   self.transport.connect() 
    295   self.on_inbound_frame = self.frame_handler_cls(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self) 
    119  def connect(self): 
--> 120   self._connect(self.host, self.port, self.connect_timeout) 
    121   self._init_socket(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout) 
    160     self.sock.settimeout(timeout) 
--> 161     self.sock.connect(sa) 
    162    except socket.error: 

ConnectionRefusedError: [Errno 61] Connection refused 

During handling of the above exception, another exception occurred: 

ConnectionRefusedError     Traceback (most recent call last) 
/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError) 
    413   try: 
--> 414    yield 
    415   except (ConnectionError, ChannelError): 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs) 
    514        interval_start, interval_step, interval_max, 
--> 515        reraise_as_library_errors=False, 
    516      ) 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in ensure_connection(self, errback, max_retries, interval_start, interval_step, interval_max, callback, reraise_as_library_errors) 
    404        interval_start, interval_step, interval_max, 
--> 405        callback) 
    406   return self 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback) 
    332   try: 
--> 333    return fun(*args, **kwargs) 
    334   except catch as exc: 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connect(self) 
    260   self._closed = False 
--> 261   return self.connection 
    262 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self) 
    801     self._default_channel = None 
--> 802     self._connection = self._establish_connection() 
    803     self._closed = False 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self) 
    756   self._debug('establishing connection...') 
--> 757   conn = self.transport.establish_connection() 
    758   self._debug('connection established: %r', self) 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self) 
    129   conn.client = self.client 
--> 130   conn.connect() 
    131   return conn 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback) 
    293  ) 
--> 294   self.transport.connect() 
    295   self.on_inbound_frame = self.frame_handler_cls(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self) 
    119  def connect(self): 
--> 120   self._connect(self.host, self.port, self.connect_timeout) 
    121   self._init_socket(

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout) 
    160     self.sock.settimeout(timeout) 
--> 161     self.sock.connect(sa) 
    162    except socket.error: 

ConnectionRefusedError: [Errno 61] Connection refused 

During handling of the above exception, another exception occurred: 

OperationalError       Traceback (most recent call last) 
<ipython-input-1-75e7f965af5d> in <module>() 
     1 from django.core.mail import send_mail 
----> 2 send_mail('test email', 'hello world', '[email protected]', recipient_list=['[email protected]'], fail_silently=False) 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/django/core/mail/__init__.py in send_mail(subject, message, from_email, recipient_list, fail_silently, auth_user, auth_password, connection, html_message) 
    59   mail.attach_alternative(html_message, 'text/html') 
    60 
---> 61  return mail.send() 
    62 
    63 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/django/core/mail/message.py in send(self, fail_silently) 
    290    # send to. 
    291    return 0 
--> 292   return self.get_connection(fail_silently).send_messages([self]) 
    293 
    294  def attach(self, filename=None, content=None, mimetype=None): 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/djcelery_email/backends.py in send_messages(self, email_messages) 
    15   messages = [email_to_dict(msg) for msg in email_messages] 
    16   for chunk in chunked(messages, settings.CELERY_EMAIL_CHUNK_SIZE): 
---> 17    result_tasks.append(send_emails.delay(chunk, self.init_kwargs)) 
    18   return result_tasks 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/task.py in delay(self, *args, **kwargs) 
    410    celery.result.AsyncResult: Future promise. 
    411   """ 
--> 412   return self.apply_async(args, kwargs) 
    413 
    414  def apply_async(self, args=None, kwargs=None, task_id=None, producer=None, 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/task.py in apply_async(self, args, kwargs, task_id, producer, link, link_error, shadow, **options) 
    533    link=link, link_error=link_error, result_cls=self.AsyncResult, 
    534    shadow=shadow, task_type=self, 
--> 535    **options 
    536  ) 
    537 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/base.py in send_task(self, name, args, kwargs, countdown, eta, task_id, producer, connection, router, result_cls, expires, publisher, link, link_error, add_to_parent, group_id, retries, chord, reply_to, time_limit, soft_time_limit, root_id, parent_id, route_name, shadow, chain, task_type, **options) 
    735    with P.connection._reraise_as_library_errors(): 
    736     self.backend.on_task_call(P, task_id) 
--> 737     amqp.send_task_message(P, name, message, **options) 
    738   result = (result_cls or self.AsyncResult)(task_id) 
    739   if add_to_parent: 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/celery/app/amqp.py in send_task_message(producer, name, message, exchange, routing_key, queue, event_dispatcher, retry, retry_policy, serializer, delivery_mode, compression, declare, headers, exchange_type, **kwargs) 
    556     delivery_mode=delivery_mode, declare=declare, 
    557     headers=headers2, 
--> 558     **properties 
    559   ) 
    560    if after_receivers: 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/messaging.py in publish(self, body, routing_key, delivery_mode, mandatory, immediate, priority, content_type, content_encoding, serializer, headers, compression, exchange, retry, retry_policy, declare, expiration, **properties) 
    179    body, priority, content_type, content_encoding, 
    180    headers, properties, routing_key, mandatory, immediate, 
--> 181    exchange_name, declare, 
    182  ) 
    183 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs) 
    525       self._debug('ensure channel error: %r', 
    526          exc, exc_info=1) 
--> 527       errback and errback(exc, 0) 
    528   _ensured.__name__ = bytes_if_py2('{0}(ensured)'.format(fun.__name__)) 
    529   _ensured.__doc__ = fun.__doc__ 

/usr/local/Cellar/python3/3.4.3_2/Frameworks/Python.framework/Versions/3.4/lib/python3.4/contextlib.py in __exit__(self, type, value, traceback) 
    75     value = type() 
    76    try: 
---> 77     self.gen.throw(type, value, traceback) 
    78     raise RuntimeError("generator didn't stop after throw()") 
    79    except StopIteration as exc: 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError) 
    417   except self.recoverable_connection_errors as exc: 
    418    reraise(ConnectionError, ConnectionError(text_t(exc)), 
--> 419      sys.exc_info()[2]) 
    420   except self.recoverable_channel_errors as exc: 
    421    reraise(ChannelError, ChannelError(text_t(exc)), 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/vine/five.py in reraise(tp, value, tb) 
    173   """Reraise exception.""" 
    174   if value.__traceback__ is not tb: 
--> 175    raise value.with_traceback(tb) 
    176   raise value 
    177 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _reraise_as_library_errors(self, ConnectionError, ChannelError) 
    412    ChannelError=exceptions.OperationalError): 
    413   try: 
--> 414    yield 
    415   except (ConnectionError, ChannelError): 
    416    raise 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _ensured(*args, **kwargs) 
    513        remaining_retries, 
    514        interval_start, interval_step, interval_max, 
--> 515        reraise_as_library_errors=False, 
    516      ) 
    517       channel = self.default_channel 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in ensure_connection(self, errback, max_retries, interval_start, interval_step, interval_max, callback, reraise_as_library_errors) 
    403       (), {}, on_error, max_retries, 
    404        interval_start, interval_step, interval_max, 
--> 405        callback) 
    406   return self 
    407 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/utils/functional.py in retry_over_time(fun, catch, args, kwargs, errback, max_retries, interval_start, interval_step, interval_max, callback) 
    331  for retries in count(): 
    332   try: 
--> 333    return fun(*args, **kwargs) 
    334   except catch as exc: 
    335    if max_retries and retries >= max_retries: 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connect(self) 
    259   """Establish connection to server immediately.""" 
    260   self._closed = False 
--> 261   return self.connection 
    262 
    263  def channel(self): 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in connection(self) 
    800     self.declared_entities.clear() 
    801     self._default_channel = None 
--> 802     self._connection = self._establish_connection() 
    803     self._closed = False 
    804    return self._connection 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/connection.py in _establish_connection(self) 
    755  def _establish_connection(self): 
    756   self._debug('establishing connection...') 
--> 757   conn = self.transport.establish_connection() 
    758   self._debug('connection established: %r', self) 
    759   return conn 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/kombu/transport/pyamqp.py in establish_connection(self) 
    128   conn = self.Connection(**opts) 
    129   conn.client = self.client 
--> 130   conn.connect() 
    131   return conn 
    132 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/connection.py in connect(self, callback) 
    292    socket_settings=self.socket_settings, 
    293  ) 
--> 294   self.transport.connect() 
    295   self.on_inbound_frame = self.frame_handler_cls(
    296    self, self.on_inbound_method) 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in connect(self) 
    118 
    119  def connect(self): 
--> 120   self._connect(self.host, self.port, self.connect_timeout) 
    121   self._init_socket(
    122    self.socket_settings, self.read_timeout, self.write_timeout, 

/Users/user/virtualenvs/myapp/lib/python3.4/site-packages/amqp/transport.py in _connect(self, host, port, timeout) 
    159      pass 
    160     self.sock.settimeout(timeout) 
--> 161     self.sock.connect(sa) 
    162    except socket.error: 
    163     self.sock.close() 

OperationalError: [Errno 61] Connection refused 

Попытка отладки выходов к следующему:

Я способный успешно запустить сервер redis и выполнить ping. Однако, когда я пытаюсь запустить работника сельдерея, используя celery worker -A myapp -l info -c 5, я получаю сообщение об ошибке. Я предполагаю, что это связано с аутентификацией? Похоже, он просто пытается войти в систему как анонимный пользователь, но я не уверен, как передать ему имя пользователя и пароль.

сельдерея рабочего -A MyApp -l Информация -c 5

-------------- [email protected] v4.0.2 (latentcall) 
---- **** ----- 
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit 2017-01-02 19:58:38 
-- * - **** --- 
- ** ---------- [config] 
- ** ---------- .> app:   myapp:0x10dc15400 
- ** ---------- .> transport: amqp://guest:**@localhost:5672// 
- ** ---------- .> results:  disabled:// 
- *** --- * --- .> concurrency: 5 (prefork) 
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) 
--- ***** ----- 
-------------- [queues] 
       .> celery   exchange=celery(direct) key=celery 


[tasks] 
    . djcelery_email_send 
    . myapp.celery.debug_task 

[2017-01-02 19:58:39,187: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused. 
Trying again in 2.00 seconds... 

[2017-01-02 19:58:41,199: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused. 
Trying again in 4.00 seconds... 

[2017-01-02 19:58:45,219: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused. 
Trying again in 6.00 seconds... 

[2017-01-02 19:58:51,245: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 61] Connection refused. 
Trying again in 8.00 seconds... 

Я использую djcelery_email, есть файл в myapp/myapp/__init__.py, который выглядит следующим образом:

# This will make sure the app is always imported when 
# Django starts so that shared_task will use this app. 
from .celery import app as celery_app 

__all__ = ['celery_app'] 

, а также celery.py файла на том же уровне, что и settings.py

import os 
from celery import Celery 
from celery.bin import Option 

# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings') 

app = Celery('myapp') 

# Using a string here means the worker don't have to serialize 
# the configuration object to child processes. 
# - namespace='CELERY' means all celery-related configuration keys 
# should have a `CELERY_` prefix. 
app.config_from_object('django.conf:settings', namespace='CELERY') 


# Load task modules from all registered Django app configs. 
app.autodiscover_tasks() 


@app.task(bind=True) 
def debug_task(self): 
    print('Request: {0!r}'.format(self.request)) 

и имеют следующий `settings.py:

# Email 

CACHES = { 
    "default": { 
     "BACKEND": "django_redis.cache.RedisCache", 
     "LOCATION": "redis://127.0.0.1:6379/1", 
     "OPTIONS": { 
      "CLIENT_CLASS": "django_redis.client.DefaultClient", 
     } 
    } 
} 

EMAIL_HOST = 'smtp.gmail.com' # since you are using a gmail account 
EMAIL_PORT = 587 # Gmail SMTP port for TLS 
EMAIL_USE_TLS = True 
EMAIL_HOST_USER = '[email protected]' 
EMAIL_HOST_PASSWORD = 'secret_password' 
EMAIL_BACKEND = 'djcelery_email.backends.CeleryEmailBackend' # Use Celery for sending emails 
CELERY_EMAIL_TASK_CONFIG = { 
    'name': 'djcelery_email_send', 
    'ignore_result': True, 
} 

BROKER_URL = 'redis://127.0.0.1:6379/1' 

какие изменения мне нужно сделать, чтобы иметь возможность отправлять электронную почту с помощью Redis, сельдерей, и Django?

ответ

2

Эта ошибка не имеет ничего общего с электронной почтой. Сельдерей пытается подключиться к брокеру сообщений через протокол amqp, что подразумевает RabbitMQ; но вы говорите, что вы создали Редиса в качестве брокера. Таким образом, сельдерей не поднимает эту настройку.

Причина этого в том, что вам необходимо префикс настроек, связанных с сельдерей, с помощью CELERY_ в вашем файле settings.py; поэтому BROKER_URL должен быть CELERY_BROKER_URL.

+0

Исправить. Это связано с: 'app.config_from_object ('django.conf: settings', namespace = 'CELERY')' – chefarov

Смежные вопросы