2016-01-10 4 views
12

Я пишу инструмент, который подключается к X номерам сокетов UNIX, отправляет команду и сохраняет результат в локальной файловой системе. Он запускается через каждые X секунд. Чтобы выполнить некоторую очистку, когда инструмент принимает сигналы окончания, я регистрирую функцию (выключение) на сигнал. Сигналы SIGHUP и signal.SIGTERM. Эта функция отменяет все задачи, а затем закрывает цикл событий.Правильный способ остановки задач asyncio

Моя проблема заключается в том, что я получаю

RuntimeError: Event loop stopped before Future completed

когда я посылаю signal.SIGTERM (убить 'PID'). Я прочитал документацию об отмене задач дважды, но я не заметил, что я делаю неправильно здесь.

Я также заметил что-то странное, когда я посылаю сигнал завершения, программа находится в спящем режиме, и я вижу в журнале, что он просыпает сопроводительный файл pull_stats(), вы можете видеть это в первых двух строках журнала ,

Log:

21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs** 
21:53:45,857 [23031] [MainThread:pull_stats ] INFO  pull statistics 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock 
21:53:45,858 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock 
21:53:45,859 [23031] [MainThread:get   ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  received stop signal, cancelling tasks... 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,859 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  True 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  stopping event loop 
21:53:45,860 [23031] [MainThread:shutdown ] INFO  bye, exiting... 
Traceback (most recent call last): 
    File "./pull.py", line 249, in <module> 
    main() 
    File "./pull.py", line 245, in main 
    supervisor(loop, config) 
    File "./pull.py", line 161, in supervisor 
    config['pull']['socket-dir'], storage_dir, loop)) 
    File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete 
    raise RuntimeError('Event loop stopped before Future completed.') 
RuntimeError: Event loop stopped before Future completed. 

Вот код:

def shutdown(loop): 
    LOGGER.info('received stop signal, cancelling tasks...') 
    for task in asyncio.Task.all_tasks(): 
     LOGGER.info(task.cancel()) 
    LOGGER.info('stopping event loop') 
    loop.stop() 
    LOGGER.info('bye, exiting...') 


def write_file(filename, data): 
    try: 
     with open(filename, 'w') as file_handle: 
      file_handle.write(data.decode()) 
    except OSError as exc: 
     return False 
    else: 
     return True 


@asyncio.coroutine 
def get(socket_file, cmd, storage_dir, loop): 
    connect = asyncio.open_unix_connection(socket_file) 
    reader, writer = yield from asyncio.wait_for(connect, 1) 

    writer.write('{c}\n'.format(c=cmd).encode()) 
    data = yield from reader.read() 
    writer.close() 

    filename = os.path.basename(socket_file) + '_' + cmd.split()[1] 
    filename = os.path.join(storage_dir, filename) 
    result = yield from loop.run_in_executor(None, write_file, filename, data) 

    return result 


@asyncio.coroutine 
def pull_stats(socket_dir, storage_dir, loop): 
    socket_files = glob.glob(socket_dir + '/*sock*') 
    coroutines = [get(socket_file, cmd, storage_dir, loop) 
        for socket_file in socket_files 
        for cmd in CMDS] 
    status = yield from asyncio.gather(*coroutines) 

    if len(set(status)) == 1 and True in set(status): 
     return True 
    else: 
     return False 


def supervisor(loop, config): 
    dst_dir = config.get('pull', 'dst-dir') 
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir') 

    while True: 
     start_time = int(time.time()) 
     storage_dir = os.path.join(tmp_dst_dir, str(start_time)) 

     try: 
      os.makedirs(storage_dir) 
     except OSError as exc: 
      msg = "failed to create directory {d}:{e}".format(d=storage_dir, 
                   e=exc) 
      LOGGER.critical(msg) 

     # Launch all connections. 
     result = loop.run_until_complete(pull_stats(
      config['pull']['socket-dir'], storage_dir, loop)) 

     if result: 
      try: 
       shutil.move(storage_dir, dst_dir) 
      except OSError as exc: 
       LOGGER.critical("failed to move %s to %s: %s", storage_dir, 
           dst_dir, exc) 
       break 
      else: 
       LOGGER.info('statistics are saved in %s', os.path.join(
        dst_dir, os.path.basename(storage_dir))) 
     else: 
      LOGGER.critical('failed to pull stats') 
      shutil.rmtree(storage_dir) 

     sleep = config.getint('pull', 'pull-interval') - (time.time() - 
                  start_time) 
     if 0 < sleep < config.getint('pull', 'pull-interval'): 
      time.sleep(sleep) 
    loop.close() 
    sys.exit(1) 


def main(): 
    args = docopt(__doc__, version=VERSION) 
    config = ConfigParser(interpolation=ExtendedInterpolation()) 
    config.read_dict(copy.copy(DEFAULT_OPTIONS)) 
    config.read(args['--file']) 

    loop = asyncio.get_event_loop() 

    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) 
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) 

    num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None) 
    LOGGER.setLevel(num_level) 

    supervisor(loop, config) 

# This is the standard boilerplate that calls the main() function. 
if __name__ == '__main__': 
    main() 

ответ

6

отмена не сразу и требует работает ioloop быть решена за исключением CancelledError. Удалите ioloop.stop из выключения и обработайте исключение в диспетчере, чтобы все работало. Ниже упрощенный пример.

Важно, однако вы можете отменить Task, он останавливает просмотр/ожидание конца/результатов, и цикл не будет обрабатывать дополнительные события для него. Но нижний запрос/труба не будет остановлен.

Упрощенный пример:

import asyncio 
import functools 
import logging 
import signal 
import sys 
from concurrent.futures import CancelledError 


def shutdown(loop): 
    logging.info('received stop signal, cancelling tasks...') 
    for task in asyncio.Task.all_tasks(): 
     task.cancel() 
    logging.info('bye, exiting in a minute...')  


@asyncio.coroutine 
def get(i): 
    logging.info('sleep for %d', i) 
    yield from asyncio.sleep(i)  


@asyncio.coroutine 
def pull_stats(): 
    coroutines = [get(i) for i in range(10,20)] 
    status = yield from asyncio.gather(*coroutines) 


def supervisor(loop): 
    try: 
     while True: 
      result = loop.run_until_complete(pull_stats()) 
    except CancelledError: 
     logging.info('CancelledError') 
    loop.close() 
    sys.exit(1) 


def main(): 
    logging.getLogger().setLevel(logging.INFO) 
    loop = asyncio.get_event_loop() 
    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop)) 
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop)) 
    supervisor(loop) 


if __name__ == '__main__': 
    main() 

Обратите внимание, что если вы отмените только gather's будущее, все дети будут установлены как отменили, а также.

И сон вещь

Любой прием сигнала или прерывание заставляет программу возобновить выполнение. Поэтому, когда процесс получает SIGTERM и обработчик, python позволяет вам обрабатывать его, чтобы возобновить эту нить и вызывать sighandler. Благодаря реализации ioloop и обработке сигнала, он продолжает работать после пробуждения.

+0

Я изменил код, как вы предложили, и он ловит исключение, но я все еще вижу, что pull_stats() будет пробуждаться, когда я отправить сигнал TERM. В вашем примере кода я не вижу, что это происходит. Я не совсем понимаю ваше утверждение о сне. Вы предполагаете, что сон предотвращает остановку потока? Кроме того, как мне распространять отмену во всех сопрограммах, чтобы я мог выполнять очистку? Большое спасибо @kwarunek за ваш ответ и ваше время, чтобы предоставить пример кода, очень ценный –

+1

Я немного отредактировал о SIGTERM, также он не рассматривается в примере. – kwarunek

+0

@kwarunke, теперь это имеет смысл. Таким образом, когда отмена отправляется, задача возобновляется на последней строке доходности, где coroutine в настоящее время приостановлена. В моем случае я нахожусь в линии сна, поступает сигнал, основная нить пробуждается от сна, в то время как True запускает все фьючерсы, которые затем получают отмену, но coroutines pull_stats и пробуждаются, но не действуют, поскольку они отменены. Я все еще пытаюсь понять способ улавливать отмену, когда программа находится в фазе соединения/приема/записи, так как я хотел бы выполнить некоторые чистые. Еще раз большое спасибо за вашу помощь. –

0

Обновление: Код работает как ожидается на python 3.4.4, см. Мой комментарий ниже. @kwarunek, когда вы упомянули о своем последнем комментарии о том, что ioloop продолжает работать, я не получил его, так как мой код работал, убивая процесс, отправляет отмену всех задач, которые просыпаются. Но теперь я вижу вашу точку, потому что отмена задач не запускается с 3.4.4, а 3.4.2 - это хорошо.

21:28:09,004 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 
<killing process> 
21:28:11,826 [59441] [MainThread:supervisor] INFO  starting while loop 
21:28:11,827 [59441] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:28:11,828 [59441] [MainThread:shutdown] INFO  received stop signal 
21:28:11,828 [59441] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /opt/blue-python/3.4/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:28:11,829 [59441] [MainThread:shutdown] INFO  cancelling task 
21:28:11,829 [59441] [MainThread:supervisor] INFO  delegating coroutine finished 
21:28:11,829 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 
21:28:21,009 [59441] [MainThread:supervisor] INFO  starting while loop 
21:28:21,010 [59441] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:28:21,011 [59441] [MainThread:supervisor] INFO  delegating coroutine finished 
2016-01-30 21:28:21,011 [59441] [MainThread:supervisor] CRITICAL failed to pull stats 

в то время как в Python 3.4.2

21:23:51,015 [10219] [MainThread:supervisor] CRITICAL failed to pull stats 
<killing process> 
21:23:55,737 [10219] [MainThread:supervisor] INFO  starting while loop 
21:23:55,737 [10219] [MainThread:supervisor] INFO  launch the delegating coroutine 
21:23:55,740 [10219] [MainThread:shutdown] INFO  received stop signal 
21:23:55,740 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,740 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,740 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,741 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,741 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,742 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,742 [10219] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:23:55,743 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,743 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(0)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,743 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,743 [10219] [MainThread:shutdown] INFO  <Task finished coro=<pull_stats() done, defined at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:124> result=False> 
21:23:55,744 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,744 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(7)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,744 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,744 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(4)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,745 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,745 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(5)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,745 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,745 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,746 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,746 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(3)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,746 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,746 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future pending cb=[Task._wakeup()]> cb=[gather.<locals>._done_callback(6)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future pendi...sk._wakeup()]>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,747 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,747 [10219] [MainThread:shutdown] INFO  <Task pending coro=<pull_stats() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:150> wait_for=<_GatheringFuture pending cb=[Task._wakeup()]> cb=[_raise_stop_error() at /usr/lib/python3.4/asyncio/base_events.py:101]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,748 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(2)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,748 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,748 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,749 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,749 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,749 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,750 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,750 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,750 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,751 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,751 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task finished coro=<open_unix_connection() done, defined at /usr/lib/python3.4/asyncio/streams.py:107> exception=ConnectionRefusedError(111, 'Connection refused')> 
21:23:55,752 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,752 [10219] [MainThread:shutdown] INFO  <Task pending coro=<get() running at /home/pparissis/.virtualenvs/python3/lib/python3.4/site-packages/haproxystats/pull.py:93> wait_for=<Future cancelled> cb=[gather.<locals>._done_callback(1)() at /usr/lib/python3.4/asyncio/tasks.py:582]> 
21:23:55,753 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,753 [10219] [MainThread:shutdown] INFO  <Task pending coro=<open_unix_connection() running at /usr/lib/python3.4/asyncio/streams.py:107> cb=[_release_waiter(<Future cancelled>)() at /usr/lib/python3.4/asyncio/tasks.py:334]> 
21:23:55,753 [10219] [MainThread:shutdown] INFO  cancelling task 
21:23:55,754 [10219] [MainThread:supervisor] INFO  Received CancelledError exception 
21:23:55,754 [10219] [MainThread:supervisor] INFO  waiting for threads to finish any pending IO tasks 
21:23:55,754 [10219] [MainThread:supervisor] INFO  closing our asyncio loop 
21:23:55,755 [10219] [MainThread:supervisor] INFO  exiting with status 0 

Основное различие, когда отключение() посылает отмену нет задач пробуждают и в результате цикл, пока не остановлен попробуйте блок catch, который обрабатывает отмену. Как мне решить это сейчас ?!

здесь код

def shutdown(): 
    """Performs a clean shutdown""" 
    log.info('received stop signal') 
    for task in asyncio.Task.all_tasks(): 
     log.info(task) 
     log.info('cancelling task') 
     task.cancel() 


def write_file(filename, data): 
    """Writes data to a file. 

    Returns: 
     True if succeeds False otherwise. 
    """ 
    try: 
     with open(filename, 'w') as file_handle: 
      file_handle.write(data.decode()) 
    except OSError as exc: 
     log.critical('failed to write data %s', exc) 
     return False 
    else: 
     log.debug('data saved in %s', filename) 
     return True 


@asyncio.coroutine 
def get(socket_file, cmd, storage_dir, loop, executor, timeout): 
    """Fetches data from a UNIX socket. 

    Sends a command to HAProxy over UNIX socket, reads the response and then 
    offloads the writing of the received data to a thread, so we don't block 
    this coroutine. 

    Arguments: 
     socket_file (str): The full path of the UNIX socket file to connect to. 
     cmd (str): The command to send. 
     storage_dir (str): The full path of the directory to save the response. 
     loop (obj): A base event loop from asyncio module. 
     executor (obj): A Threader executor to execute calls asynchronously. 
     timeout (int): Timeout for the connection to the socket. 

    Returns: 
     True if statistics from a UNIX sockets are save False otherwise. 
    """ 
    # try to connect to the UNIX socket 
    connect = asyncio.open_unix_connection(socket_file) 
    log.debug('connecting to UNIX socket %s', socket_file) 
    try: 
     reader, writer = yield from asyncio.wait_for(connect, timeout) 
    except (ConnectionRefusedError, PermissionError, OSError) as exc: 
     log.critical(exc) 
     return False 
    else: 
     log.debug('connection established to UNIX socket %s', socket_file) 

    log.debug('sending command "%s" to UNIX socket %s', cmd, socket_file) 
    writer.write('{c}\n'.format(c=cmd).encode()) 
    data = yield from reader.read() 
    writer.close() 

    if len(data) == 0: 
     log.critical('received zero data') 
     return False 

    log.debug('received data from UNIX socket %s', socket_file) 

    suffix = CMD_SUFFIX_MAP.get(cmd.split()[1]) 
    filename = os.path.basename(socket_file) + suffix 
    filename = os.path.join(storage_dir, filename) 
    log.debug('going to save data to %s', filename) 
    # Offload the writing to a thread so we don't block ourselves. 
    result = yield from loop.run_in_executor(executor, 
              write_file, 
              filename, 
              data) 

    return result 


@asyncio.coroutine 
def pull_stats(config, storage_dir, loop, executor): 
    """Launches coroutines for pulling statistics from UNIX sockets. 

    This a delegating routine. 

    Arguments: 
     config (obj): A configParser object which holds configuration. 
     storage_dir (str): The absolute directory path to save the statistics. 
     loop (obj): A base event loop. 
     executor(obj): A ThreadPoolExecutor object. 

    Returns: 
     True if statistics from *all* UNIX sockets are fetched False otherwise. 
    """ 
    # absolute directory path which contains UNIX socket files. 
    socket_dir = config.get('pull', 'socket-dir') 
    timeout = config.getint('pull', 'timeout') 
    socket_files = [f for f in glob.glob(socket_dir + '/*') 
        if is_unix_socket(f)] 

    log.debug('pull statistics') 
    coroutines = [get(socket_file, cmd, storage_dir, loop, executor, timeout) 
        for socket_file in socket_files 
        for cmd in CMDS] 
    # Launch all connections. 
    status = yield from asyncio.gather(*coroutines) 

    return len(set(status)) == 1 and True in set(status) 


def supervisor(loop, config): 
    """Coordinates the pulling of HAProxy statistics from UNIX sockets. 

    This is the client routine which launches requests to all HAProxy 
    UNIX sockets for retrieving statistics and save them to file-system. 
    It runs indefinitely until main program is terminated. 

    Arguments: 
     loop (obj): A base event loop from asyncio module. 
     config (obj): A configParser object which holds configuration. 
    """ 
    dst_dir = config.get('pull', 'dst-dir') 
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir') 
    executor = ThreadPoolExecutor(max_workers=config.getint('pull', 'workers')) 
    exit_code = 1 

    while True: 
     log.info('starting while loop') 
     start_time = int(time.time()) 
     # HAProxy statistics are stored in a directory and we use retrieval 
     # time(seconds since the Epoch) as a name of the directory. 
     # We first store them in a temporary place until we receive statistics 
     # from all UNIX sockets. 
     storage_dir = os.path.join(tmp_dst_dir, str(start_time)) 

     # If our storage directory can't be created we can't do much, thus 
     # abort main program. 
     try: 
      os.makedirs(storage_dir) 
     except OSError as exc: 
      msg = "failed to make directory {d}:{e}".format(d=storage_dir, 
                  e=exc) 
      log.critical(msg) 
      log.critical('a fatal error has occurred, exiting..') 
      break 

     try: 
      log.info('launch the delegating coroutine') 
      result = loop.run_until_complete(pull_stats(config, storage_dir, 
                 loop, executor)) 
      log.info('delegating coroutine finished') 
     except asyncio.CancelledError: 
      log.info('Received CancelledError exception') 
      exit_code = 0 
      break 

     # if and only if we received statistics from all sockets then move 
     # statistics to the permanent directory. 
     # NOTE: when temporary and permanent storage directory are on the same 
     # file-system the move is actual a rename, which is an atomic 
     # operation. 
     if result: 
      log.debug('move %s to %s', storage_dir, dst_dir) 
      try: 
       shutil.move(storage_dir, dst_dir) 
      except OSError as exc: 
       log.critical("failed to move %s to %s: %s", storage_dir, 
          dst_dir, exc) 
       log.critical('a fatal error has occurred, exiting..') 
       break 
      else: 
       log.info('statistics are stored in %s', os.path.join(
        dst_dir, os.path.basename(storage_dir))) 
     else: 
      log.critical('failed to pull stats') 
      log.debug('removing temporary directory %s', storage_dir) 
      shutil.rmtree(storage_dir) 

     # calculate sleep time which is interval minus elapsed time. 
     sleep = config.getint('pull', 'pull-interval') - (time.time() - 
                  start_time) 
     if 0 < sleep < config.getint('pull', 'pull-interval'): 
      log.debug('sleeping for %.3fs secs', sleep) 
      time.sleep(sleep) 

    # It is very unlikely that threads haven't finished their job by now, but 
    # they perform disk IO operations which can take some time in certain 
    # situations, thus we want to wait for them in order to perform a clean 
    # shutdown. 
    log.info('waiting for threads to finish any pending IO tasks') 
    executor.shutdown(wait=True) 
    log.info('closing our asyncio loop') 
    loop.close() 
    log.info('exiting with status %s', exit_code) 
    sys.exit(exit_code) 


def main(): 
    """Parses CLI arguments and launches main program.""" 
    args = docopt(__doc__, version=VERSION) 

    config = ConfigParser(interpolation=ExtendedInterpolation()) 
    # Set defaults for all sections 
    config.read_dict(copy.copy(DEFAULT_OPTIONS)) 
    # Load configuration from a file. NOTE: ConfigParser doesn't warn if user 
    # sets a filename which doesn't exist, in this case defaults will be used. 
    config.read(args['--file']) 

    if args['--print']: 
     for section in sorted(DEFAULT_OPTIONS): 
      print("[{}]".format(section)) 
      for key, value in sorted(DEFAULT_OPTIONS[section].items()): 
       print("{k} = {v}".format(k=key, v=value)) 
      print() 
     sys.exit(0) 
    if args['--print-conf']: 
     for section in sorted(config): 
      print("[{}]".format(section)) 
      for key, value in sorted(config[section].items()): 
       print("{k} = {v}".format(k=key, v=value)) 
      print() 
     sys.exit(0) 

    log.setLevel(getattr(logging, config.get('pull', 'loglevel').upper(), 
         None)) 
    # Setup our event loop 
    loop = asyncio.get_event_loop() 

    # Register shutdown to signals 
    loop.add_signal_handler(signal.SIGHUP, shutdown) 
    loop.add_signal_handler(signal.SIGTERM, shutdown) 

    # a temporary directory to store fetched data 
    tmp_dst_dir = config['pull']['tmp-dst-dir'] 
    # a permanent directory to move data from the temporary directory. Data are 
    # picked up by the process daemon from that directory. 
    dst_dir = config['pull']['dst-dir'] 
    for directory in dst_dir, tmp_dst_dir: 
     try: 
      os.makedirs(directory) 
     except OSError as exc: 
      # errno 17 => file exists 
      if exc.errno != 17: 
       sys.exit("failed to make directory {d}:{e}".format(d=directory, 
                    e=exc)) 
    supervisor(loop, config) 

# This is the standard boilerplate that calls the main() function. 
if __name__ == '__main__': 
    main() 
+0

Обнаружена проблема. в системе, где я использовал python 3.4.4, coroutine pull_stats не расписал получение сопрограммы, поскольку список socket_files пуст. Это объясняет сообщение [MainThread: shutdown] INFO <Завершено завершение завершенного coro = result = False> Поскольку задача завершена, аннулирование не происходит и, как следствие, попытка catch никогда не получает исключения, чтобы вызвать выход программы. В другом поле с 3.4.4, где список сокетов_ * не * пуст, отменяется –

Смежные вопросы