2015-09-29 3 views
1

Я новичок в asyncio (используется с python3.4), и я не уверен, что использую его как нужно. Я видел в this thread, что его можно использовать для выполнения функции каждые n секунд (в моем случае ms) без необходимости погружения в потоки.Использование цикла событий asyncio рекурсивно

Я использую его для получения данных от лазерных датчиков через базовый последовательный протокол каждые n мс до тех пор, пока не получу m выборок.

Вот определение из моих функций:

def countDown(self, 
       loop, 
       funcToDo, 
       *args, 
       counter = [ 1 ], 
       **kwargs): 
    """ At every call, it executes funcToDo (pass it args and kwargs) 
     and count down from counter to 0. Then, it stop loop """ 
    if counter[ 0 ] == 0: 
     loop.stop() 
    else: 
     funcToDo(*args, **kwargs) 
     counter[ 0 ] -= 1 


def _frangeGen(self, start = 0, stop = None, step = 1): 
    """ use to generate a time frange from start to stop by step step """ 
    while stop is None or start < stop: 
     yield start 
     start += step 

def callEvery(self, 
       loop, 
       interval, 
       funcToCall, 
       *args, 
       now = True, 
       **kwargs): 
    """ repeat funcToCall every interval sec in loop object """ 
    nb = kwargs.get('counter', [ 1000 ]) 
    def repeat(now = True, 
       times = self._frangeGen(start = loop.time(), 
             stop=loop.time()+nb[0]*interval, 
             step = interval)): 
     if now: 
      funcToCall(*args, **kwargs) 
     loop.call_at(next(times), repeat) 

    repeat(now = now) 

И это, как я использую его (getAllData это функция, управление последовательной связи):

ts = 0.01 
nbOfSamples = 1000 
loop = asyncio.get_event_loop() 
callEvery(loop, ts, countDown, loop, getAllData, counter = [nbOfSamples]) 
loop.run_forever() 

Я хочу поставить этот блок в функцию и называть ее так часто, как я хочу, примерно так:

for i in range(nbOfMeasures): 
    myFunction() 
    processData() 

Но секция Тест ond не вызывает getAllData 1000 раз, только дважды, иногда три раза. Интересный факт - один раз в два, я получаю столько данных, сколько хочу. Я действительно не понимаю, и я не могу найти что-либо в документах, поэтому я прошу вас о помощи. Любое объяснение или более простой способ сделать это с радостью приветствуется :)

+0

Я не изучил ваш код в глубину, однако, возможно, в вашем случае просто вызвать funcToCall, запустив его внутри задачи asyncio.async()? Вы можете использовать функцию asyncio.sleep() внутри цикла True внутри вашего funcToCall, чтобы позволить ему «спать» на заданную продолжительность. Просто создайте столько задач, сколько вам нужно, и они будут эффективно запускаться одновременно в одном цикле. – shongololo

ответ

1

Вы слишком усложняете и, вообще говоря, занимаетесь рекурсией, когда у вас есть цикл событий - плохой дизайн. asyncio is fun только, если вы используете сопрограммы. Вот один из способов сделать это:

import asyncio as aio 

def get_laser_data(): 
    """ 
    get data from the laser using blocking IO 
    """ 
    ... 


@aio.coroutine 
def get_samples(loop, m, n): 
    """ 
    loop = asyncio event loop 
    m = number of samples 
    n = time between samples 
    """ 
    samples = [] 
    while len(samples) < m: 
    sample = yield from loop.run_in_executor(None, get_laser_data) 
    samples.append(sample) 
    yield from aio.sleep(n) 

    return samples 

@aio.coroutine 
def main(loop): 
    for i in range(nbOfMeasures): 
    samples = yield from get_samples(loop, 1000, 0.01) 
    ... 

loop = aio.get_event_loop() 
loop.run_until_complete(main(loop)) 
loop.close() 

Если вы полностью сбиты с толку в этом, рассмотрим чтение некоторые учебники/документацию о asyncio.

Но я хотел бы указать, что вы должны использовать для получения данных от лазерного датчика. Выполнение любого блокирующего ввода-вывода в том же потоке, что и цикл событий, будет блокировать цикл и сбросить aio.sleep. Это то, что делает yield from loop.run_in_executor(None, get_laser_data). Он запускает функцию get_laser_data в отдельном потоке.

+0

Спасибо за ответ. Я медленно понимаю, как все работает, но сопрограммы все еще сопротивляются мне. В любом случае, ваше решение не соответствует моей потребности, так как оно сходит ** после ** выполнения функции 'get_laser_data', поэтому оно будет вызываться каждые n секунд + время для выполнения' get_laser_data'. –

+0

Вы можете вызвать 'loop.time()' до и после вызова 'get_laser_data', а затем удалить разницу во сне:' aio.sleep (n- (after-before)) '. Но у вас всегда будет некоторая ошибка из-за разрешения часов asycnio. Внутри он использует «монотонные» часы из модуля 'time'. Если вам нужно более точное время, я бы рекомендовал использовать 'perf_counter' из' time' & вместо того, чтобы использовать 'asyncio' написать простой цикл для сна. –

+0

Я попробовал простой цикл while с функцией сна (это была моя первая попытка на самом деле). Я недостаточно точен, даже с 'time.perf_counter()', но я думаю, что это связано с точностью функции 'time.sleep()'. Я пришел к уродливому решению, используя цикл, поэтому на данный момент я пытаюсь использовать класс threading.Timer, чтобы найти более чистый способ. –

0

В python 3.5 вы можете использовать синтаксис async for и создать asynchronous iterator для управления вашими временными рамками. Он должен осуществлять __aiter__ и __anext__ методы:

class timeframes(collections.AsyncIterator): 

    def __init__(self, steps, delay=1.0, *, loop=None): 
     self.loop = asyncio.get_event_loop() if loop is None else loop 
     self.ref = self.loop.time() 
     self.delay = delay 
     self.steps = steps 
     self.iter = iter(range(steps)) 

    async def __anext__(self): 
     try: 
      when = self.ref + next(self.iter) * self.delay 
     except StopIteration: 
      raise StopAsyncIteration 
     else: 
      future = asyncio.Future() 
      self.loop.call_at(when, future.set_result, None) 
      await future 
      return self.loop.time() 

    async def __aiter__(self): 
     return self 

Вот сопрограммная, который имитирует выполнение:

async def simulate(steps, delay, execution): 
    # Prepare timing 
    start = loop.time() 
    expected = steps * delay - delay + execution 
    # Run simulation 
    async for t in timeframes(steps, delay): 
     await loop.run_in_executor(None, time.sleep, execution) 
    # Return error 
    result = loop.time() - start 
    return result - expected 

И это своего рода результат вы получите на ОС Linux:

>>> loop = asyncio.get_event_loop() 
>>> simulation = simulate(steps=1000, delay=0.020, execution=0.014) 
>>> error = loop.run_until_complete(simulation) 
>>> print("Overall error = {:.3f} ms".format(error * 1000)) 
Overall error = 1.199 ms 

Он отличается от ОС Windows (см. this answer), но цикл событий будет догонять, а общая ошибка не должна превышать 15 мс.

Смежные вопросы