Я пытаюсь создать CLI на основе Python, который обменивается данными с веб-сервисом через websockets. Одна из проблем, с которыми я сталкиваюсь, заключается в том, что запросы, сделанные CLI веб-службой, прерываются. Глядя на журналы из веб-службы, я могу видеть, что проблема вызвана тем, что часто эти запросы делаются в то же время (или даже после того, как) сокет закрыт:Как подождать, пока сопроцессоры завершатся синхронно внутри метода, если цикл событий уже запущен?
2016-09-13 13:28:10,930 [22 ] INFO DeviceBridge - Device bridge has opened
2016-09-13 13:28:11,936 [21 ] DEBUG DeviceBridge - Device bridge has received message
2016-09-13 13:28:11,937 [21 ] DEBUG DeviceBridge - Device bridge has received valid message
2016-09-13 13:28:11,937 [21 ] WARN DeviceBridge - Unable to process request: {"value": false, "path": "testcube.pwms[0].enabled", "op": "replace"}
2016-09-13 13:28:11,936 [5 ] DEBUG DeviceBridge - Device bridge has closed
В моей CLI Я определяю класс CommunicationService
, который отвечает за обработку всей прямой связи с веб-службой. Внутри он использует пакет websockets
для обработки связи, который сам построен поверх asyncio
.
CommunicationService
содержит следующий метод для отправки запросов:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
asyncio.ensure_future(self._ws.send(request))
... где ws
является WebSocket открыт ранее в другом методе:
self._ws = await websockets.connect(websocket_address)
То, что я хочу, чтобы иметь возможность ждать будущее возвращается asyncio.ensure_future
и, при необходимости, ненадолго спать, чтобы дать время веб-службы для обработки запроса до закрытия веб-камеры.
Однако, поскольку send_request
является синхронным методом, он не может просто await
эти фьючерсы. Сделать его асинхронным было бы бессмысленно, так как не было ничего, что ожидало бы возвращенного объекта coroutine. Я также не могу использовать loop.run_until_complete
, поскольку цикл уже запущен к моменту его вызова.
Я нашел кого-то, описывающего проблему, очень похожую на ту, что у меня есть, на mail.python.org. Решение, которое было размещено в этом потоке было сделать функция возвращает объект сопрограмму в случае петля была уже запущена:
def aio_map(coro, iterable, loop=None):
if loop is None:
loop = asyncio.get_event_loop()
coroutines = map(coro, iterable)
coros = asyncio.gather(*coroutines, return_exceptions=True, loop=loop)
if loop.is_running():
return coros
else:
return loop.run_until_complete(coros)
Это невозможно для меня, так как я работаю с PyRx (реализация Python реактивной структура) и send_request
только называется абонентом Rx наблюдаемого, что означает, что возвращаемое значение получает отбрасывается и не доступно для моего кода:
class AnonymousObserver(ObserverBase):
...
def _on_next_core(self, value):
self._next(value)
на стороне записки, я не уверен, если это какая-то проблема с asyncio
, которая обычно встречается или я просто не получаю ее, но я нахожу это довольно неприятным для использования. В C# (например), все, что нужно будет сделать, это, вероятно, что-то вроде следующего:
void SendRequest(string request)
{
this.ws.Send(request).Wait();
// Task.Delay(500).Wait(); // Uncomment If necessary
}
Между тем, версия asyncio
«s из„ждать“беспомощно просто возвращает другой сопрограммная, что я вынужден отказаться.
Update
Я нашел способ обойти эту проблему, которая, кажется, работает. У меня есть асинхронный обратный вызов, который запускается на выполнение после того, как команда выполнена, и до консоли заканчивается, так что я просто изменил его от этого ...
async def after_command():
await comms.stop()
...к этому:
async def after_command():
await asyncio.sleep(0.25) # Allow time for communication
await comms.stop()
Я все еще был бы рад получить любые ответы на эту проблему для дальнейшего использования. Возможно, я не смогу полагаться на обходные пути, подобные этому, в других ситуациях, и я по-прежнему считаю, что было бы лучше использовать задержку, выполненную внутри send_request
, так что клиентам CommunicationService
не придется беспокоиться о проблемах с синхронизацией.
Что касается вопроса Винсента:
ли ваш пробег цикл в другом потоке, или send_request вызывается какой-то обратный вызов?
Все работает в одной теме - это называется обратным вызовом. Что происходит, так это то, что я определяю все мои команды для использования асинхронных обратных вызовов, и когда они выполняются, некоторые из них попытаются отправить запрос в веб-службу. Поскольку они асинхронны, они не делают этого до тех пор, пока они не будут выполнены по вызову loop.run_until_complete
на верхнем уровне CLI - это означает, что цикл работает к тому времени, когда они выполняются в середине пути, и делая это запрос (по косвенному вызову send_request
).
Update 2
Вот решение, основанное на предложении Винсента добавления «сделано» обратного вызова.
Новое поле boolean _busy
добавлено к CommunicationService
для представления, если происходит активность comms.
CommunicationService.send_request
модифицирован, чтобы установить _busy
истинное перед отправкой запроса, а затем предоставляет обратный вызов _ws.send
сбросить _busy
раз сделано:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
def callback(_):
self._busy = False
self._busy = True
asyncio.ensure_future(self._ws.send(request)).add_done_callback(callback)
CommunicationService.stop
теперь реализован ждать этого флага быть установлен ложный, прежде прогрессирующая:
async def stop(self) -> None:
"""
Terminate communications with TestCube Web Service.
"""
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
while self._busy:
await asyncio.sleep(0.1)
# Allow short delay after final request is processed.
await asyncio.sleep(0.1)
self._listen_task.cancel()
await asyncio.wait([self._listen_task, self._ws.close()])
self._listen_task = None
self._ws = None
logger.info('Terminated connection to TestCube Web Service')
Это похоже на работу тоже, и по крайней мере, таким образом, вся логика синхронизации связи инкапсулируется в CommunicationService
класс как и должно быть.
Update 3
Nicer решение, основанное на предложении Винсента.
Вместо self._busy
у нас есть self._send_request_tasks = []
.
Новая send_request
реализация:
def send_request(self, request: str) -> None:
logger.debug('Sending request: {}'.format(request))
task = asyncio.ensure_future(self._ws.send(request))
self._send_request_tasks.append(task)
Новая stop
реализация:
async def stop(self) -> None:
if self._listen_task is None or self._ws is None:
return
# Wait for comms activity to stop.
if self._send_request_tasks:
await asyncio.wait(self._send_request_tasks)
...
'' ли ваш запуск цикла в другом потоке , или 'send_request' вызван некоторым обратным вызовом? – Vincent
Это все однопоточное, так называемое обратным вызовом. Я обновил сообщение. – Tagc
Как насчет использования 'asyncio.ensure_future (self._ws.send (request)). Add_done_callback (...)', чтобы запланировать обратный вызов для запуска после отправки запроса? – Vincent