Per PEP-492 Я пытаюсь реализовать асинхронный итератор, чтобы я мог сделать, например,Реализация асинхронного итератора
async for foo in bar:
...
Вот тривиальный пример, похожий на тот, в документации, с очень простым тестом конкретизации и асинхронной итерация:
import pytest
class TestImplementation:
def __aiter__(self):
return self
async def __anext__(self):
raise StopAsyncIteration
@pytest.mark.asyncio # note use of pytest-asyncio marker
async def test_async_for():
async for _ in TestImplementation():
pass
Однако, когда я исполняю свой набор тестов, я вижу :
=================================== FAILURES ===================================
________________________________ test_async_for ________________________________
@pytest.mark.asyncio
async def test_async_for():
> async for _ in TestImplementation():
E TypeError: 'async for' received an invalid object from __aiter__: TestImplementation
...: TypeError
===================== 1 failed, ... passed in 2.89 seconds ======================
Почему мой TestImplementation
являются недействительными? Насколько я могу сказать, что это соответствует протоколу:
- Объект должен реализовать метод
__aiter__
... возвращая асинхронный объект итератора.- Асинхронный объект итератора должен реализовать метод
__anext__
... возвращающий ожидаемый.- Чтобы остановить итерацию
__anext__
, необходимо поднятьStopAsyncIteration
исключение.
Это неудача с последней выпущенными версиями Python (3.5.1), py.test
(2.9.2) и pytest-asyncio
(0.4.1).
После почти 3 лет вы наконец [задали вопрос] (http://stackoverflow.com/users/3001761/jonrsharpe?tab=questions). Kudos –
@BhargavRao Я чувствовал себя как 2076: 1 был оправа; o) – jonrsharpe
Работает для меня с pytest 2.9.2. Какую версию ты используешь? –