2015-03-17 3 views
1

У меня очень длинная серия похожих файлов cvs (всего 14Gb). Мне нужно открыть каждый файл, заменить определенные символы и записать исправленную версию в новый файл. Я хочу использовать вычислительную мощность моего многоядерного компьютера. Я пробовал с mp.Pools и с mp.Process/mp.Queue. Версия бассейна работает, но подход очереди производит эту ошибку:Многопроцессорность Python: синтаксический анализ, редактирование и запись длинных серий csv-файлов

IOError: [Errno 22] invalid mode ('r') or filename: '<multiprocessing.queues.Queue object at 0x0000000002775A90>'

Это упрощенная версия моего бассейна кода:

import os 
import pandas as pd 
import multiprocessing as mp 
def fixer(a_file): 
    lines = [] 
    opened_file = open(a_file) 
    for each_line in opened_file: 
     lines.append(each_line.replace('mad', 'rational')) 
    opened_file.close() 
    df = pd.DataFrame(lines) 
    #some pandas magics here 
    df.to_csv(a_file[:-4] + '_fixed.csv') 
if __name__ == "__main__": 
    my_path = os.getcwd() 
    my_files = list(os.walk(my_path))[0][2] #I just get the list of file names here 
    processors = mp.cpu_count() 
    pool = mp.Pool(processes = processors) # I set as many processes as processors my computer has. 
    pool.map(fixer, my_files) 

И это один для подхода очереди:

import os 
import pandas as pd 
import multiprocessing as mp 
def fixer(a_file): 
    lines = [] 
    opened_file = open(a_file) 
    for each_line in opened_file: 
     lines.append(each_line.replace('mad', 'rational')) 
    opened_file.close() 
    df = pd.DataFrame(lines) 
    #some pandas magics here 
    df.to_csv(a_file[:-4] + '_fixed.csv') 
if __name__ == "__main__": 
    my_path = os.getcwd() 
    my_files = list(os.walk(my_path))[0][2] #I just get the list of file names here 
    processors = mp.cpu_count() 
    queue = mp.Queue() 
    for each_file in my_files: 
     queue.put(each_file) 
    processes = [mp.Process(target = fixer, args=(queue,)) for core in range(processors)] 
    for process in processes: 
     process.start() 
    for process in processes: 
     process.join() 

Буду признателен, если вы можете предоставить пример, чтобы версия Queue работала. На втором этапе обработки, прежде чем файлы будут записаны, мне нужны процессоры для получения промежуточного результата и выполнения некоторых вычислений. Именно по этой причине мне нужны очереди.

+0

ОК, я нашел, что происходит. См. Ответ ниже. – Jaqo

ответ

0

Проблема в сценарии Queue заключается в том, что я не получил следующий элемент в очереди, но передал всю очередь в функцию фиксатора. Эта проблема решается путем присвоения значения queue.get() переменной в функции фиксатора:

import os 
import pandas as pd 
import multiprocessing as mp 
def fixer(a_queue): 
    a_file = a_queue.get() 
    lines = [] 
    opened_file = open(a_file) 
    for each_line in opened_file: 
     lines.append(each_line.replace('mad', 'rational')) 
    opened_file.close() 
    df = pd.DataFrame(lines) 
    #some pandas magics here 
    df.to_csv(a_file[:-4] + '_fixed.csv') 
if __name__ == "__main__": 
    my_path = os.getcwd() 
    my_files = list(os.walk(my_path))[0][2] #I just get the list of file names here 
    processors = mp.cpu_count() 
    queue = mp.Queue() 
    for each_file in my_files: 
     queue.put(each_file) 
    processes = [mp.Process(target = fixer, args=(queue,)) for core in range(processors)] 
    for process in processes: 
     process.start() 
    for process in processes: 
     process.join() 
+0

Может кто-нибудь объяснить, почему вам нужно включить ',' после передачи 'Queue' при определении' args' 'mp.Process'? – Jaqo

+0

, потому что параметр 'args' должен принимать кортеж в качестве аргумента, а когда кортеж в python имеет только один элемент, он должен иметь' 'после этого элемента. Дополнительная информация в разделе 5.3 [структуры данных python] (https://docs.python.org/2/tutorial/datastructures.html). –

Смежные вопросы