2016-01-12 2 views
0

Я пытаюсь реализовать приложение Python, которое использует функции асинхронизации для приема и испускания сообщений с использованием NATS, используя client на основе Tornado. После получения сообщения необходимо вызвать функцию блокировки, которую я пытаюсь реализовать в отдельном потоке, чтобы разрешить прием и публикацию сообщений помещать сообщения в очередь Tornado для последующей обработки блокирующей функции.Дизайн асинхронной обработки запросов и блокировки с использованием Tornado

Я очень новичок в Tornado (и для многопоточности python), но после нескольких раз прочитав документацию Tornado и другие источники, я смог создать рабочую версию кода, которая выглядит так:

import tornado.gen 
import tornado.ioloop 
from tornado.queues import Queue 
from concurrent.futures import ThreadPoolExecutor 
from nats.io.client import Client as NATS 

messageQueue = Queue() 
nc = NATS() 
@tornado.gen.coroutine 
def consumer(): 
    def processMessage(currentMessage): 
     # process the message ... 

    while True: 
     currentMessage = yield messageQueue.get() 
     try: 
      # execute the call in a separate thread to prevent blocking the queue 
      EXECUTOR.submit(processMessage, currentMessage) 
     finally: 
      messageQueue.task_done() 

@tornado.gen.coroutine 
def producer(): 
    @tornado.gen.coroutine 
    def enqueueMessage(currentMessage): 
     yield messageQueue.put(currentMessage) 

    yield nc.subscribe("new_event", "", enqueueMessage) 

@tornado.gen.coroutine 
def main(): 
    tornado.ioloop.IOLoop.current().spawn_callback(consumer) 
    yield producer() 

if __name__ == '__main__': 
    main() 
    tornado.ioloop.IOLoop.current().start() 

Мои вопросы:

1) Является ли это правильный способ использования Tornado для вызова функции блокировки?

2) Какова наилучшая практика для реализации схемы потребителя/производителя, которая всегда слушает? Я боюсь, что мой оператор while True: фактически блокирует процессор ...

3) Как проверить очередь, чтобы убедиться, что пакет вызовов завершен? Я пробовал использовать Queue(). QSize(), но он всегда возвращает ноль, что заставляет меня задаться вопросом, правильно ли выполняется enqueuing или нет.

+0

mind Ограничения GIL при многопоточности python. Я бы использовал PorcessPoolExceutor, если это возможно, например http://stackoverflow.com/questions/33553940/queue-and-processpoolexecutor-in-tornado – kwarunek

+0

. Это интересный подход ... Я не знал, что у ProcessPoolExecutor была собственная очередь. Это также верно для ThreadPoolExecutor? Я также заинтересован в том, чтобы работать с одним рабочим максимум времени, так как события должны обрабатываться последовательно ... будут ли ограничения GIL влиять на меня и в таком случае? – jimijazz

ответ

1

Общее правило (кредиты NYKevin) составляет:

  • многопроцессорная для GPU и CPU-переплете вычислений.
  • Управляемый событиями материал для неблокирующего ввода-вывода (который должен быть предпочтительнее блокировки ввода-вывода, где это возможно, поскольку он масштабируется намного эффективнее).
  • Темы для блокировки ввода-вывода (вы также можете использовать многопроцессорную обработку, но накладные расходы для каждого процесса, вероятно, не стоят).

ThreadPoolExecutor для IO, ProcessPoolExecutor для CPU. Обе имеют внутреннюю очередь, как шкалу до не более указано max_workers. Более подробная информация о concurrent executors in docs.

Так отвечают являются:

  1. Переопределение бассейн накладные расходы. Тема или процесс зависят от того, что вы планируете делать.
  2. while True не блокирует, если у вас есть, например. некоторые давали асинхронные вызовы (даже yield gen.sleep(0.01)), он возвращает управление ioloop
  3. - это право на вызов, но поскольку я не запускал/отлаживал это, и я бы использовал другой подход (существующий пул), трудно найти проблему здесь.