1

Я довольно новичок в python (я в основном пишу код на Java). У меня есть скрипт python, который по сути является сканером. Он вызывает phantomjs, который загружает страницу, возвращает ее источник и список URL-адресов, найденных на этой странице.Использование общей очереди, в которой рабочие могут добавлять задачи к

Я пытался использовать Python 3's multiprocessing, чтобы сделать это, но я не могу понять, как использовать общую очередь, к которой могут добавить и рабочие. Я получаю непредсказуемые результаты.

В моем предыдущем подходе использовался глобальный список URL-адресов, из которого я извлек кусок и отправил работникам, используя map_async. В конце я собирал все возвращенные URL-адреса и добавлял их в глобальный список. Проблема в том, что каждый «кусок» занимает столько же времени, сколько и самый медленный рабочий. Я пытаюсь изменить его, чтобы всякий раз, когда рабочий был сделан, он мог выбрать следующий URL-адрес. Однако я не думаю, что делаю это правильно. Вот что у меня есть до сих пор:

def worker(url, urls): 
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url) 
    returned_urls = phantomjs(url) 
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs") 

    for returned_url in returned_urls: 
     urls.put(returned_url, block=True) 

    print("There are " + str(urls.qsize()) + " URLs in total.\n") 

if __name__ == '__main__':  
    manager = multiprocessing.Manager() 
    urls = manager.Queue() 
    urls.append(<some-url>) 

    pool = Pool() 
    while True: 
     url = urls.get(block=True) 
     pool.apply_async(worker, (url, urls)) 

    pool.close() 
    pool.join() 

Если есть лучший способ сделать это, пожалуйста, дайте мне знать. Я просматриваю известный сайт, и конечное условие завершения - это когда нет URL-адресов для обработки. Но сейчас похоже, что я буду продолжать работать навсегда. Я не уверен, буду ли я использовать queue.empty(), потому что он говорит, что он не является надежным.

+0

См. Связанные: http://stackoverflow.com/questions/17241663/filling-a-queue-and-managing-multiprocessing-in-python (* Я не думаю, что ваш шаблон дизайна совершенно прав *) Я считаю вы хотите, чтобы сотрудники N обращались к общей очереди совместно. –

+0

@JamesMills Этот пример имеет большой смысл! Можно ли добавить в очередь в 'worker_main'? –

+0

Кроме того, я пробовал это, и похоже, что он выходит почти сразу, даже с 'time.sleep (10)'. Для вызова phantomjs требуется некоторое время, но сценарий завершается до этого. –

ответ

0

Вот что я бы, вероятно:

def worker(url, urls): 
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " loading " + url) 
    returned_urls = phantomjs(url) 
    print(multiprocessing.current_process().name + "." + str(multiprocessing.current_process().pid) + " returning " + str(len(returned_urls)) + " URLs") 

     for returned_url in returned_urls: 
      urls.put(returned_url, block=True) 

     # signal finished processing this url 
     urls.put('no-url') 

    print("There are " + str(urls.qsize()) + " URLs in total.\n") 

if __name__ == '__main__':  
    manager = multiprocessing.Manager() 
    pool = Pool() 
    urls = manager.Queue() 

    # start first url before entering loop 
    counter = 1 
    pool.apply_async(worker, (<some-url>, urls)) 

    while counter > 0: 
     url = urls.get(block=True) 
     if url == 'no-url': 
      # a url has finished processing 
      counter -= 1 
     else: 
      # a new url needs to be processed 
      counter += 1 
      pool.apply_async(worker, (url, urls)) 

    pool.close() 
    pool.join() 

Всякий раз, когда URL извлекается из очереди, увеличить счетчик. Подумайте об этом как о счетчике «в настоящее время обработки URL». Когда из очереди выдается «нет-url», завершается «текущий обрабатывающий URL-адрес», поэтому уменьшайте счетчик. Пока счетчик больше 0, есть URL-адреса, которые еще не завершили обработку и еще не вернули «нет-url».

EDIT

Как я уже сказал в комментарии (поместить здесь для всех, кто читает его), при использовании multiprocessing.Pool, вместо того чтобы думать о нем, как отдельные процессы, то лучше думать о нем, как единственная конструкция, которая выполняет вашу функцию каждый раз, когда она получает данные (одновременно, когда это возможно). Это наиболее полезно для проблем с данными, в которых вы не отслеживаете или не заботитесь о отдельных рабочих процессах только обрабатываемыми данными.

+0

А, я понимаю, что вы делаете. Поэтому я думаю, что смогу приспособить это к моему делу. Для меня это не так, что я хочу знать, когда URL-адрес завершил обработку, но я хочу знать, когда рабочий не возвращает * никаких * URL-адресов. Следовательно, условием завершения является то, что * все * рабочие заканчивают свои задачи, а * ни один из них не возвращают URL-адреса. –

+0

Я думал об этом во-первых, но это сложнее, потому что с пулом работников каждый рабочий * процесс * выполняет '' 'worker''' снова и снова, сидя без дела между выполнением. Таким образом, у вас может быть ситуация, когда каждый рабочий процесс * заканчивает текущее выполнение '' 'worker''' и не получает URL-адресов, но в очереди все еще сохраняются URL-адреса, ожидающие перехода к следующему бездействующему работнику. Единственный способ узнать * наверняка *, что больше не нужно делать, это знать, что все полученные URL-адреса завершили обработку. – bj0

+0

Это помогает думать о пуле не как отдельных рабочих, а о едином объекте, который выполняет функцию снова и снова. Бассейны, как правило, используются в большем количестве проблем, связанных с данными, где вы не отслеживаете или не заботитесь о количестве рабочих процессов. – bj0

0

Вот как я решил проблему. Я первоначально пошел с проектом, опубликованным в this answer, но bj0 упомянул, что он злоупотреблял функцией инициализации. Поэтому я решил сделать это, используя apply_async, по модулю, подобному коду, опубликованному в моем вопросе.

Поскольку мои работники изменить очередь они читают URL, из (они добавляют к нему), я думал, что я мог бы просто запустить свой цикл следующим образом:

while not urls.empty(): 
    pool.apply_async(worker, (urls.get(), urls)) 

Я ожидал, что это будет работать, так как рабочие добавит в очередь, а apply_async будет ждать, если все рабочие будут заняты. Это не сработало, как я ожидал, и цикл закончился раньше. Проблема заключалась в том, что неясно, что apply_asyncне блокирует, если все рабочие заняты. Вместо этого он будет помещать поставленные задачи в очередь, а это значит, что urls со временем станет пустым и цикл завершится. Единственный раз, когда блокирует цикл, если очередь пуста, когда вы пытаетесь выполнить urls.get(). На этом этапе он будет ожидать появления большего количества элементов в очереди. Но мне все еще нужно было выяснить, как закончить цикл. Условие состоит в том, что цикл должен заканчиваться, когда ни один из рабочих не возвращает новые URL-адреса. Для этого я использую общий dict, который устанавливает значение, связанное с именем процесса, равным 0, если процесс не возвратил никаких URL-адресов, и 1 в противном случае. Я проверяю сумму ключей на каждой итерации цикла, и если это когда-либо 0, я знаю, что я сделан.

Основная структура заканчивал тем, как это:

def worker(url, url_queue, proc_user_urls_queue, proc_empty_urls_queue): 

    returned_urls = phantomjs(url) # calls phantomjs and waits for output 
    if len(returned_urls) > 0: 
     proc_empty_urls_queue.put(
      [multiprocessing.current_process().name, 1] 
     ) 
    else: 
     proc_empty_urls_queue.put(
      [multiprocessing.current_process().name, 0] 
     ) 

    for returned_url in returned_urls: 
     url_queue.put(returned_url) 

def empty_url_tallier(proc_empty_urls_queue, proc_empty_urls_dict): 
    while 1: 
     # This may not be necessary. I don't know if this worker is run 
     # by the same process every time. If not, it is possible that 
     # the worker was assigned the task of fetching URLs, and returned 
     # some. So let's make sure that we set its entry to zero anyway. 
     # If this worker is run by the same process every time, then this 
     # stuff is not necessary. 
     id = multiprocessing.current_process().name 
     proc_empty_urls_dict[id] = 0 

     proc_empty_urls = proc_empty_urls_queue.get() 
     if proc_empty_urls == "done": # poison pill 
      break 

     proc_id = proc_empty_urls[0] 
     proc_empty_url = proc_empty_urls[1] 
     proc_empty_urls_dict[proc_id] = proc_empty_url 

manager = Manager() 

urls = manager.Queue() 
proc_empty_urls_queue = manager.Queue() 
proc_empty_urls_dict = manager.dict() 

pool = Pool(33) 

pool.apply_async(writer, (proc_user_urls_queue,)) 
pool.apply_async(empty_url_tallier, (proc_empty_urls_queue, proc_empty_urls_dict)) 

# Run the first apply synchronously 
urls.put("<some-url>") 
pool.apply(worker, (urls.get(), urls, proc_empty_urls_queue)) 
while sum(proc_empty_urls_dict.values()) > 0: 
    pool.apply_async(worker, (urls.get(), urls, proc_empty_urls_queue)) 

proc_empty_urls_queue.put("done") # poison pill 
pool.close() 
pool.join() 
+0

Это интересный подход, но я считаю, что он восприимчив к состоянию гонки. Если работник без URL-адреса заканчивается, а другая работа (у которого previoulsy есть нет-url) обрабатывает phantomjs, цикл while может закончиться до того, как этот работник закончит (с потенциальными новыми URL-адресами). Кроме того, вы используете общий файл dict, зачем вам нужна вторая очередь? Вы должны иметь возможность передать диктору рабочим и позволить им использовать его напрямую. – bj0

+0

Самый простой способ устранить состояние гонки (и избавиться от dict) - подсчитать URL. Каждый раз, когда urls.get() дает вам URL-адрес, увеличивайте счетчик. Каждый раз, когда вы получаете нейтральное значение «нет-url», уменьшайте его. Когда счетчик достигнет 0, вы должны быть вне URL-адресов. Это, конечно, не предполагает круговых ссылок, которые могут вызвать проблемы ... – bj0

+0

@ bj0 Хорошая точка. Я решил, что будет какое-то состояние гонки. Я использовал общий dict, потому что, честно говоря, я все еще немного туманно о том, как многопроцессорность выполняется на python, и я не был уверен, что было бы нормально изменять свойства dict, подобные этому непосредственно на рабочем столе. –

Смежные вопросы