2016-12-28 5 views
4

Я пытаюсь переписать этот код python2.7 к новому асинхронном мирового порядка:Как создать генератор async в Python?

def get_api_results(func, iterable): 
    pool = multiprocessing.Pool(5) 
    for res in pool.map(func, iterable): 
     yield res 

map() блоки, пока все результаты были вычислены, так что я пытаюсь переписать это как реализация асинхронной, что будет дают результаты, как только они будут готовы. Например map(), возвращаемые значения должны быть возвращены в таком же порядке, как iterable. Я попробовал это (мне нужно requests из-за требования устаревшего AUTH):

import requests 

def get(i): 
    r = requests.get('https://example.com/api/items/%s' % i) 
    return i, r.json() 

async def get_api_results(): 
    loop = asyncio.get_event_loop() 
    futures = [] 
    for n in range(1, 11): 
     futures.append(loop.run_in_executor(None, get, n)) 
    async for f in futures: 
     k, v = await f 
     yield k, v 

for r in get_api_results(): 
    print(r) 

но с Python 3.6 Я получаю:

File "scratch.py", line 16, in <module> 
    for r in get_api_results(): 
TypeError: 'async_generator' object is not iterable 

Как я могу это сделать?

+1

Не помещайте цикл событий в блок асинхронного кода, асинхронный код должен запускаться контуром события, а не наоборот. –

+0

Спасибо! Наверняка, я что-то пропустил. Все примеры циклов событий, которые я видел, используют loop.run_until_complete (get_api_results()), которые, как я понимаю, сделают блокировку вызовов и потеряют результаты. –

+0

У вас обычно будет больше сопрограмм, обрабатывающих результаты, с циклом события, управляющим этим. –

ответ

4

Что касается вашего старого (2.7) кода - многопроцессорная обработка считается мощной заменой для более простого модуля потоковой обработки для одновременной обработки задач с интенсивным использованием ЦП, где потоки не работают так хорошо. Ваш код, вероятно, не связан с ЦП, так как ему просто нужно делать HTTP-запросы - и потоковой передачи могло быть достаточно для решения вашей проблемы.

Однако, вместо прямого использования threading, у Python 3+ есть хороший модуль под названием concurrent.futures, который с чистым API через классные классы Executor. Этот модуль доступен также для python 2.7 как external package.

Следующий код работает на Python 2 и Python 3:

# For python 2, first run: 
# 
# pip install futures 
# 
from __future__ import print_function 

import requests 
from concurrent import futures 

URLS = [ 
    'http://httpbin.org/delay/1', 
    'http://httpbin.org/delay/3', 
    'http://httpbin.org/delay/6', 
    'http://www.foxnews.com/', 
    'http://www.cnn.com/', 
    'http://europe.wsj.com/', 
    'http://www.bbc.co.uk/', 
    'http://some-made-up-domain.coooom/', 
] 


def fetch(url): 
    r = requests.get(url) 
    r.raise_for_status() 
    return r.content 


def fetch_all(urls): 
    with futures.ThreadPoolExecutor(max_workers=5) as executor: 
     future_to_url = {executor.submit(fetch, url): url for url in urls} 
     print("All URLs submitted.") 
     for future in futures.as_completed(future_to_url): 
      url = future_to_url[future] 
      if future.exception() is None: 
       yield url, future.result() 
      else: 
       # print('%r generated an exception: %s' % (
       # url, future.exception())) 
       yield url, None 


for url, s in fetch_all(URLS): 
    status = "{:,.0f} bytes".format(len(s)) if s is not None else "Failed" 
    print('{}: {}'.format(url, status)) 

Этот код использует futures.ThreadPoolExecutor, основанный на резьбе. Большая часть магии находится в as_completed() здесь.

Ваш код python 3.6 выше, использует run_in_executor(), который создает futures.ProcessPoolExecutor(), и на самом деле не использует асинхронный IO !!

Если вы действительно хотите продолжить работу с asyncio, вам нужно будет использовать HTTP-клиент, поддерживающий asyncio, например aiohttp. Вот пример кода:

import asyncio 

import aiohttp 


async def fetch(session, url): 
    print("Getting {}...".format(url)) 
    async with session.get(url) as resp: 
     text = await resp.text() 
    return "{}: Got {} bytes".format(url, len(text)) 


async def fetch_all(): 
    async with aiohttp.ClientSession() as session: 
     tasks = [fetch(session, "http://httpbin.org/delay/{}".format(delay)) 
       for delay in (1, 1, 2, 3, 3)] 
     for task in asyncio.as_completed(tasks): 
      print(await task) 
    return "Done." 


loop = asyncio.get_event_loop() 
resp = loop.run_until_complete(fetch_all()) 
print(resp) 
loop.close() 

Как вы можете видеть, asyncio также имеет as_completed(), теперь с помощью реального асинхронного ввода-вывода, используя только одну нить на одном процессе.

+0

'Поскольку coroutines являются генераторами, в них невозможно использовать простые« выходные ».« Это возможно. https://stackoverflow.com/a/37550568/2908138 – im7mortal

+0

@ im7mortal: Благодарю вас, я удалил эту часть из ответа. – Udi

5

Вы помещаете свою петлю событий в другую совместную программу. Не делай этого. Цикл событий является самым внешним «драйвером» асинхронного кода и должен выполняться синхронно.

Если вам нужно обработать полученные результаты, напишите еще несколько сопрограмм, которые это делают. Они могут принимать данные из очереди или могут напрямую управлять загрузкой.

Вы могли бы иметь основную функцию, которая извлекает и обрабатывает результаты, например:

async def main(loop): 
    for n in range(1, 11): 
     future = loop.run_in_executor(None, get, n) 
     k, v = await future 
     # do something with the result 

loop = asyncio.get_event_loop() 
loop.run_until_complete(main(loop)) 

Я бы сделать функцию get() правильно асинхронной тоже использует библиотеку асинхронной как aiohttp, так что вы не должны использовать исполнителя вообще.

Смежные вопросы