2015-06-02 2 views
1

Я пытаюсь записать определенные файлы после редактирования, используя код многопроцессорности python (2.7). Он работает как очарование для небольшого числа (< 20). но когда я пытаюсь использовать больше файлов (20+), это становится неистовым. Я использую Python 2.7.5 на CentOS 6.5 с 4-х ядерным процессором.Запись ошибок в многопроцессорной обработке Python

import sys, os 
import multiprocessing 

import glob 
list_files = glob.glob("Protein/*.txt") 

def Some_func(some_file): 
    with open(some_file) as some: 
     with open(file_output) as output: 
      for lines in Some: 
       #Do Something 
       #edited_lines = func(lines) 
       output.write(edited_lines) 


pool = multiprocessing.Pool(10) # Desired number of threads = 10 
pool.map(Some_func, list_files,) 
pool.close() 
pool.join() 

Окончательные письменными файлов накладываются друг на друга.

File 1 
Lines 1 .. File 1 
Lines 2 .. File 1 
Lines 3 .. File 1 
Lines 4 .. File 1 
Lines 5 .. File 1 
Lines 6 .. File 1 
Lines 7 .. File 1 
Lines 8 .. File 1 
Lines 9 .. File 1 

File 2 
Lines 1 .. File 2 
Lines 2 .. File 2 
Lines 3 .. File 2 
Lines 4 .. File 2 
Lines 5 .. File 2 
Lines 6 .. File 2 
Lines 7 .. File 2 
Lines 8 .. File 2 
Lines 9 .. File 2 



Output: 

Lines 1 .. File 1 
Lines 2 .. File 1 
Lines 3 .. File 1 Lines 1 .. File 2 
Lines 4 .. File 1 
Lines 5 .. File 1Lines 2 .. File 2 
Lines 3 .. File 2 
Lines 4 .. File 2 
Lines 6 .. File 1 

ответ

1

Проблема заключается в том, что вы пытаетесь записать в файл из многих процессов параллельно, которые не синхронизированы. Это означает, что возможно, что разные процессы будут пытаться писать одновременно, что приведет к странностям, которые вы видите.

Вы можете решить эту проблему либо с помощью одного процесса записи, причем каждый рабочий отправляет строки для записи в этот единственный процесс или путем синхронизации записей, выполняемых каждым процессом, с использованием multiprocessing.Lock.

Использование одного писателя:

import glob 
import multiprocessing 
from functools import partial 
from threading import Thread 

list_files = glob.glob("Protein/*.txt") 

def Some_func(out_q, some_file): 
    with open(some_file) as some: 
     for lines in Some: 
      #Do Something 
      #edited_lines = func(lines) 

      out_q.put(edited_lines) 

def write_lines(q): 
    with open(file_output) as output: 
     for line in iter(q.get, None): # This will end when None is received 
      output.write(line) 

pool = multiprocessing.Pool(10) # Desired number of threads = 10 
m = multiprocessing.Manager() 
q = m.Queue() 
t = Thread(target=write_lines, args=(q,)) 
t.start() 
pool.map(partial(Some_func, q), list_files) 
pool.close() 
pool.join() 
q.put(None) # Shut down the writer thread 
t.join() 

Использование multiprocessing.Lock:

import glob 
import multiprocessing 
from functools import partial 

list_files = glob.glob("Protein/*.txt") 

def Some_func(lock, some_file): 
    with open(some_file) as some: 
     with open(file_output) as output: 
      for lines in Some: 
       #Do Something 
       #edited_lines = func(lines) 
       with lock: 
        output.write(edited_lines) 


pool = multiprocessing.Pool(10) # Desired number of threads = 10 
m = multiprocessing.Manager() 
lock = m.Lock() 
pool.map(partial(Some_func, lock), list_files) 
pool.close() 
pool.join() 

Мы должны использовать Manager для создания общих объектов, потому что вы передаете их в Pool, который требует травля их. Нормальные multiprocessing.Lock/multiprocessing.Queue объекты могут быть переданы только конструктору multiprocessing.Process и будут вызывать исключение при передаче методу Pool, например map.

Смежные вопросы