2013-11-27 3 views
2

Я пытаюсь реализовать функцию тайм-аута в Python.Обработка исключений для асинхронных потоков python

Он работает путем обертывания функций с помощью декоратора функций, который вызывает функцию как поток, но также вызывает поток «сторожевого таймера», который будет вызывать исключение в потоке функций по истечении заданного периода.

В настоящее время он работает для потоков, которые не спят. Во время вызова do_rand я подозреваю, что «асинхронное» исключение фактически вызывается после вызова time.sleep и после того, как выполнение переместилось за пределы блока try/except, так как это объясняет ошибку Unhandled exception in thread started by. Кроме того, ошибка вызова do_rand генерируется через 7 секунд после вызова (продолжительность time.sleep).

Как бы я начал «пробуждать» поток вверх (используя ctypes?), Чтобы заставить его ответить на асинхронное исключение?

Возможно, совсем другой подход?

Код:

# Import System libraries 
import ctypes 
import random 
import sys 
import threading 
import time 

class TimeoutException(Exception): 
    pass 

def terminate_thread(thread, exc_type = SystemExit): 
    """Terminates a python thread from another thread. 

    :param thread: a threading.Thread instance 
    """ 
    if not thread.isAlive(): 
     return 

    exc = ctypes.py_object(exc_type) 
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread.ident), exc) 

    if res == 0: 
     raise ValueError("nonexistent thread id") 
    elif res > 1: 
     # """if it returns a number greater than one, you're in trouble, 
     # and you should call it again with exc=NULL to revert the effect""" 
     ctypes.pythonapi.PyThreadState_SetAsyncExc(thread.ident, None) 
     raise SystemError("PyThreadState_SetAsyncExc failed") 

class timeout_thread(threading.Thread): 
    def __init__(self, interval, target_thread): 
     super(timeout_thread, self).__init__() 
     self.interval  = interval 
     self.target_thread = target_thread 
     self.done_event = threading.Event() 
     self.done_event.clear() 

    def run(self): 
     timeout = not self.done_event.wait(self.interval) 
     if timeout: 
      terminate_thread(self.target_thread, TimeoutException) 

class timeout_wrapper(object): 
    def __init__(self, interval = 300): 
     self.interval = interval 

    def __call__(self, f): 
     def wrap_func(*args, **kwargs): 
      thread = threading.Thread(target = f, args = args, kwargs = kwargs) 
      thread.setDaemon(True) 
      timeout_ticker = timeout_thread(self.interval, thread) 
      timeout_ticker.setDaemon(True) 
      timeout_ticker.start() 
      thread.start() 
      thread.join() 
      timeout_ticker.done_event.set() 
     return wrap_func 

@timeout_wrapper(2) 
def print_guvnah(): 
    try: 
     while True: 
      print "guvnah" 

    except TimeoutException: 
     print "blimey" 

def print_hello(): 
    try: 
     while True: 
      print "hello" 

    except TimeoutException: 
     print "Whoops, looks like I timed out" 

def do_rand(*args): 
    try: 
     rand_num = 7 #random.randint(0, 10) 
     rand_pause = 7 #random.randint(0, 5) 
     print "Got rand: %d" % rand_num 
     print "Waiting for %d seconds" % rand_pause 
     time.sleep(rand_pause) 
    except TimeoutException: 
     print "Waited too long" 

print_guvnah() 
timeout_wrapper(3)(print_hello)() 
timeout_wrapper(2)(do_rand)() 

ответ

1

Проблема в том, что time.sleep блоков. И он блокирует очень тяжело, поэтому единственное, что может фактически прервать его, - это сигналы процесса. Но код с ним становится очень грязным, а в некоторых случаях даже сигналы не работают (например, если вы блокируете socket.recv(), см. Это: recv() is not interrupted by a signal in multithreaded environment).

Как правило, прерывание потока (без уничтожения всего процесса) не может быть выполнено (не говоря уже о том, что кто-то может просто переопределить обработку сигнала из потока).

Но в данном случае вместо использования time.sleep вы можете использовать Event класс от резьбы модуля:

резьбы 1

from threading import Event 

ev = Event() 
ev.clear() 

state = ev.wait(rand_pause) # this blocks until timeout or .set() call 

резьбы 2 (убедитесь, что он имеет доступ к тем же ev экземпляры)

ev.set() # this will unlock .wait above 

Обратите внимание, что state будет внутренним состоянием события. Таким образом, state == True будет означать, что он был разблокирован .set(), тогда как state == False будет означать, что время ожидания.

Подробнее о событиях здесь:

http://docs.python.org/2/library/threading.html#event-objects

+0

Спасибо за ответ, однако, в центре внимания этого кода должна быть библиотека (надеюсь). В результате я действительно не хочу ограничивать код пользователя, чтобы исключить использование функции time.sleep или любой другой функции блокировки. 'do_rand' - всего лишь тест. Я надеялся, что, возможно, я могу изменить PyThreadState с помощью ctypes (или аналогичных): https://gist.github.com/gdementen/5635324#file-recursive_ctypes_struct-py. – dilbert

+0

@dilbert Как я уже сказал: системные вызовы, такие как 'recv()' may (и будут в случае блокировки 'recv()'), будут блокировать ваш поток неопределенно. И вы не сможете прервать его без (силы) убийства всего процесса. Таким образом, я думаю, что вы далеко продвинетесь с этим, проблема не может быть решена. Но вопрос, который вы должны задать, - это проблема в первую очередь? Возможно, запрет на блокировку всей нити неплохо? – freakish

+0

В Windows только основной поток может быть прерван из 'time.sleep'. Он использует 'WaitForSingleObject' в событии, в то время как все другие потоки используют uninterruptibile' Sleep'. Вызов 'select', используемый' time.sleep' в системах POSIX, может быть прерван с помощью 'pthread_kill' для нацеливания потока. – eryksun

1

Вы должны были бы использовать что-то другое, чем сон, или вы должны послать сигнал на другой поток, чтобы сделать его проснуться.

Один из вариантов, который я использовал, состоит в том, чтобы настроить пару дескрипторов файлов и использовать select или poll вместо sleep, это позволяет вам что-то писать в дескриптор файла, чтобы разбудить другой поток. В качестве альтернативы вы просто носите до тех пор, пока не закончите сон, если все, что вам нужно, - это операция, чтобы выполнить ошибку, потому что она занимала слишком много времени, и ничто другое не зависит от нее.

+0

Как бы я идти о посылая нить сигнал? И какой сигнал? – dilbert

+0

Модуль 'signal' будет делать это. Метод дескриптора файла, вероятно, более надежный. – Benno

+0

Я очень хочу избежать использования файлов. Значит, нет ничего особенного в сигнале, как в любом случае? – dilbert

Смежные вопросы