Я пытаюсь реализовать приложение Python, которое использует функции асинхронизации для приема и испускания сообщений с использованием NATS, используя client на основе Tornado. После получения сообщения необходимо вызвать функцию блокировки, которую я пытаюсь реализовать в отдельном потоке, чтобы разрешить прием и публикацию сообщений помещать сообщения в очередь Tornado для последующей обработки блокирующей функции.Дизайн асинхронной обработки запросов и блокировки с использованием Tornado
Я очень новичок в Tornado (и для многопоточности python), но после нескольких раз прочитав документацию Tornado и другие источники, я смог создать рабочую версию кода, которая выглядит так:
import tornado.gen
import tornado.ioloop
from tornado.queues import Queue
from concurrent.futures import ThreadPoolExecutor
from nats.io.client import Client as NATS
messageQueue = Queue()
nc = NATS()
@tornado.gen.coroutine
def consumer():
def processMessage(currentMessage):
# process the message ...
while True:
currentMessage = yield messageQueue.get()
try:
# execute the call in a separate thread to prevent blocking the queue
EXECUTOR.submit(processMessage, currentMessage)
finally:
messageQueue.task_done()
@tornado.gen.coroutine
def producer():
@tornado.gen.coroutine
def enqueueMessage(currentMessage):
yield messageQueue.put(currentMessage)
yield nc.subscribe("new_event", "", enqueueMessage)
@tornado.gen.coroutine
def main():
tornado.ioloop.IOLoop.current().spawn_callback(consumer)
yield producer()
if __name__ == '__main__':
main()
tornado.ioloop.IOLoop.current().start()
Мои вопросы:
1) Является ли это правильный способ использования Tornado для вызова функции блокировки?
2) Какова наилучшая практика для реализации схемы потребителя/производителя, которая всегда слушает? Я боюсь, что мой оператор while True:
фактически блокирует процессор ...
3) Как проверить очередь, чтобы убедиться, что пакет вызовов завершен? Я пробовал использовать Queue(). QSize(), но он всегда возвращает ноль, что заставляет меня задаться вопросом, правильно ли выполняется enqueuing или нет.
mind Ограничения GIL при многопоточности python. Я бы использовал PorcessPoolExceutor, если это возможно, например http://stackoverflow.com/questions/33553940/queue-and-processpoolexecutor-in-tornado – kwarunek
. Это интересный подход ... Я не знал, что у ProcessPoolExecutor была собственная очередь. Это также верно для ThreadPoolExecutor? Я также заинтересован в том, чтобы работать с одним рабочим максимум времени, так как события должны обрабатываться последовательно ... будут ли ограничения GIL влиять на меня и в таком случае? – jimijazz