У меня есть asyncio.Protocol
подкласс, получающий данные с сервера. Я сохраняю эти данные (каждая строка, потому что данные являются текстовыми) в asyncio.Queue
.asyncio queue consumer coroutine
import asyncio
q = asyncio.Queue()
class StreamProtocol(asyncio.Protocol):
def __init__(self, loop):
self.loop = loop
self.transport = None
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
for message in data.decode().splitlines():
yield q.put(message.rstrip())
def connection_lost(self, exc):
self.loop.stop()
loop = asyncio.get_event_loop()
coro = loop.create_connection(lambda: StreamProtocol(loop),
'127.0.0.1', '42')
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
Я хочу иметь другой сопроцессор, ответственный за потребление данных в очереди и обработку.
- Должно ли это быть
asyncio.Task
? - Что делать, если очередь становится пустой, так как в течение нескольких секунд данные не принимаются? Как я могу убедиться, что мой потребитель не останавливается (
run_until_complete
)? - Есть ли более чистый способ, чем использование глобальной переменной для моей очереди?
Ваш код не так, извините: 'data_received' должна быть регулярной функцией, а не сопрограммная с 'yield' внутри. Кроме того, 'asyncio.Queue' требует' yield from', а не только 'yield'. –
Ах, правильно. Я сказал, что не тестировал его, чтобы дать представление о том, что я хотел сделать. – valentin