2014-09-02 4 views
2

Я пытаюсь создать сервер серверных событий, с которым я могу подключиться к telnet, и содержимое telnet будет перенесено в браузер. Идея использования Python и asyncio заключается в том, чтобы использовать как можно меньше CPU, поскольку это будет работать на малине Pi.Python - Общайтесь между протоколом/сервером asyncio

До сих пор у меня есть следующее, которое использует библиотеку, найденную здесь: https://pypi.python.org/pypi/asyncio-sse/0.1, которая использует asyncio.

И я также скопировал сервер telnet, который также использует asyncio.

Обе работают отдельно, но я понятия не имею, как связать обе вместе. Как я понимаю, мне нужно позвонить send() в класс SSEHandler изнутри Telnet.data_received, но я не знаю, как к нему обращаться. Оба этих «сервера» должны быть запущены в цикле для приема новых соединений или для перемещения данных.

Может ли кто-нибудь помочь или указать мне в другом направлении? Я на том этапе, где больше не знаю, что делать дальше

import asyncio 
import sse 

# Get an instance of the asyncio event loop 
loop = asyncio.get_event_loop() 

# Setup SSE address and port 
sse_host, sse_port = '192.168.2.25', 8888 

class Telnet(asyncio.Protocol): 
    def connection_made(self, transport): 
     print("Connection received!"); 
     self.transport = transport 

    def data_received(self, data): 
     print(data) 
     self.transport.write(b'echo:') 
     self.transport.write(data) 

     # This is where I want to send data via SSE 
     # SSEHandler.send(data) 

     # Things I've tried :(
     #loop.call_soon_threadsafe(SSEHandler.handle_request()); 
     #loop.call_soon_threadsafe(sse_server.send("PAH!")); 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     telnet_server.close() 

class SSEHandler(sse.Handler): 
    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 

# SSE server 
sse_server = sse.serve(SSEHandler, sse_host, sse_port) 

# Telnet server 
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '192.168.2.25', 7777)) 

#telnet_server.something = sse_server; 

loop.run_until_complete(sse_server) 
loop.run_until_complete(telnet_server.wait_closed()) 

ответ

4

События на стороне сервера - это своего рода протокол HTTP; и у вас может быть любое количество одновременных запросов HTTP в полете в любой данный момент, у вас может быть ноль, если никто не подключен, или десятки. Этот нюанс завершается в двух конструкциях sse.serve и sse.Handler; первый представляет собой один порт прослушивания, который отправляет каждый отдельный запрос клиента последнему.

Кроме того, sse.Handler.handle_request() вызывается один раз для каждого клиента, и клиент отключается после завершения этой совместной процедуры. В вашем коде эта сопрограмма немедленно прекращается, и поэтому клиент видит одно «рабочее» событие. Итак, нам нужно ждать, более или менее навсегда. Мы можем это сделать yield from, используя asyncio.Future().

Вторая проблема заключается в том, что нам как-то нужно будет удержать все отдельные экземпляры SSEHandler() и использовать метод send() для каждого из них. Ну, мы можем иметь каждый самостоятельный учет в своих методах handle_request(); добавив каждый из них в dict, который отображает отдельные экземпляры обработчика в будущее, которое они ожидают.

class SSEHandler(sse.Handler): 
    _instances = {} 

    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 
     my_future = asyncio.Future() 
     SSEHandler._instances[self] = my_future 
     yield from my_future 

Теперь, чтобы отправить событие каждый прослушивания, мы просто посетить все экземпляры SSEHandler, зарегистрированных в Словаре мы создали и используя send() на каждом из них.

class SSEHandler(sse.Handler): 

    #... 

    @classmethod 
    def broadcast(cls, message): 
     for instance, future in cls._instances.items(): 
      instance.send(message) 

class Telnet(asyncio.Protocol): 

    #... 

    def data_received(self, data): 
     #... 
     SSEHandler.broadcast(data.decode('ascii')) 

Наконец, ваш код завершается, когда соединение telnet закрывается. это хорошо, но мы должны очистить и в то время. К счастью, это всего лишь вопрос установки результата на всех фьючерсы на все обработчик

class SSEHandler(sse.Handler): 

    #... 

    @classmethod 
    def abort(cls): 
     for instance, future in cls._instances.items(): 
      future.set_result(None) 
     cls._instances = {} 

class Telnet(asyncio.Protocol): 

    #... 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     SSEHandler.abort() 
     telnet_server.close() 

вот полный, рабочий дамп в случае, если моя иллюстрация не является очевидной.

import asyncio 
import sse 

loop = asyncio.get_event_loop() 
sse_host, sse_port = '0.0.0.0', 8888 

class Telnet(asyncio.Protocol): 
    def connection_made(self, transport): 
     print("Connection received!"); 
     self.transport = transport 

    def data_received(self, data): 
     SSEHandler.broadcast(data.decode('ascii')) 

    def connection_lost(self, esc): 
     print("Connection lost!") 
     SSEHandler.abort() 
     telnet_server.close() 

class SSEHandler(sse.Handler): 
    _instances = {} 
    @classmethod 
    def broadcast(cls, message): 
     for instance, future in cls._instances.items(): 
      instance.send(message) 

    @classmethod 
    def abort(cls): 
     for instance, future in cls._instances.items(): 
      future.set_result(None) 
     cls._instances = {} 

    @asyncio.coroutine 
    def handle_request(self): 
     self.send('Working') 
     my_future = asyncio.Future() 
     SSEHandler._instances[self] = my_future 
     yield from my_future 

sse_server = sse.serve(SSEHandler, sse_host, sse_port) 
telnet_server = loop.run_until_complete(loop.create_server(Telnet, '0.0.0.0', 7777)) 
loop.run_until_complete(sse_server) 
loop.run_until_complete(telnet_server.wait_closed()) 
+0

Огромное спасибо за объяснение, как И почему, очень очень полезно. У меня есть намного больше, чтобы понять, что это казалось бы. Я получил ваш пример кода и выполнял именно то, что я хотел/нуждался, и очень низкий процессор. еще раз спасибо –

Смежные вопросы