2014-07-16 2 views
6

У меня проблема с каждым запросом на вставку (маленький запрос), который выполняется в задачах сельдерея асинхронно. В режиме синхронизации, когда я делать вставки все сделано замечательно, но когда он выполняется в apply_async() я получаю это:Python cassandra-driver OperationTimeOut для каждого запроса в задаче Сельдерея

OperationTimedOut('errors=errors=errors={}, last_host=***.***.*.***, last_host=None, last_host=None',) 

Traceback:

Traceback (most recent call last): 
    File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task 
    R = retval = fun(*args, **kwargs) 
    File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__ 
    return self.run(*args, **kwargs) 
    File "/var/nfs_www/***/www_v1/app/mods/news_feed/tasks.py", line 26, in send_new_comment_reply_notifications 
    send_new_comment_reply_notifications_method(comment_id) 
    File "/var/nfs_www/***www_v1/app/mods/news_feed/methods.py", line 83, in send_new_comment_reply_notifications 
    comment_type='comment_reply' 
    File "/var/nfs_www/***/www_v1/app/mods/news_feed/models/storage.py", line 129, in add 
    CommentsFeed(**kwargs).save() 
    File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/models.py", line 531, in save 
    consistency=self.__consistency__).save() 
    File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 907, in save 
    self._execute(insert) 
    File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/query.py", line 786, in _execute 
    tmp = execute(q, consistency_level=self._consistency) 
    File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cqlengine/connection.py", line 95, in execute 
    result = session.execute(query, params) 
    File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 1103, in execute 
    result = future.result(timeout) 
    File "/var/nfs_www/***/env_v0/local/lib/python2.7/site-packages/cassandra/cluster.py", line 2475, in result 
    raise OperationTimedOut(errors=self._errors, last_host=self._current_host) 
OperationTimedOut: errors={}, last_host=***.***.*.*** 

Кто-нибудь есть идеи о проблеме?

Я нашел это When cassandra-driver was executing the query, cassandra-driver returned error OperationTimedOut, но мой запрос очень маленький и проблема только в задачах сельдерея.

UPDATE:

Я сделал тестовое задание, и это вызывает эту ошибку тоже.

@celery.task() 
def test_task_with_cassandra(): 
    from app import cassandra_session 
    cassandra_session.execute('use news_feed') 
    return 'Done' 

UPDATE 2: Сделано это:

@celery.task() 
def test_task_with_cassandra(): 
    from cqlengine import connection 
    connection.setup(app.config['CASSANDRA_SERVERS'], port=app.config['CASSANDRA_PORT'], 
        default_keyspace='test_keyspace') 
    from .models import Feed 
    Feed.objects.count() 
    return 'Done' 

Got это:

NoHostAvailable('Unable to connect to any servers', {'***.***.*.***': OperationTimedOut('errors=errors=Timed out creating connection, last_host=None, last_host=None',)}) 

Из оболочки я могу подключиться к нему

UPDATE 3: Из удаляемого резьбы на GitHub выпуска (нашел это в моей электронной почты): (это работает для меня тоже) Вот как, по сути, я подключу CQLengine к сельдерея:

from celery import Celery 
from celery.signals import worker_process_init, beat_init 
from cqlengine import connection 
from cqlengine.connection import (
    cluster as cql_cluster, session as cql_session) 

def cassandra_init(): 
    """ Initialize a clean Cassandra connection. """ 
    if cql_cluster is not None: 
     cql_cluster.shutdown() 
    if cql_session is not None: 
     cql_session.shutdown() 
    connection.setup() 

# Initialize worker context for both standard and periodic tasks. 
worker_process_init.connect(cassandra_init) 
beat_init.connect(cassandra_init) 

app = Celery() 

Это грубо, но работает , Следует ли добавить этот фрагмент в FAQ?

+0

Имеет ли ваш пользователь сельдерея разрешения на выполнение запросов? – Banana

+0

Хм, я не устанавливал для него никаких поставщиков auth. Или что вы имели в виду? Где я должен искать? Я обновил свой вопрос. –

+0

Вы решили это?У меня такая же проблема, и я понятия не имею, что происходит! – haifzhan

ответ

5

У меня была аналогичная проблема. Казалось, что это связано с тем, что он разделяет сессию Кассандры между задачами. Я решил это, создав сеанс на поток. Убедитесь, что вы называете get_session() от вас задач, а затем сделать это:

thread_local = threading.local() 

def get_session(): 
    if hasattr(thread_local, "cassandra_session"): 
     return thread_local.cassandra_session 

    cluster = Cluster(settings.CASSANDRA_HOSTS) 
    session = cluster.connect(settings.CASSANDRA_KEYSPACE) 

    thread_local.cassandra_session = session 

    return session 
+0

это действительно помогает, каждый поток/процесс владеет собственной сессией! – haifzhan

1

Вдохновленный ответ Рон, я придумал следующий код, чтобы поместить в tasks.py:

import threading 
from django.conf import settings 
from cassandra.cluster import Cluster 
from celery.signals import worker_process_init,worker_process_shutdown 

thread_local = threading.local() 

@worker_process_init.connect 
def open_cassandra_session(*args, **kwargs): 
    cluster = Cluster([settings.DATABASES["cassandra"]["HOST"],], protocol_version=3) 
    session = cluster.connect(settings.DATABASES["cassandra"]["NAME"]) 
    thread_local.cassandra_session = session 

@worker_process_shutdown.connect 
def close_cassandra_session(*args,**kwargs): 
    session = thread_local.cassandra_session 
    session.shutdown() 
    thread_local.cassandra_session = None 

Это изящное решение будет автоматически открывать/закрывать сеансы cassandra, когда процесс работника сельдерея начинается и останавливается.

Сторона примечания: protocol_version = 3, поскольку Cassandra 2.1 поддерживает только версии протокола 3 и ниже.

+0

Я поддержал этот ответ, но я передумал - на самом деле это не работало для меня. Я написал другой ответ. –

2

Другие ответы не помогли мне, но вопрос «обновление 3» сделал. Вот что я получил (небольшие обновления к предложению в вопросе):

from celery.signals import worker_process_init 
from cassandra.cqlengine import connection 
from cassandra.cqlengine.connection import (
    cluster as cql_cluster, session as cql_session) 

def cassandra_init(*args, **kwargs): 
    """ Initialize a clean Cassandra connection. """ 
    if cql_cluster is not None: 
     cql_cluster.shutdown() 
    if cql_session is not None: 
     cql_session.shutdown() 
    connection.setup([settings.DATABASES["cassandra"]["HOST"],], settings.DATABASES["cassandra"]["NAME"]) 

# Initialize worker context (only standard tasks) 
worker_process_init.connect(cassandra_init) 
+0

Вы спасли мой день, мой друг –

+0

снова вы спасли день для другого друга ... спасибо. –

Смежные вопросы