2016-08-11 2 views
0

Я пытаюсь использовать несколько очередей одновременно с использованием python, asyncio и asynqp.Asyncio и rabbitmq (asynqp): как потреблять из нескольких очередей одновременно

Я не понимаю, почему мой вызов asyncio.sleep() не имеет никакого эффекта. Код не приостанавливается. Справедливости ради, я действительно не понимаю, в каком контексте выполняется обратный вызов, и могу ли я полностью передать управление байком в цикле событий (так что вызов asyncio.sleep() имел бы смысл).

Что делать, если мне пришлось использовать вызов функции aiohttp.ClientSession.get() в моей функции обратного вызова process_msg? Я не могу это сделать, потому что это не сопрограмма. Должен быть способ, который выходит за рамки моего нынешнего понимания асинчо.

#!/usr/bin/env python3 

import asyncio 
import asynqp 


USERS = {'betty', 'bob', 'luis', 'tony'} 


def process_msg(msg): 
    asyncio.sleep(10) 
    print('>> {}'.format(msg.body)) 
    msg.ack() 

async def connect(): 
    connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test') 
    channel = await connection.open_channel() 
    exchange = await channel.declare_exchange('inboxes', 'direct') 

    # we have 10 users. Set up a queue for each of them 
    # use different channels to avoid any interference 
    # during message consumption, just in case. 
    for username in USERS: 
     user_channel = await connection.open_channel() 
     queue = await user_channel.declare_queue('Inbox_{}'.format(username)) 
     await queue.bind(exchange, routing_key=username) 
     await queue.consume(process_msg) 

    # deliver 10 messages to each user 
    for username in USERS: 
     for msg_idx in range(10): 
      msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username)) 
      exchange.publish(msg, routing_key=username) 


loop = asyncio.get_event_loop() 
loop.run_until_complete(connect()) 
loop.run_forever() 

ответ

1

Я не понимаю, почему мой вызов функции asyncio.sleep() не имеет любого эффекта.

Поскольку asyncio.sleep() возвращает будущий объект, который должен быть использован в комбинации с циклом обработки событий (или async/await семантики).

Вы не можете использовать await в простой декларации def, потому что обратный вызов вызывается вне контекста async/await, который прикреплен к какому-либо контуру события под капотом. Другими словами, сочетание стиля обратного вызова с использованием стиля async/await довольно сложно.

Простое решение, хотя это планировать работу обратно в цикл обработки событий:

async def process_msg(msg): 
    await asyncio.sleep(10) 
    print('>> {}'.format(msg.body)) 
    msg.ack() 

def _process_msg(msg): 
    loop = asyncio.get_event_loop() 
    loop.create_task(process_msg(msg)) 
    # or if loop is always the same one single line is enough 
    # asyncio.ensure_future(process_msg(msg)) 

# some code 
await queue.consume(_process_msg) 

Обратите внимание, что нет никакой рекурсии в _process_msg функции, то есть тело process_msg не выполняется в то время как в _process_msg. Внутренняя функция process_msg вызывается, как только элемент управления возвращается к циклу событий.

Это может быть обобщено следующим кодом:

def async_to_callback(coro): 
    def callback(*args, **kwargs): 
     asyncio.ensure_future(coro(*args, **kwargs)) 
    return callback 

async def process_msg(msg): 
    # the body 

# some code 
await queue.consume(async_to_callback(process_msg)) 
Смежные вопросы