2015-06-26 7 views
4

Я ищу некоторые «дополнительные» рекомендации по шаблонам использования классов StreamReader и StreamWriter в пакете asyncio Python.API-интерфейс потокового Python Asyncio

Я пытаюсь создать автономный сервер с пользовательским протоколом с использованием protobuf. Должен ли я подклассифицировать StreamReader и StreamWriter для управления сериализацией из protobuf байтов? Затем я мог бы предоставить читателю функцию read_message. Я знаю, что могу скопировать код из streams.start_server, предоставляющего мой собственный StreamReader, но как мне установить StreamWriter?

Любые указатели или примеры с благодарностью получены.

+0

Я понимаю, что это широкий вопрос, но вниз голоса без комментариев не помогите сформулировать это лучше. – MarkNS

ответ

2

Я предлагаю вместо создания StreamReader/StreamWriter предложить собственные классы с похожим API. Скажем, я сделал это для aiozmq библиотеки: https://github.com/aio-libs/aiozmq/blob/master/aiozmq/stream.py

+0

спасибо за совет Andrew. В конце (как было опубликовано в моем ответе) я решил подкласс. Надеюсь, это не укусит меня в конце, но мне показалось, что мне нужно писать меньше кода, чтобы достичь моих целей. – MarkNS

3

Я нашел его относительно просто подкласс asyncio.streams библиотеки классов.

Функция start_server приподнимается на примере TCP сервера:

@asyncio.coroutine 
def start_server(self, loop): 
    def factory(): 
     reader = QbpStreamReader() 
     return QbpStreamReaderProtocol(reader, self._accept_client) 

    logger.info("QbpServer starting at tcp://%s:%s", self.host, self.port) 
    self.server = yield from loop.create_server(factory, self.host, self.port) 

Это было необходимо создать подкласс StreamReaderProtocol, чтобы построить свой собственный StreamWriter. Кроме того, это то же самое, что и библиотечная функция.

class QbpStreamReaderProtocol(streams.StreamReaderProtocol): 
    def connection_made(self, transport): 
     self._stream_reader.set_transport(transport) 
     if self._client_connected_cb is not None: 
      self._stream_writer = QbpStreamWriter(transport, self, 
                self._stream_reader, 
                self._loop) 
      res = self._client_connected_cb(self._stream_reader, 
              self._stream_writer) 
      if coroutines.iscoroutine(res): 
       self._loop.create_task(res) 

Для исходящих сообщений:

class QbpStreamWriter(streams.StreamWriter): 
    def write_msg(self, msg): 
     # data = serialise message 
     self.write(data) 

И для входящих сообщений:

class QbpStreamReader(streams.StreamReader): 
    @asyncio.coroutine 
    def read_msg(self): 
     data = yield from self.readexactly(header_length) 
     # msg_type, msg_length = unpack header 
     data = yield from self.readexactly(msg_length) 
     return build_message(msg_type, data) 

Надеется, что это помогает кому-то