2013-09-10 2 views
12

Сельдерей включает в себя модуль, который способен создавать асинхронные HTTP-запросы с использованием amqp или какого-либо другого ферментера. Я использую tornado-celery производитель для асинхронной публикации сообщений. Как я понял, tornado-celery использует для этого пику. Вопрос в том, как адаптировать celery.task.http.URL для торнадо (сделать его неблокирующим). В основном есть два места, которые необходимо уточнить:Адаптация celery.task.http.URL для торнадо

  1. HttpDispatch.make_request() должны быть реализованы с использованием tornado async http client;
  2. URL.get_async(**kw) или URL.post_async(**kw) необходимо переопределить соответствующим неблокирующим кодом с использованием API торнадо. Например:

    class NonBlockingURL(celery.task.http.URL): 
    
        @gen.coroutine 
        def post_async(self, **kwargs): 
         async_res = yield gen.Task(self.dispatcher.delay, 
                str(self), 'POST', **kwargs) 
         raise gen.Return(async_res) 
    

Но я не мог понять, как сделать это в надлежащем и краткой форме. Как сделать его полностью неблокирующим как асинхронным? Кстати, я использую бэкэнд amqp.

Пожалуйста, предоставьте мне хороший ориентир или еще лучше, пример.

+0

Я понимаю вашу цель. Но я не думаю, что полностью понял проблему, с которой вы столкнулись. Только для «неблокирующей части» вы пробовали [Eventlet] (https://github.com/celery/celery/tree/master/examples/eventlet) или [Планировщик] (http://docs.celeryproject.org) /en/latest/userguide/periodic-tasks.html) с опцией? – user2390183

+0

Просто понял, что это старый пост :) надеюсь, что у вас есть решение. pls делят интересное решение, которое вы, возможно, нашли – user2390183

+0

. Я поделюсь. Ждите парней. –

ответ

1

На самом деле вам нужно решить, используете ли вы метод асинхронизации Tornado или используете очередь, подобную элементу. Нет смысла использовать оба варианта, потому что очередь быстро отвечает за состояние очереди, поэтому нет смысла торнадо делать что-то еще, ожидая ответа очереди. Чтобы решить между двумя решениями, я бы сказал:

Сельдерей: более модульный, легко распространяемый на разные ядра или разные машины, задача может быть использована другими людьми, кроме торнадо, вам необходимо установить и продолжить работу в программном обеспечении (amqp , cellery рабочие ...)

Асинхронных в Торнадо: более монолитная, одна программы делать все, короткий код, одна программы для запуска

чтобы использовать метод асинхронного смерча, обратитесь к документации. Вот краткое решение с использованием сельдерея и торнадо вместе:

task.py
from celery import Celery,current_task 
import time 
celery=Celery('tasks',backend='amqp',result_backend='amqp') 

@celery.task 
def MyTask(url,resid): 
    for i in range(10): 
     time.sleep(1) 
     current_task.update_state(state='running',meta={'i': i}) 
    return 'done' 

server.py
import tasks 
from tornado import RequestHandler,.... 
from tornado.web import Application 
dictasks={} 
class runtask(RequestHandler): 
    def post(self): 
     i=len(dictasks) 
     dictasks[i]=task.MyTask.delay() 
      self.write(i) 

class chktask(RequestHandler): 
    def get(self,i): 
     i=int(i) 
     if dictasks[i].ready(): 
      self.write(dictasks[i].result) 
      del dictasks[i] 
     else: 
      self.write(dictasks[i].state + ' i:' + dictasks[i].info.get('i',-1)) 


Application = Application([ 
    (r"/runtask", runtask}), 
    (r"/chktask/([0-9]+)", chktask), 

etc. 
Смежные вопросы