2013-12-02 2 views
2

Использование Linux и Python 2.7.6 У меня есть сценарий, который загружает много файлов за один раз. Я использую многопоточность с модулями Queue и Threading.Python Daemon Thread Clean Up Logic on Abrupt sys.exit()

Я реализовал обработчик для SIGINT, чтобы остановить скрипт, если пользователь нажимает ctrl-C. Я предпочитаю использовать потоки демона, поэтому мне не нужно очищать очередь, что потребует много повторного написания кода, чтобы заставить обработчик SIGINT иметь доступ к объекту Queue, поскольку обработчики не принимают параметры.

Чтобы убедиться, что потоки daemon закончены и очищены до sys.exit(), я использую threading.Event() и threading.clear(), чтобы заставить потоки ждать. Этот код, похоже, работает как print threading.enumerate() показывает только основной поток до завершения скрипта при отладке. Просто чтобы убедиться, мне было интересно, если есть какой-либо вид проницательности для этого очистить реализации, что я, возможно, отсутствует, хотя это, кажется, работает для меня:

def signal_handler(signal, frame): 
    global kill_received 
    kill_received = True 
    msg = (
     "\n\nYou pressed Ctrl+C!" 
     "\nYour logs and their locations are:" 
     "\n{}\n{}\n{}\n\n".format(debug, error, info)) 
    logger.info(msg) 
    threads = threading.Event() 
    threads.clear() 

    while True: 
     time.sleep(3) 
     threads_remaining = len(threading.enumerate()) 
     print threads_remaining 
     if threads_remaining == 1: 
      sys.exit() 

def do_the_uploads(file_list, file_quantity, 
     retry_list, authenticate): 
    """The uploading engine""" 
    value = raw_input(
     "\nPlease enter how many concurent " 
     "uploads you want at one time(example: 200)> ") 
    value = int(value) 
    logger.info('{} concurent uploads will be used.'.format(value)) 

    confirm = raw_input(
     "\nProceed to upload files? Enter [Y/y] for yes: ").upper() 
    if confirm == "Y": 
     kill_received = False 
     sys.stdout.write("\x1b[2J\x1b[H") 
     q = CustomQueue() 

     def worker(): 
      global kill_received 
      while not kill_received: 
       item = q.get() 
       upload_file(item, file_quantity, retry_list, authenticate, q) 
       q.task_done() 

     for i in range(value): 
      t = Thread(target=worker) 
      t.setDaemon(True) 
      t.start() 

     for item in file_list: 
      q.put(item) 

     q.join() 

     print "Finished. Cleaning up processes...", 
     #Allowing the threads to cleanup 
     time.sleep(4) 



def upload_file(file_obj, file_quantity, retry_list, authenticate, q): 
    """Uploads a file. One file per it's own thread. No batch style. This way if one upload 
     fails no others are effected.""" 
    absolute_path_filename, filename, dir_name, token, url = file_obj 
    url = url + dir_name + '/' + filename 
    try: 
     with open(absolute_path_filename) as f: 
      r = requests.put(url, data=f, headers=header_collection, timeout=20) 
    except requests.exceptions.ConnectionError as e: 
     pass 
    if src_md5 == r.headers['etag']: 
     file_quantity.deduct() 

ответ

4

Если вы хотите обрабатывать Ctrl+C; достаточно обработать исключение KeyboardInterrupt в основной теме. Не используйте global X в функции, если вы не используете X = some_value. Использование time.sleep(4), чтобы очистка потоков была запахом кода. Вам это не нужно.

Я использую threading.Event() и threading.clear(), чтобы заставить потоки ждать.

Этот код имеет никакого эффекта на ваших нитей:

# create local variable 
threads = threading.Event() 
# clear internal flag in it (that is returned by .is_set/.wait methods) 
threads.clear() 

Не называйте logger.info() из обработчика сигнала в многопоточной программе. Это может затормозить вашу программу. Из обработчика сигналов можно вызывать только ограниченный набор функций. Безопасный вариант установить глобальный флаг в нем и выходе:

def signal_handler(signal, frame): 
    global kill_received 
    kill_received = True 
    # return (no more code) 

сигнал не может быть отложен до q.join() возвращается. Даже если сигнал был доставлен немедленно; q.get() блокирует дочерние потоки. Они висят, пока главный нить не выйдет. Чтобы исправить оба проблем, вы можете использовать часовой сигнализировать дочерние процессы, нет больше работы, падение обработчика сигнала полностью в этом случае:

def worker(stopped, queue, *args): 
    for item in iter(queue.get, None): # iterate until queue.get() returns None 
     if not stopped.is_set(): # a simple global flag would also work here 
      upload_file(item, *args) 
     else: 
      break # exit prematurely 
    # do child specific clean up here 

# start threads 
q = Queue.Queue() 
stopped = threading.Event() # set when threads should exit prematurely 
threads = set() 
for _ in range(number_of_threads): 
    t = Thread(target=worker, args=(stopped, q)+other_args) 
    threads.add(t) 
    t.daemon = True 
    t.start() 

# provide work 
for item in file_list: 
    q.put(item) 
for _ in threads: 
    q.put(None) # put sentinel to signal the end 

while threads: # until there are alive child threads 
    try: 
     for t in threads: 
      t.join(.3) # use a timeout to get KeyboardInterrupt sooner 
      if not t.is_alive(): 
       threads.remove(t) # remove dead 
       break 
    except (KeyboardInterrupt, SystemExit): 
     print("got Ctrl+C (SIGINT) or exit() is called") 
     stopped.set() # signal threads to exit gracefully 

Я переименовал value в number_of_threads. Я использовал явные потоки

Если человек upload_file() блокирует, то программа не выйдет на Ctrl-C.

Ваше дело, кажется, достаточно простой для multiprocessing.Pool интерфейса:

from multiprocessing.pool import ThreadPool 
from functools import partial 

def do_uploads(number_of_threads, file_list, **kwargs_for_upload_file): 
    process_file = partial(upload_file, **kwargs_for_upload_file) 
    pool = ThreadPool(number_of_threads) # number of concurrent uploads 
    try: 
     for _ in pool.imap_unordered(process_file, file_list): 
      pass # you could report progress here 
    finally: 
     pool.close() # no more additional work 
     pool.join() # wait until current work is done 

Он должен изящно выйти на Ctrl-C т.е. загрузки, которые в процессе могут закончить, но новые закачка не началась.