У меня очень длинная серия похожих файлов cvs (всего 14Gb). Мне нужно открыть каждый файл, заменить определенные символы и записать исправленную версию в новый файл. Я хочу использовать вычислительную мощность моего многоядерного компьютера. Я пробовал с mp.Pools и с mp.Process/mp.Queue. Версия бассейна работает, но подход очереди производит эту ошибку:Многопроцессорность Python: синтаксический анализ, редактирование и запись длинных серий csv-файлов
IOError: [Errno 22] invalid mode ('r') or filename: '<multiprocessing.queues.Queue object at 0x0000000002775A90>'
Это упрощенная версия моего бассейна кода:
import os
import pandas as pd
import multiprocessing as mp
def fixer(a_file):
lines = []
opened_file = open(a_file)
for each_line in opened_file:
lines.append(each_line.replace('mad', 'rational'))
opened_file.close()
df = pd.DataFrame(lines)
#some pandas magics here
df.to_csv(a_file[:-4] + '_fixed.csv')
if __name__ == "__main__":
my_path = os.getcwd()
my_files = list(os.walk(my_path))[0][2] #I just get the list of file names here
processors = mp.cpu_count()
pool = mp.Pool(processes = processors) # I set as many processes as processors my computer has.
pool.map(fixer, my_files)
И это один для подхода очереди:
import os
import pandas as pd
import multiprocessing as mp
def fixer(a_file):
lines = []
opened_file = open(a_file)
for each_line in opened_file:
lines.append(each_line.replace('mad', 'rational'))
opened_file.close()
df = pd.DataFrame(lines)
#some pandas magics here
df.to_csv(a_file[:-4] + '_fixed.csv')
if __name__ == "__main__":
my_path = os.getcwd()
my_files = list(os.walk(my_path))[0][2] #I just get the list of file names here
processors = mp.cpu_count()
queue = mp.Queue()
for each_file in my_files:
queue.put(each_file)
processes = [mp.Process(target = fixer, args=(queue,)) for core in range(processors)]
for process in processes:
process.start()
for process in processes:
process.join()
Буду признателен, если вы можете предоставить пример, чтобы версия Queue работала. На втором этапе обработки, прежде чем файлы будут записаны, мне нужны процессоры для получения промежуточного результата и выполнения некоторых вычислений. Именно по этой причине мне нужны очереди.
ОК, я нашел, что происходит. См. Ответ ниже. – Jaqo