2016-09-13 1 views
0

Я пытаюсь создать CLI на основе Python, который обменивается данными с веб-сервисом через websockets. Одна из проблем, с которыми я сталкиваюсь, заключается в том, что запросы, сделанные CLI веб-службой, прерываются. Глядя на журналы из веб-службы, я могу видеть, что проблема вызвана тем, что часто эти запросы делаются в то же время (или даже после того, как) сокет закрыт:Как подождать, пока сопроцессоры завершатся синхронно внутри метода, если цикл событий уже запущен?

2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened 
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message 
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message 
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"} 
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed 

В моей CLI Я определяю класс CommunicationService, который отвечает за обработку всей прямой связи с веб-службой. Внутри он использует пакет websockets для обработки связи, который сам построен поверх asyncio.

CommunicationService содержит следующий метод для отправки запросов:

def send_request(self, request: str) -> None: 
    logger.debug('Sending request: {}'.format(request)) 
    asyncio.ensure_future(self._ws.send(request)) 

... где ws является WebSocket открыт ранее в другом методе:

self._ws = await websockets.connect(websocket_address) 

То, что я хочу, чтобы иметь возможность ждать будущее возвращается asyncio.ensure_future и, при необходимости, ненадолго спать, чтобы дать время веб-службы для обработки запроса до закрытия веб-камеры.

Однако, поскольку send_request является синхронным методом, он не может просто await эти фьючерсы. Сделать его асинхронным было бы бессмысленно, так как не было ничего, что ожидало бы возвращенного объекта coroutine. Я также не могу использовать loop.run_until_complete, поскольку цикл уже запущен к моменту его вызова.

Я нашел кого-то, описывающего проблему, очень похожую на ту, что у меня есть, на mail.python.org. Решение, которое было размещено в этом потоке было сделать функция возвращает объект сопрограмму в случае петля была уже запущена:

def aio_map(coro, iterable, loop=None): 
    if loop is None: 
     loop = asyncio.get_event_loop() 

    coroutines = map(coro, iterable) 
    coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop) 

    if loop.is_running(): 
     return coros 
    else: 
     return loop.run_until_complete(coros) 

Это невозможно для меня, так как я работаю с PyRx (реализация Python реактивной структура) и send_request только называется абонентом Rx наблюдаемого, что означает, что возвращаемое значение получает отбрасывается и не доступно для моего кода:

class AnonymousObserver(ObserverBase): 
    ... 
    def _on_next_core(self, value): 
     self._next(value) 

на стороне записки, я не уверен, если это какая-то проблема с asyncio, которая обычно встречается или я просто не получаю ее, но я нахожу это довольно неприятным для использования. В C# (например), все, что нужно будет сделать, это, вероятно, что-то вроде следующего:

void SendRequest(string request) 
{ 
    this.ws.Send(request).Wait(); 
    // Task.Delay(500).Wait(); // Uncomment If necessary 
} 

Между тем, версия asyncio «s из„ждать“беспомощно просто возвращает другой сопрограммная, что я вынужден отказаться.

Update

Я нашел способ обойти эту проблему, которая, кажется, работает. У меня есть асинхронный обратный вызов, который запускается на выполнение после того, как команда выполнена, и до консоли заканчивается, так что я просто изменил его от этого ...

async def after_command(): 
    await comms.stop() 

...к этому:

async def after_command(): 
    await asyncio.sleep(0.25) # Allow time for communication 
    await comms.stop() 

Я все еще был бы рад получить любые ответы на эту проблему для дальнейшего использования. Возможно, я не смогу полагаться на обходные пути, подобные этому, в других ситуациях, и я по-прежнему считаю, что было бы лучше использовать задержку, выполненную внутри send_request, так что клиентам CommunicationService не придется беспокоиться о проблемах с синхронизацией.

Что касается вопроса Винсента:

ли ваш пробег цикл в другом потоке, или send_request вызывается какой-то обратный вызов?

Все работает в одной теме - это называется обратным вызовом. Что происходит, так это то, что я определяю все мои команды для использования асинхронных обратных вызовов, и когда они выполняются, некоторые из них попытаются отправить запрос в веб-службу. Поскольку они асинхронны, они не делают этого до тех пор, пока они не будут выполнены по вызову loop.run_until_complete на верхнем уровне CLI - это означает, что цикл работает к тому времени, когда они выполняются в середине пути, и делая это запрос (по косвенному вызову send_request).

Update 2

Вот решение, основанное на предложении Винсента добавления «сделано» обратного вызова.

Новое поле boolean _busy добавлено к CommunicationService для представления, если происходит активность comms.

CommunicationService.send_request модифицирован, чтобы установить _busy истинное перед отправкой запроса, а затем предоставляет обратный вызов _ws.send сбросить _busy раз сделано:

def send_request(self, request: str) -> None: 
    logger.debug('Sending request: {}'.format(request)) 

    def callback(_): 
     self._busy = False 

    self._busy = True 
    asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback) 

CommunicationService.stop теперь реализован ждать этого флага быть установлен ложный, прежде прогрессирующая:

async def stop(self) -> None: 
    """ 
    Terminate communications with TestCube Web Service. 
    """ 
    if self._listen_task is None or self._ws is None: 
     return 

    # Wait for comms activity to stop. 
    while self._busy: 
     await asyncio.sleep(0.1) 

    # Allow short delay after final request is processed. 
    await asyncio.sleep(0.1) 

    self._listen_task.cancel() 
    await asyncio.wait([self._listen_task, self._ws.close()]) 

    self._listen_task = None 
    self._ws = None 
    logger.info('Terminated connection to TestCube Web Service') 

Это похоже на работу тоже, и по крайней мере, таким образом, вся логика синхронизации связи инкапсулируется в CommunicationService класс как и должно быть.

Update 3

Nicer решение, основанное на предложении Винсента.

Вместо self._busy у нас есть self._send_request_tasks = [].

Новая send_request реализация:

def send_request(self, request: str) -> None: 
    logger.debug('Sending request: {}'.format(request)) 

    task = asyncio.ensure_future(self._ws.send(request)) 
    self._send_request_tasks.append(task) 

Новая stop реализация:

async def stop(self) -> None: 
    if self._listen_task is None or self._ws is None: 
     return 

    # Wait for comms activity to stop. 
    if self._send_request_tasks: 
     await asyncio.wait(self._send_request_tasks) 
    ... 
+0

'' ли ваш запуск цикла в другом потоке , или 'send_request' вызван некоторым обратным вызовом? – Vincent

+0

Это все однопоточное, так называемое обратным вызовом. Я обновил сообщение. – Tagc

+1

Как насчет использования 'asyncio.ensure_future (self._ws.send (request)). Add_done_callback (...)', чтобы запланировать обратный вызов для запуска после отправки запроса? – Vincent

ответ

2

Вы можете использовать set задач:

self._send_request_tasks = set() 

Расписание задач с помощью ensure_future и очистить с помощью add_done_callback:

def send_request(self, request: str) -> None: 
    task = asyncio.ensure_future(self._ws.send(request)) 
    self._send_request_tasks.add(task) 
    task.add_done_callback(self._send_request_tasks.remove) 

И ждать set задач для завершения:

async def stop(self): 
    if self._send_request_tasks: 
     await asyncio.wait(self._send_request_tasks) 
+0

Одно из незначительных предостережений с 'asyncio.wait' в соответствии с [документацией] (https://docs.python.org/3/library/asyncio-task.html) состоит в том, что« фреймы последовательности * не должны быть пустыми » , поэтому я бы просто ввел 'asyncio.wait' внутри оператора if, проверяя, что набор не пуст (как я сделал в своем сообщении). В противном случае это выглядит великолепно и, похоже, работает. – Tagc

+0

@Tagc Вы правы, см. Мое редактирование. – Vincent

0

Учитывая, что вы не в асинхронной функции вы можете использовать yield from ключевое слово, чтобы эффективно реализовать await себя. Следующий код не будет блокировать до будущей прибыли: «Я также не могу использовать loop.run_until_complete как цикл уже запущен в момент вызова»

def send_request(self, request: str) -> None: 
    logger.debug('Sending request: {}'.format(request)) 
    future = asyncio.ensure_future(self._ws.send(request)) 
    yield from future.__await__() 
Смежные вопросы