2016-05-15 3 views
1

Я пытаюсь реализовать базовую многопроцессорную обработку, и я столкнулся с проблемой. Ниже приведен скрипт python.Multiprocessing Queue.get() зависает

import time, sys, random, threading 
from multiprocessing import Process 
from Queue import Queue 
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency 

append_queue = Queue(10) 
database = FrequencyStore() 

def add_to_append_queue(_list): 
    append_queue.put(_list) 

def process_append_queue(): 
    while True: 
     item = append_queue.get() 
     database.append(item) 
     print("Appended to database in %.4f seconds" % database.append_time) 
     append_queue.task_done() 
    return 

def main(): 
    database.load_db() 
    print("Database loaded in %.4f seconds" % database.load_time) 
    append_queue_process = Process(target=process_append_queue) 
    append_queue_process.daemon = True 
    append_queue_process.start() 
    #t = threading.Thread(target=process_append_queue) 
    #t.daemon = True 
    #t.start() 

    while True: 
     path = raw_input("file: ") 
     if path == "exit": 
      break 
     a = AnalyzeFrequency(path) 
     a.analyze() 
     print("Analyzed file in %.4f seconds" % a._time) 
     add_to_append_queue(a.get_results()) 

    append_queue.join() 
    #append_queue_process.join() 
    database.save_db() 
    print("Database saved in %.4f seconds" % database.save_time) 
    sys.exit(0) 

if __name__=="__main__": 
    main() 

AnalyzeFrequency анализ частоты слов в файле и get_results() возвращает отсортированный список указанных слов и частот. Список очень большой, возможно, 10000 предметов.

Этот список затем передается методу add_to_append_queue, который добавляет его в очередь. Process_append_queue берет элементы один за другим и добавляет частоты в «базу данных». Эта операция занимает немного больше времени, чем фактический анализ в main(), поэтому я пытаюсь использовать отдельный процесс для этого метода. Когда я пытаюсь сделать это с помощью модуля потоковой передачи, все работает отлично, никаких ошибок. Когда я пытаюсь использовать Process, скрипт зависает на item = append_queue.get().

Не могли бы вы объяснить, что здесь происходит, и, возможно, направить меня к исправлению?

Все ответы оценены!

UPDATE

Ошибка рассол была моя вина, это просто опечатка. Теперь я использую класс Queue в многопроцессорности, но метод append_queue.get() все еще зависает. НОВЫЙ КОД

import time, sys, random 
from multiprocessing import Process, Queue 
from FrequencyAnalysis import FrequencyStore, AnalyzeFrequency 

append_queue = Queue() 
database = FrequencyStore() 

def add_to_append_queue(_list): 
    append_queue.put(_list) 

def process_append_queue(): 
    while True: 
     database.append(append_queue.get()) 
     print("Appended to database in %.4f seconds" % database.append_time) 
    return 

def main(): 
    database.load_db() 
    print("Database loaded in %.4f seconds" % database.load_time) 
    append_queue_process = Process(target=process_append_queue) 
    append_queue_process.daemon = True 
    append_queue_process.start() 
    #t = threading.Thread(target=process_append_queue) 
    #t.daemon = True 
    #t.start() 

    while True: 
     path = raw_input("file: ") 
     if path == "exit": 
      break 
     a = AnalyzeFrequency(path) 
     a.analyze() 
     print("Analyzed file in %.4f seconds" % a._time) 
     add_to_append_queue(a.get_results()) 

    #append_queue.join() 
    #append_queue_process.join() 
    print str(append_queue.qsize()) 
    database.save_db() 
    print("Database saved in %.4f seconds" % database.save_time) 
    sys.exit(0) 

if __name__=="__main__": 
    main() 

UPDATE 2

Это код базы данных:

class FrequencyStore: 

    def __init__(self): 
     self.sorter = Sorter() 
     self.db = {} 
     self.load_time = -1 
     self.save_time = -1 
     self.append_time = -1 
     self.sort_time = -1 

    def load_db(self): 
     start_time = time.time() 

     try: 
      file = open("results.txt", 'r') 
     except: 
      raise IOError 

     self.db = {} 
     for line in file: 
      word, count = line.strip("\n").split("=") 
      self.db[word] = int(count) 
     file.close() 

     self.load_time = time.time() - start_time 

    def save_db(self): 
     start_time = time.time() 

     _db = [] 
     for key in self.db: 
      _db.append([key, self.db[key]]) 
     _db = self.sort(_db) 

     try: 
      file = open("results.txt", 'w') 
     except: 
      raise IOError 

     file.truncate(0) 
     for x in _db: 
      file.write(x[0] + "=" + str(x[1]) + "\n") 
     file.close() 

     self.save_time = time.time() - start_time 

    def create_sorted_db(self): 
     _temp_db = [] 
     for key in self.db: 
      _temp_db.append([key, self.db[key]]) 
     _temp_db = self.sort(_temp_db) 
     _temp_db.reverse() 
     return _temp_db 

    def get_db(self): 
     return self.db 

    def sort(self, _list): 
     start_time = time.time() 

     _list = self.sorter.mergesort(_list) 
     _list.reverse() 

     self.sort_time = time.time() - start_time 
     return _list 

    def append(self, _list): 
     start_time = time.time() 

     for x in _list: 
      if x[0] not in self.db: 
       self.db[x[0]] = x[1] 
      else: 
       self.db[x[0]] += x[1] 

     self.append_time = time.time() - start_time 
+1

«Queue.Queue» не работает через процессы. Поэтому первое изменение заключается в использовании «multiprocessing.Queue». –

ответ

2

Комментарии предложить вы пытаетесь запустить это на Windows. Как я уже сказал в комментарии,

Если вы используете это на Windows, она не может работать - Windows не имеют fork(), поэтому каждый процесс получает свою собственную очередь, и они не имеют ничего общего друг с другом. Весь модуль импортируется «с нуля» на каждый процесс в Windows. Вам нужно создать очередь в main(), и передать ее в качестве аргумента функции работника.

Вот что вы должны сделать, чтобы сделать его переносным, хотя я удалил всю базу данных, потому что это не имеет отношения к проблемам, которые вы описали до сих пор.Я также удалил daemon пустячный, потому что обычно только ленивый способ избежать выключая вещи чисто, и часто не вернется, чтобы укусить вас позже:

def process_append_queue(append_queue): 
    while True: 
     x = append_queue.get() 
     if x is None: 
      break 
     print("processed %d" % x) 
    print("worker done") 

def main(): 
    import multiprocessing as mp 

    append_queue = mp.Queue(10) 
    append_queue_process = mp.Process(target=process_append_queue, args=(append_queue,)) 
    append_queue_process.start() 
    for i in range(100): 
     append_queue.put(i) 
    append_queue.put(None) # tell worker we're done 
    append_queue_process.join() 

if __name__=="__main__": 
    main() 

Выход «очевидный» материал :

processed 0 
processed 1 
processed 2 
processed 3 
processed 4 
... 
processed 96 
processed 97 
processed 98 
processed 99 
worker done 

Примечание: Поскольку Windows не (не может) fork(), это невозможно для рабочих процессов наследуют любой объект Python на Windows. Каждый процесс запускает всю программу с самого начала. Вот почему ваша оригинальная программа не могла работать: каждый процесс создал свой собственный Queue, полностью не связанный с Queue в другом процессе. В приведенном выше подходе только основной процесс создает Queue, а основной процесс передает его (как аргумент) рабочему процессу.

+0

Код работает! Еще одна вещь. Если вы посмотрите на мою FrequencyAnalysis.py, вы увидите класс FrequencyStorage. Если я вызову метод append из другого процесса, не будет ли он обновлять переменную экземпляра этого класса. Я передаю объект как параметр, как вы делали с очередью. – skyguy126

+0

Это был бы совсем другой вопрос, поэтому сделайте тестовый пример как можно меньшим и опубликуйте другой вопрос. В общем, вы не должны ожидать, что какая-либо мутация любого объекта будет видимой в разных процессах. Память не используется. «Многопроцессорность.Queue» работает, потому что она реализована с нуля до мутаций _make_, видимых во всех процессах, - и это не происходит по волшебству (это происходит из-за кросс-межпроцессных каналов, сообщающих информацию о мутациях между процессами, охраняемых межпроцессом семафоры, защищающие от одновременных мутаций). –

+0

Сделаю, спасибо за вашу помощь. – skyguy126

2

queue.Queue потокобезопасно, но не работает в различных процессах. Это довольно легко исправить. Вместо того, чтобы:

from multiprocessing import Process 
from Queue import Queue 

Вы хотите:

from multiprocessing import Process, Queue 
+1

Я слепой читатель экрана и не могу прочитать трассировку в изображении, извините. Это также не помогает людям, которые Google для текста в вашей трассировке позже. Предпочтительно, обновите вопрос, опубликуйте трассировку в комментарии или создайте новую. –

+0

Я опубликовал обновление в исходном вопросе. – skyguy126

+0

Ваша база данных не является безопасной для процесса. Вы создаете экземпляр базы данных в основном процессе Python, а затем сообщаете Python что-то делать с ним в совершенно другом. Таким образом, вызов 'database.append' висит, а не вызов' Queue.get'. Весь смысл использования очереди - избежать этой точной проблемы. –

Смежные вопросы