2015-08-17 4 views
20

Я использую библиотеку websockets для создания сервера websocket в Python 3.4. Вот простой сервер эхо:Python - как запустить несколько сопрограмм одновременно с помощью asyncio?

import asyncio 
import websockets 

@asyncio.coroutine 
def connection_handler(websocket, path): 
    while True: 
     msg = yield from websocket.recv() 
     if msg is None: # connection lost 
      break 
     yield from websocket.send(msg) 

start_server = websockets.serve(connection_handler, 'localhost', 8000) 
asyncio.get_event_loop().run_until_complete(start_server) 
asyncio.get_event_loop().run_forever() 

Допустим, мы - дополнительно - хотел послать сообщение клиенту всякий раз, когда какое-то событие происходит. Для простоты, давайте отправляем сообщение периодически каждые 60 секунд. Как мы это сделаем? Я имею в виду, потому что connection_handler постоянно ждет входящих сообщений, сервер может принять меры после он получил сообщение от клиента, не так ли? Что мне здесь не хватает?

Возможно, этот сценарий требует рамки, основанной на событиях/обратных вызовах, а не на основе сопрограмм? Tornado?

ответ

22

TL; DR Используйте asyncio.ensure_future() для запуска нескольких сопрограмм одновременно.


Может быть, этот сценарий требует структуру, основанную на событиях/обратных вызовов, а не один на основе сопрограммам? Торнадо?

Нет, для этого вам не нужны никакие другие рамки. Вся идея асинхронного приложения против синхронного заключается в том, что он не блокирует, ожидая результата. Неважно, как это реализовано, используя сопрограммы или обратные вызовы.

Я имею в виду, потому что connection_handler постоянно ждет входящих сообщений, сервер может действовать только после того, как он получил сообщение от клиента, верно? Что мне здесь не хватает?

В синхронном приложении вы напишете что-то вроде msg = websocket.recv(), которое блокирует все приложение до получения сообщения (как вы описали). Но в асинхронном приложении это совершенно другое.

Когда вы делаете msg = yield from websocket.recv(), вы говорите что-то вроде: приостановите исполнение connection_handler() до websocket.recv() что-то произведет. С помощью yield from внутри coroutine возвращает управление обратно в цикл событий, поэтому можно выполнить другой код, в то время как мы ожидаем результата от websocket.recv(). Пожалуйста, обратитесь к documentation, чтобы лучше понять, как работают сопрограммы.

Предположим, что мы - дополнительно - хотели отправить сообщение клиенту всякий раз, когда происходит какое-либо событие. Для простоты, давайте отправляем сообщение периодически каждые 60 секунд. Как мы это сделаем?

Вы можете использовать asyncio.async() запускать столько сопрограмм, как вы хотите, перед выполнением блокировки вызова для starting event loop.

import asyncio 

import websockets 

# here we'll store all active connections to use for sending periodic messages 
connections = [] 


@asyncio.coroutine 
def connection_handler(connection, path): 
    connections.append(connection) # add connection to pool 
    while True: 
     msg = yield from connection.recv() 
     if msg is None: # connection lost 
      connections.remove(connection) # remove connection from pool, when client disconnects 
      break 
     else: 
      print('< {}'.format(msg)) 
     yield from connection.send(msg) 
     print('> {}'.format(msg)) 


@asyncio.coroutine 
def send_periodically(): 
    while True: 
     yield from asyncio.sleep(5) # switch to other code and continue execution in 5 seconds 
     for connection in connections: 
      print('> Periodic event happened.') 
      yield from connection.send('Periodic event happened.') # send message to each connected client 


start_server = websockets.serve(connection_handler, 'localhost', 8000) 
asyncio.get_event_loop().run_until_complete(start_server) 
asyncio.async(send_periodically()) # before blocking call we schedule our coroutine for sending periodic messages 
asyncio.get_event_loop().run_forever() 

Ниже приведен пример реализации клиента. Он просит вас ввести имя, вернуть его с сервера эха, ждет еще два сообщения с сервера (которые являются нашими периодическими сообщениями) и закрывает соединение.

import asyncio 

import websockets 


@asyncio.coroutine 
def hello(): 
    connection = yield from websockets.connect('ws://localhost:8000/') 
    name = input("What's your name? ") 
    yield from connection.send(name) 
    print("> {}".format(name)) 
    for _ in range(3): 
     msg = yield from connection.recv() 
     print("< {}".format(msg)) 

    yield from connection.close() 


asyncio.get_event_loop().run_until_complete(hello()) 

Важные моменты:

  1. В Python 3.4. был переименован в asyncio.ensure_future().
  2. Существуют специальные методы для планирования delayed calls, но они не работают с сопрограммами.
+2

Отличный ответ, спасибо! Я понимаю, что такое сопрограммы, но я все еще пытаюсь разглядеть структуру асинхронного ядра. Ваш ответ очень помог. – weatherfrog

+0

@weatherfrog Вы решили проблему? У меня есть аналогичная проблема [здесь] (https://stackoverflow.com/questions/49871048/tornado-client-on-message-callback-is-not-ответ) – verystrongjoe

6

Тот же вопрос, вряд ли получил решение, пока я не увидел идеальный образец здесь: http://websockets.readthedocs.io/en/stable/intro.html#both

done, pending = await asyncio.wait(
     [listener_task, producer_task], 
     return_when=asyncio.FIRST_COMPLETED) # Important 

Таким образом, я могу справиться с мульти задачи сопрограмм, такие как сердцебиение и Redis ПОДПИСАТЬСЯ.

Смежные вопросы