Я нашел его относительно просто подкласс asyncio.streams библиотеки классов.
Функция start_server приподнимается на примере TCP сервера:
@asyncio.coroutine
def start_server(self, loop):
def factory():
reader = QbpStreamReader()
return QbpStreamReaderProtocol(reader, self._accept_client)
logger.info("QbpServer starting at tcp://%s:%s", self.host, self.port)
self.server = yield from loop.create_server(factory, self.host, self.port)
Это было необходимо создать подкласс StreamReaderProtocol, чтобы построить свой собственный StreamWriter. Кроме того, это то же самое, что и библиотечная функция.
class QbpStreamReaderProtocol(streams.StreamReaderProtocol):
def connection_made(self, transport):
self._stream_reader.set_transport(transport)
if self._client_connected_cb is not None:
self._stream_writer = QbpStreamWriter(transport, self,
self._stream_reader,
self._loop)
res = self._client_connected_cb(self._stream_reader,
self._stream_writer)
if coroutines.iscoroutine(res):
self._loop.create_task(res)
Для исходящих сообщений:
class QbpStreamWriter(streams.StreamWriter):
def write_msg(self, msg):
# data = serialise message
self.write(data)
И для входящих сообщений:
class QbpStreamReader(streams.StreamReader):
@asyncio.coroutine
def read_msg(self):
data = yield from self.readexactly(header_length)
# msg_type, msg_length = unpack header
data = yield from self.readexactly(msg_length)
return build_message(msg_type, data)
Надеется, что это помогает кому-то
Я понимаю, что это широкий вопрос, но вниз голоса без комментариев не помогите сформулировать это лучше. – MarkNS