2016-09-20 3 views
0

Я работаю над проектами, которые включают в себя много запросов на api, и для каждой обратной связи я принимаю решение и сохраняю в db. Я использую adbapi для связи с mysql.Закрученные реакторные блоки для длинных отложенных задач

Я получаю запрос как POST, содержащий список элементов, которые должны быть перенесены на удаленный api и сохранены.

Я заметил, что при обработке элементов в отложенном состоянии все остальные операции блокируются до тех пор, пока не будет выполнена одна часть.

Ниже приведены примеры, которые показывают что-то похожее на то, что я делаю.

#!/usr/bin/python2.7 

from twisted.web.server import Site 
from twisted.web.resource import Resource 
from twisted.internet import reactor, defer 
from twisted.web.server import NOT_DONE_YET 

from utils import send_mail, save_in_db 


def get_params(request): 
    params = {} 
    for k, v in request.args.items(): 
     if k and v: 
      params[k] = v[0] 
    return params 


class SendPage(Resource): 

    def render_POST(self, request): 
     params = get_params(request) 
     emails = params['emails'] 
     message = params['message'] 
     self.process_send_mail(message, emails) 
     request.write('Received') 
     request.finish() 
     return NOT_DONE_YET 

    def process_send_mail(self, message, emails): 
     defs = [] 
     for email in emails: 
      d = send_mail(email, message) 
      defs.append(d) 
     d1 = defer.DeferredList(defs) 
     d1.addCallback(self.process_save) 

    def process_save(self, result): 
     defs = [] 
     for r in result: 
      d = save_in_db(r) 
      defs.append(d) 
     d1 = defer.DeferredList(defs) 
     d1.addCallback(self.post_save) 

    def post_save(self, result): 
     print "request was completed" 


root = Resource() 
root.putChild("", SendPage()) 
factory = Site(root) 
reactor.listenTCP(8880, factory) 
reactor.run() 

В приведенных выше примерах, когда у меня есть много писем в списке, как 100000, когда я делаю send_mail он блокирует другие операции, до его законченным. Если я попытаюсь отправить другой запрос во время его выполнения, он блокируется до его завершения.

Мой вопрос в том, есть ли способ, которым я могу выполнять операции одновременно? Могу ли я send_mail и одновременно сохранять save_in_db? могу ли я сделать это, когда получаю другие запросы и обрабатываю, не дожидаясь, когда друг друга закончит?

ответ

0

Вы можете просто пропустить ждать результатов или ждать всех результатов: отправка и сохранение в базе данных следующим образом:

def process_send_mail(self, message, emails): 
    defs = [] 
    for email in emails: 
     d = send_mail(email, message) 
     defs.append(d) 
     d = save_in_db(email) 
     defs.append(d) 

    d1 = defer.DeferredList(defs) 
    d1.addCallback(self.post_save)  

def post_save(self): 
    print "request was completed" 
+0

Но этот результат вы зацикливаете в 'for r in result', не определен выше? я должен получить результат от send_mail, чтобы я мог его использовать. Я отметил, что отложенные письма ждут отправки всех писем. Я бы хотел, чтобы обрабатывать каждую почту и сохранять в db, а не ждать всех. –

+0

Исправлено. Я не знаю, что возвращает 'send_mail'. Используется электронная почта, чтобы передать ее в базу данных. Я предполагаю, что это параметр для перехода на 'save_in_db'. –

0

Один трюк я использовала в прошлом это сочетание inlineCallbacks и yield. В принципе, вы можете перебирать n количество элементов, затем yield или паузу в заданный интервал, чтобы реактор мог выполнять другие задачи. Таким образом, в вашем случае вы должны украсить все функции, которые имеют потенциально блокирующие петли с @inlineCallbacks, enumerate петля, затем yield/пауза в определенный момент, чтобы дать управление обратно в реактор.

@defer.inlineCallbacks 
def process_send_mail(self, message, emails): 
    defs = [] 
    for i, email in enumerate(emails): # enumerate 
     d = send_mail(email, message) 
     defs.append(d) 
     if i % 1000 == 0: 
      yield # pause every 1000 elements 
    d1 = defer.DeferredList(defs) 
    d1.addCallback(self.process_save) 

Вы должны настроить значение интервала в соответствии с вашими потребностями, как величина будет зависеть от того, насколько быстро результаты могут быть получены. Надеюсь это поможет.

0

есть два вопроса; Я обращусь к ним отдельно.

Первый: «Есть ли способ, которым я могу выполнять операции одновременно? Могу ли я send_mail и одновременным способом save_in_db»?

Ответ: да и нет. Вы не можете сделать это одновременно, потому что, насколько я могу судить, сохранение данных в БД требует некоторого результата от отправки почты. Но если вы имели в виду: могу ли я начать сохранять вещи в БД, как только получаю первый результат почты, не дожидаясь появления всех результатов электронной почты до сохранения вещей в БД - да, вы можете это сделать; просто объединить две функции обработки в одном:

def process_send_mail_and_save(self, message, emails): 
    defs = [] 
    for email in emails: 
     d = send_mail(email, message) 
     # might require tuning for save_in_db parameters if not matching send_mail callback output 
     d.addCallback(save_in_db) 
     defs.append(d) 
    d1 = defer.DeferredList(defs) 
    d1.addCallback(self.post_save) 

2) «я могу сделать это, как я получаю другие запросы и обрабатывать без необходимости ждать друг друга, чтобы закончить?»

Конечно, вы можете сделать это в Twisted. Но вы должны написать свой код правильно.Вы не говорите нам, что делают send_mail или save_in_db - я полагаю, вы их написали, и Я полагаю, что функции THOSE блокируют и вызывают большинство ваших проблем. - возможно, send_mail выполняет всю работу SMTP и только после ее завершения он возвращается? Он должен вернуть немедленно отложить, и обратный вызов, когда работа завершена:

http://twistedmatrix.com/documents/16.4.0/core/howto/clients.html

Я предлагаю вам ставить регистрации вызовов с временными метками вокруг send_mail и save_in_db функций - около момент вы их называете, не момент их отсроченные пожары.

Помните: вся отсрочка Twisted отложена, что отложенные возвращаются НЕМЕДЛЕННО, без блокировки, тогда как обратный вызов, который вы связываете с ними, срабатывает позже, когда что-то выполняется. Если ANYTHING блокирует ANYWHERE, Twisted ничего не может сделать - это однопоточная, в основном совместная многозадачность. Но Twisted не может превратить ваш код в неблокирующее магическое - вы должны это сделать.

Sidenote: способ использования сервера. NOT_DONE_YET бессмысленна. Просто верните «Received» в виде строки и забудьте объект запроса. Вы используете NOT_DONE_YET при вызове request.finish() где-то еще, а не сразу.

+0

Аллан, я не делаю никаких блокирующих вызовов в своем коде. И send_mail, и save_in_db не выполняют никаких блокирующих вызовов, возвращают отложенные. Как я уже сказал, проблема отмечена, когда запросов много (например, 50k запросов). Я отредактировал свой код так, как вы предложили, когда я начну сохранять, как только получаю ответ от функции send_mail, но я все же заметил, что метод сохранения запускается только после того, как все запросы для send_mail отложены, что может занять заметное время и в это время реактор не делает ничего другого. –

+0

Если вы не возвращаете его (например, return 'Received', как вы и предложили), я получаю исключение ('exceptions.RuntimeError: Request.write, вызываемый по запросу после того, как Request.finish был вызван'). Так ли это должно работать? –

+0

Да. Вы ничего не должны делать с запросом. Нет write(), no finish() - просто верните «Received». –

Смежные вопросы