2015-12-06 2 views
7

Я использую aiohttp сделать простой запрос HTTP в Python 3.4 вроде этого:как кэшировать asyncio сопрограмм

response = yield from aiohttp.get(url) 

Приложение запрашивает тот же URL снова и снова так, естественно, я хотел бы кэшировать. Моя первая попытка была что-то вроде этого:

@functools.lru_cache(maxsize=128) 
def cached_request(url): 
    return aiohttp.get(url) 

Первый вызов cached_request работает нормально, но в последующих вызовах я в конечном итоге с None вместо объекта ответа.

Я довольно новичок в asyncio, поэтому я пробовал много комбинаций декоратора asyncio.coroutine, yield from и некоторых других вещей, но ни один из них не работал.

Как работает кеширование сопрограмм?

+0

Не уверен, что вы имеете в виду кэширования сопрограммы? например Сохраните его как переменную, чтобы вы могли называть ее повторно? Сохранить результат, пока результат не будет заменен при последующем исполнении? Или повторить тот же сопроцесс позже? – shongololo

+0

@shongololo Я хочу кэшировать результат сопрограммы. – tobib

+1

Я не знаком с functools.lru_cache(), но если вы просто хотите вернуть обновленные результаты, то есть ли причина, по которой вы не просто сохраняете обновленные результаты в переменной? Тем не менее, при использовании асинхронного метода (например, 'aiohttp.get()') вам нужно что-то прогнать. Поэтому cached_request должен быть заключен в '@ asyncio.coroutine'; он должен быть вызван с использованием 'yield from'; и оператор return должен быть обрамлен по линиям 'return (yield from aiohttp.get (url))' – shongololo

ответ

3

Я написал простой декоратор своего кэша себя:

def async_cache(maxsize=128): 
    cache = {} 

    def decorator(fn): 
     def wrapper(*args):               
      key = ':'.join(args) 

      if key not in cache: 
       if len(cache) >= maxsize: 
        del cache[cache.keys().next()] 

       cache[key] = yield from fn(*args) 

      return cache[key] 

     return wrapper 

    return decorator 


@async_cache() 
@asyncio.coroutine 
def expensive_io(): 
    .... 

Этот вид-из-работ. Но многие аспекты, вероятно, могут быть улучшены. Например: если функция кэширования вызывается второй раз перед первым вызовом, она будет выполняться второй раз.

+0

Предложение: используйте ['OrderedDict'] (https://docs.python.org/3/library/collections .html # collections.OrderedDict), чтобы реализовать поведение 'lru', то есть использовать' OrderedDict.move_to_end' для каждого вызванного ключа, а затем 'OrderedDict.popitem', когда кеш заполнен. –

0

Я не знаком с aiohttp, поэтому я не уверен точно, что происходит, что приведет к возврату Nones, но декодер lru_cache не будет работать с асинхронными функциями.

Я использую декоратор, который делает практически то же самое; обратите внимание, что это отличается от декоратора tobib в выше в том, что он всегда будет возвращать будущее или задачу, а не значение:

from collections import OrderedDict 
from functools import _make_key, wraps 

def future_lru_cache(maxsize=128): 
    # support use as decorator without calling, for this case maxsize will 
    # not be an int 
    try: 
     real_max_size = int(maxsize) 
    except ValueError: 
     real_max_size = 128 

    cache = OrderedDict() 

    async def run_and_cache(func, args, kwargs): 
     """Run func with the specified arguments and store the result 
     in cache.""" 
     result = await func(*args, **kwargs) 
     cache[_make_key(args, kwargs, False)] = result 
     if len(cache) > real_max_size: 
      cache.popitem(False) 
     return result 

    def wrapper(func): 
     @wraps(func) 
     def decorator(*args, **kwargs): 
      key = _make_key(args, kwargs, False) 
      if key in cache: 
       # Some protection against duplicating calls already in 
       # progress: when starting the call cache the future, and if 
       # the same thing is requested again return that future. 
       if isinstance(cache[key], asyncio.Future): 
        return cache[key] 
       else: 
        f = asyncio.Future() 
        f.set_result(cache[key]) 
        return f 
      else: 
       task = asyncio.Task(run_and_cache(func, args, kwargs)) 
       cache[key] = task 
       return task 
     return decorator 

    if callable(maxsize): 
     return wrapper(maxsize) 
    else: 
     return wrapper 

я использовал _make_key из functools, как lru_cache делает, я думаю, что он должен быть частным, так вероятно, лучше скопировать его.

0

Другой вариант LRU декоратора, который кэширует еще не закончил сопрограмм, очень полезные с параллельными запросами на тот же ключ:

import asyncio 
from collections import OrderedDict 
from functools import _make_key, wraps 

def async_cache(maxsize=128, event_loop=None): 
    cache = OrderedDict() 
    if event_loop is None: 
     event_loop = asyncio.get_event_loop() 
    awaiting = dict() 

    async def run_and_cache(func, args, kwargs): 
     """await func with the specified arguments and store the result 
     in cache.""" 
     result = await func(*args, **kwargs) 
     key = _make_key(args, kwargs, False) 
     cache[key] = result 
     if len(cache) > maxsize: 
      cache.popitem(False) 
     cache.move_to_end(key) 
     return result 

    def decorator(func): 
     @wraps(func) 
     async def wrapper(*args, **kwargs): 
      key = _make_key(args, kwargs, False) 
      if key in cache: 
       return cache[key] 
      if key in awaiting: 
       task = awaiting[key] 
       return await asyncio.wait_for(task, timeout=None, loop=event_loop) 
      task = asyncio.ensure_future(run_and_cache(func, args, kwargs), loop=event_loop) 
      awaiting[key] = task 
      result = await asyncio.wait_for(task, timeout=None, loop=event_loop) 
      del awaiting[key] 
      return result 
     return wrapper 

    return decorator 


async def test_async_cache(event_loop): 
    counter = 0 
    n, m = 10, 3 

    @async_cache(maxsize=n, event_loop=event_loop) 
    async def cached_function(x): 
     nonlocal counter 
     await asyncio.sleep(0) # making event loop switch to other coroutine 
     counter += 1 
     return x 

    tasks = [asyncio.ensure_future(cached_function(x), loop=event_loop) 
      for x in list(range(n)) * m] 
    done, pending = await asyncio.wait(tasks, loop=event_loop, timeout=1) 
    assert len(done) == n * m 
    assert counter == n 

event_loop = asyncio.get_event_loop() 
task = asyncio.ensure_future(test_async_cache(event_loop)) 
event_loop.run_until_complete(task) 
2

Может быть, немного поздно, но я начал новый пакет, который может помочь : https://github.com/argaen/aiocache. Взносы/комментарии всегда приветствуются.

Пример:

import asyncio 

from collections import namedtuple 

from aiocache import cached 
from aiocache.serializers import PickleSerializer 

Result = namedtuple('Result', "content, status") 


@cached(ttl=10, serializer=PickleSerializer()) 
async def async_main(): 
    print("First ASYNC non cached call...") 
    await asyncio.sleep(1) 
    return Result("content", 200) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 
    print(loop.run_until_complete(async_main())) 

Обратите внимание, что в качестве дополнительных, он может кэшировать любой объект питона в Redis с помощью Рассол сериализации. Если вы просто хотите работать с памятью, вы можете использовать бэкэнд SimpleMemoryCache :).

1

Чтобы использовать functools.lru_cache с сопрограммами, действует следующий код.

class Cacheable: 
    def __init__(self, co): 
     self.co = co 
     self.done = False 
     self.result = None 
     self.lock = asyncio.Lock() 

    def __await__(self): 
     with (yield from self.lock): 
      if self.done: 
       return self.result 
      self.result = yield from self.co.__await__() 
      self.done = True 
      return self.result 

def cacheable(f): 
    def wrapped(*args, **kwargs): 
     r = f(*args, **kwargs) 
     return Cacheable(r) 
    return wrapped 


@functools.lru_cache() 
@cacheable 
async def foo(): 
    async with aiohttp.ClientSession() as session: 
     async with session.get(url) as resp: 
      return await resp.text() 

Ниже поточно

class ThreadSafeCacheable: 
    def __init__(self, co): 
     self.co = co 
     self.done = False 
     self.result = None 
     self.lock = threading.Lock() 

    def __await__(self): 
     while True: 
      if self.done: 
       return self.result 
      if self.lock.acquire(blocking=False): 
       self.result = yield from self.co.__await__() 
       self.done = True 
       return self.result 
      else: 
       yield from asyncio.sleep(0.005) 
Смежные вопросы