2016-05-14 6 views
3

У меня есть ситуация, когда у меня есть «серверный» поток, который должен прослушивать вызовы/события из других потоков сервера и одновременно выполнять какой-то другой код. В последнее время я много работал с Node.js, поэтому я подумал, что было бы неплохо использовать async/await для создания цикла событий, где я могу подождать другие потоки, чтобы присоединиться к циклу событий и обработать их ответ, когда они, наконец, присоединятся.Python asyncio wait for threads

Чтобы проверить эту идею, я написал следующий тестовый скрипт в Python 3.5:

# http://stackabuse.com/python-async-await-tutorial/ 
# Testing out Python's asynchronous features 
import asyncio 
from time import sleep 
import threading 
from threading import Thread 
import random 

class MyThread(Thread): 

    def __init__(self, message): 
     Thread.__init__(self) 
     self._message = message 

    def run(self): 
     self._return = self._message + " oli viesti" 
     a = random.randint(1, 5) 
     print("Sleep for ", a) 
     sleep(a) 
     print("Thread exiting...") 


    def join(self): 
     Thread.join(self) 
     return self._return 



async def send(message): 
    t = MyThread(message) # daemon = True 
    t.start() 
    print("asd") 
    return t.join() 

async def sendmsg(msg): 
    response = await send(msg) 
    print("response is ", response) 


if __name__ == "__main__": 
    # Initiate a new thread and pass in keyword argument dictionary as parameters 
    loop = asyncio.get_event_loop() 
    tasks = [ 
     asyncio.ensure_future(sendmsg("hippa1"), loop=loop), 
     asyncio.ensure_future(sendmsg("hippa2"), loop=loop), 
     asyncio.ensure_future(sendmsg("hippa3"), loop=loop) 
    ] 

    loop.run_until_complete(asyncio.wait(tasks)) 
    loop.close() 

В примере я хочу пнуть три рабочие потоки с разными строками и ждать их, чтобы закончить. Рабочие спят случайное количество времени, поэтому я ожидаю, что они закончатся в случайном порядке, когда скрипт запускается несколько раз. Оказывается, что они, похоже, исполняются последовательно, второй поток начинается с первого раза.

Какая у меня ошибка? Не следует ли спящать блок только в том потоке, в котором он находится? Правильно ли настроен цикл событий? Могу ли я подключиться к асинхронному подключению?

В конечном итоге я хочу отправлять сообщения в другие темы и ждать их ответа, а затем запустить функцию обратного вызова с возвращенным значением.

EDIT: Чтобы уточнить, в конечном счете, я хочу подождать условных переменных с async/await в моем основном потоке и запустить другой код до тех пор, пока некоторые условные переменные не смогут выполнить выполнение. В этом примере кода я пытался сделать то же самое с соединением рабочего потока.

+0

'time.sleep' не является асинхронным, попробуйте его с помощью' await asyncio.sleep' – jonrsharpe

+0

Но он запускается в отдельном потоке, поэтому не следует просто блокировать отдельный поток вместо основного потока, где цикл события является? Я понимаю, что сон - это поток, а не блокирование процесса. Итак, зачем присоединяться к моей основной теме, даже с структурой async/wait? – Tumetsu

ответ

1

В конце концов, он работает последовательно из-за этого кода:

async def send(message): 
    t = MyThread(message) # daemon = True 
    t.start() 
    print("asd") 
    return t.join() 

Вы начинаете нить, а затем сразу же ждать на этой теме, чтобы закончить, прежде чем продолжить. Вот почему они выполняются последовательно.

Node.js и asyncio необязательно создают новые потоки для выполнения своих операций. Например, Node.js использует только один поток, но использует функции уровня ядра (например, «epoll») для вызова обратных вызовов, которые вы указываете при возникновении какой-либо новой сетевой активности. Это позволяет одному потоку управлять сотнями сетевых подключений.

Вот почему, вероятно, когда вы выполнили это без экземпляра Thread, вы вызовите sleep в текущем потоке, который совпадает с основным потоком. Когда вы используете asyncio с сетевыми функциями, вы можете использовать структуры «выход из», которые позволяют выполнять другие блоки кода, в то время как другие задачи выполняют действия с другими удаленными службами.

Основная структура правильная. Вы хотите, чтобы этот блок кода:

loop.run_until_complete(asyncio.wait(tasks)) 

Но не полагаться на «сна», чтобы проверить функцию, вам нужно сделать сетевой вызов или использование:

yield from asyncio.sleep(1) 

И нет никакой необходимости для запуска отдельных потоков в этом случае.

+0

Хороший вопрос о начале потока.Тем не менее, в моем случае мне действительно нужны отдельные потоки, но я хочу * ожидать условия от них асинхронно *, если это возможно. Поэтому я хотел бы иметь функцию, которая проверяет соединение в цикле событий, но если все еще выполняет другой произвольный код, пока соединение не будет блокироваться в цикле. Поэтому мой случай не полностью аналог Node.js, и я вместо этого хочу подождать другие потоки, чтобы закончить/установить условную переменную без блокировки моего основного потока с помощью цикла асинхронного события. – Tumetsu

+0

Вкратце я не могу понять, как написать что-то вроде: yield t.join() – Tumetsu

+0

В этом случае я предлагаю вам не использовать asyncio вообще, но работать с промежутком между ними. Вы можете подождать в очереди с task_done() и использовать get() для поиска рабочих элементов. Затем вы можете использовать другую очередь для передачи результатов обратно в основной поток. Пример кода, который показывает его на странице документа: https://docs.python.org/3/library/queue.html – radialmind