2015-03-02 2 views
0

Я пробую несколько примеров многопроцессов, в основном: http://toastdriven.com/blog/2008/nov/11/brief-introduction-multiprocessing/, где я взял «простое приложение», которое использует многопроцессорный интерфейс для тестирования URL-адресов. Когда я использую его (в Python 3.3, в Windows в PyCharm IDE) с некоторыми изменениями, с большим количеством URL-адресов, мой скрипт никогда не останавливается, и я не понимаю, почему.Почему мой многопроцессорный скрипт Python никогда не заканчивается?

import httplib2 
import sys 
from multiprocessing import Lock, Process, Queue, current_process 

def worker(work_queue, done_queue): 
    for url in iter(work_queue.get, 'STOP'): 
     try: 
      print("In : %s - %s." % (current_process().name, url)) 
      status_code = print_site_status(url) 
      done_queue.put("%s - %s got %s." % (current_process().name, url, status_code)) 
     except: 
      done_queue.put("%s failed on %s with: %s" % (current_process().name, url, str(sys.exc_info()[0]))) 
    print("Out : %s " % (current_process().name)) 
    return True 

def print_site_status(url): 
    http = httplib2.Http(timeout=10) 
    headers, content = http.request(url) 
    return headers.get('status', 'no response') 

def main(): 
    workers = 8 
    work_queue = Queue() 
    done_queue = Queue() 
    processes = [] 
    with open("Annu.txt") as f: # file with URLs 
     lines = f.read().splitlines() 
    for surl in lines: 
     work_queue.put(surl) 

    for w in range(workers): 
     p = Process(target=worker, args=(work_queue, done_queue)) 
     p.start() 
     processes.append(p) 
     work_queue.put('STOP') 

    for p in processes: 
     p.join() 
    print("END") 
    done_queue.put('STOP') 

    for status in iter(done_queue.get, 'STOP'): 
     print(status) 

if __name__ == '__main__': 
    main() 

Я хорошо видеть статус URL-адресов испытанный и весь процесс сообщения «Out», которые указывают Hte конец процесса, но никогда мое сообщение «END». Список URL-адресов, которые я использую: http://www.pastebin.ca/2946850.

Итак ... где моя ошибка? Это дубликат с: Python multiprocessing threads never join when given large amounts of work?

Некоторые сведения: когда я подавляю 'done_queue' всюду в коде: это работает.

+0

Обратите внимание, что END не будет печататься в END, как вы думаете. Прокрутите вверх до половины, и вы увидите его там. – dopstar

+0

@dopstar Извините, но я ваш комментарий не ясен, вы говорите, что сообщение «END» не находится в конце сценария? Если да: я знаю, «END» здесь, чтобы сказать, что процессы завершены. – philnext

+0

Вы также распечатываете статус из сделанной очереди после того, как вы напечатали КОНЕЦ, чтобы это наложило END примерно на полпути. У вас есть 500 + URL-адресов, поэтому у вас будет 1k + print out с END где-то посередине. – dopstar

ответ

0

От Queuedocumentation:

Если опциональный арг блок истинна и тайм-аут не является None (по умолчанию), блок в случае необходимости, пока элемент не доступен.

Это означает, что ваши петли никогда не заканчиваются.

Вам необходимо добавить тайм-аут к get и остановить цикл, если вы получите исключение Empty или вам нужно выйти из цикла, когда вы получите сообщение STOP.

+2

Несомненно, небольшое количество URL-адресов работает. Я использую процессы только для того, чтобы учиться. Я тестировал несколько журналов, и кажется, что все URL-адреса обрабатываются, но процесс, похоже, не присоединился. – philnext

+0

Добавьте утверждения 'print' вокруг соединения, чтобы узнать, какой процесс не возвращается. И как вы проверили, что каждый URL-адрес обработан? Невооруженным глазом не удается проверить 1000 URL-адресов. –

+0

Я тестировал, что все URL-адреса обрабатываются и все «print» («Out:% s»% (current_process(). Name)). Кажется, что ВСЕ процессы не прибывают bacK – philnext

0

ОК, я нашел ответ (в документ Python: https://docs.python.org/3.4/library/multiprocessing.html#multiprocessing-programming):

Предупреждение Как упоминалось выше, если дочерний процесс поставил элементы на очереди (и он не используется JoinableQueue.cancel_join_thread), затем этот процесс не будет завершен до тех пор, пока все буферизованные элементы не будут , сброшенными в трубу.

Так изменить код:

print("Out : %s " % (current_process().name)) 
    return True 

По:

print("Out : %s " % (current_process().name)) 
    done_queue.cancel_join_thread() 
    return True 

Я не понимаю, почему исходный код работает с небольшим количеством адресов ...

Смежные вопросы