2016-02-01 3 views
7

У меня есть asyncio.Protocol подкласс, получающий данные с сервера. Я сохраняю эти данные (каждая строка, потому что данные являются текстовыми) в asyncio.Queue.asyncio queue consumer coroutine

import asyncio 

q = asyncio.Queue() 

class StreamProtocol(asyncio.Protocol): 
    def __init__(self, loop): 
     self.loop = loop 
     self.transport = None 

    def connection_made(self, transport): 
     self.transport = transport 

    def data_received(self, data): 
     for message in data.decode().splitlines(): 
      yield q.put(message.rstrip()) 

    def connection_lost(self, exc): 
     self.loop.stop() 

loop = asyncio.get_event_loop() 
coro = loop.create_connection(lambda: StreamProtocol(loop), 
           '127.0.0.1', '42') 
loop.run_until_complete(coro) 
loop.run_forever() 
loop.close() 

Я хочу иметь другой сопроцессор, ответственный за потребление данных в очереди и обработку.

  • Должно ли это быть asyncio.Task?
  • Что делать, если очередь становится пустой, так как в течение нескольких секунд данные не принимаются? Как я могу убедиться, что мой потребитель не останавливается (run_until_complete)?
  • Есть ли более чистый способ, чем использование глобальной переменной для моей очереди?
+0

Ваш код не так, извините: 'data_received' должна быть регулярной функцией, а не сопрограммная с 'yield' внутри. Кроме того, 'asyncio.Queue' требует' yield from', а не только 'yield'. –

+0

Ах, правильно. Я сказал, что не тестировал его, чтобы дать представление о том, что я хотел сделать. – valentin

ответ

5

Должно ли это быть asyncio.Task?

Да, создайте его, используя asyncio.ensure_future или loop.create_task.

Что делать, если очередь пуста, потому что на несколько секунд данные не получены?

Просто используйте queue.get ждать, пока деталь не будет доступен:

async def consume(queue): 
    while True: 
     item = await queue.get() 
     print(item) 

Есть ли способ очистки, чем при использовании глобальной переменной для моей очереди?

Да, просто передать его в качестве аргумента к протоколу потребитель сопрограммного и потока:

class StreamProtocol(asyncio.Protocol): 
    def __init__(self, loop, queue): 
     self.loop = loop 
     self.queue = queue 

    def data_received(self, data): 
     for message in data.decode().splitlines(): 
      self.queue.put_nowait(message.rstrip()) 

    def connection_lost(self, exc): 
     self.loop.stop() 

Как я могу убедиться, что мой потребитель не останавливается (run_until_complete)?

Как только соединение будет закрыто, используйте queue.join, чтобы подождать, пока очередь не будет пуста.


Полный пример:

loop = asyncio.get_event_loop() 
queue = asyncio.Queue() 
# Connection coroutine 
factory = lambda: StreamProtocol(loop, queue) 
connection = loop.create_connection(factory, '127.0.0.1', '42') 
# Consumer task 
consumer = asyncio.ensure_future(consume(queue)) 
# Set up connection 
loop.run_until_complete(connection) 
# Wait until the connection is closed 
loop.run_forever() 
# Wait until the queue is empty 
loop.run_until_complete(queue.join()) 
# Cancel the consumer 
consumer.cancel() 
# Let the consumer terminate 
loop.run_until_complete(consumer) 
# Close the loop 
loop.close() 

В качестве альтернативы, вы можете также использовать streams:

async def tcp_client(host, port, loop=None): 
    reader, writer = await asyncio.open_connection(host, port, loop=loop) 
    async for line in reader: 
     print(line.rstrip()) 
    writer.close() 

loop = asyncio.get_event_loop() 
loop.run_until_complete(tcp_client('127.0.0.1', 42, loop)) 
loop.close() 
+0

Спасибо! Похоже на правильный способ сделать это. Я думаю, что есть проблема с вашим полным примером, переменная 'coro' не существует – valentin

+0

@toogy Правда, я просто ее исправил. – Vincent

+0

Отлично. Просто последнее. Что, если я хочу, чтобы мой потребитель был больше чем функция (я имею в виду класс)? Должен ли я просто наследовать класс 'asyncio.Task'? – valentin

Смежные вопросы