2015-11-01 3 views
2

Я сжимаю файлы. Для некоторых из них один процесс хорош, но я сжимаю тысячи из них, и это может (и было) занято несколько дней, поэтому я бы хотел ускорить его многопроцессорность. У меня есть read, что я должен избегать одновременного чтения нескольких файлов, и я предполагаю, что я не должен одновременно писать несколько процессов. Это мой текущий метод, который выполняется отдельно:Чтение, сжатие, запись с многопроцессорной обработкой

import tarfile, bz2, os 
def compress(folder): 
    "compresses a folder into a file" 

    bz_file = bz2.BZ2File(folder+'.tbz', 'w') 

    with tarfile.open(mode='w', fileobj = bz_file) as tar: 

     for fn in os.listdir(folder): 

      read each file in the folder and do some pre processing 
      that will make the compressed file much smaller than without 

      tar.addfile(processed file) 

    bz_file.close() 
    return 

Это занимает папку и сжимает все ее содержимое в один файл. Это упрощает их обработку и более организованность. Если бы я просто бросил это в пул, то у меня было бы несколько процессов чтения и записи сразу, поэтому я хочу этого избежать. Я могу переделать его так, только один процесс читает файлы, но я до сих пор есть несколько из них писать:

import multiprocessing as mp 
import tarfile, bz2, os 

def compress(file_list): 
    folder = file_list[0] 
    bz_file = bz2.BZ2File(folder+'.tbz', 'w') 

    with tarfile.open(mode='w', fileobj = bz_file) as tar: 

     for i in file_list[1:]: 
      preprocess file data 
      tar.addfile(processed data) 

    bz_file.close() 
    return 

cpu_count = mp.cpu_count() 
p = mp.Pool(cpu_count) 

for subfolder in os.listdir(main_folder): 

    read all files in subfolder into memory, place into file_list 
    place file_list into fld_list until fld_list contains cpu_count 
    file lists. then pass to p.map(compress, fld_list) 

Это все еще имеет ряд процессов записи сжатых файлов одновременно. Просто рассказ tarfile о том, какое сжатие использовать начинает писать на жесткий диск. Я не могу прочитать все файлы, которые мне нужно сжать в память, так как у меня нет такого количества оперативной памяти, поэтому я также неоднократно перезапускаю Pool.map.

Как я могу читать и записывать файлы в одном процессе, но все они имеют сжатие в нескольких процессах, избегая при этом перезапуска многопроцессорной обработки. Несколько раз?

+0

Вы собираетесь должны смотреть на то, что 'pbzip2' делает и имитировать его. –

+0

использование очереди с многопроцессорной или многопотоковой обработкой. во-первых, один процесс считывает все файлы и помещает их в очередь 1. во-вторых, многопроцессорные процессы получают файлы из очереди 1 и сжимают, а затем помещают результат в очередь 2. наконец, один процесс переходит из очереди 2 и записывает. –

ответ

3

Вместо того, чтобы использовать multiprocessing.Pool, следует использовать multiprocessing.Queue и создать почтовый ящик и почтовый ящик.

Запустите один процесс для чтения в файлах и поместите данные в очередь входящих сообщений и установите ограничение на размер очереди, чтобы не заполнить свою RAM. Пример здесь сжимает отдельные файлы, но его можно настроить для одновременной обработки целых папок.

def reader(inbox, input_path, num_procs): 
    "process that reads in files to be compressed and puts to inbox" 

    for fn in os.listdir(input_path): 
     path = os.path.join(input_path, fn) 

     # read in each file, put data into inbox 
     fname = os.path.basename(fn) 
     with open(fn, 'r') as src: lines = src.readlines() 

     data = [fname, lines] 
     inbox.put(data) 

    # read in everything, add finished notice for all running processes 
    for i in range(num_procs): 
     inbox.put(None) # when a compressor sees a None, it will stop 
    inbox.close() 
    return 

Но это только половина вопроса, а другая часть - сжатие файла без необходимости его записи на диск. Мы предоставляем объект StringIO функции сжатия вместо открытого файла; он передается tarfile. После сжатия мы помещаем объект StringIO в очередь исходящих сообщений.

За исключением того, что мы не можем этого сделать, потому что объекты StringIO не могут быть маринованными, только объекты pickleable могут попасть в очередь. Однако функция getvalue StringIO может предоставить содержимое в выбираемом формате, поэтому захватите содержимое с помощью getvalue, закройте объект StringIO и затем поместите содержимое в исходный код.

from io import StringIO 
import tarfile 

def compressHandler(inbox, outbox): 
    "process that pulls from inbox, compresses and puts to outbox" 
    supplier = iter(inbox.get, None) # stops when gets a None 
    while True: 
     try: 
      data = next(supplier) # grab data from inbox 
      pressed = compress(data) # compress it 
      ou_que.put(pressed) # put into outbox 
     except StopIteration: 
      outbox.put(None) # finished compressing, inform the writer 
      return # and quit 

def compress(data): 
    "compress file" 
    bz_file = StringIO() 

    fname, lines = dat # see reader def for package order 

    with tarfile.open(mode='w:bz2', fileobj=bz_file) as tar: 

     info = tarfile.TarInfo(fname) # store file name 
     tar.addfile(info, StringIO(''.join(lines))) # compress 

    data = bz_file.getvalue() 
    bz_file.close() 
    return data 

Процесс записи затем извлекает содержимое из очереди исходящих сообщений и записывает их на диск. Эта функция должна знать, сколько процессов сжатия было начато, поэтому он знает только, чтобы остановить, когда он услышал, что каждый процесс остановлен.

def writer(outbox, output_path, num_procs): 
    "single process that writes compressed files to disk" 
    num_fin = 0 

    while True: 
     # all compression processes have finished 
     if num_finished >= num_procs: break 

     tardata = outbox.get() 

     # a compression process has finished 
     if tardata == None: 
      num_fin += 1 
      continue 

     fn, data = tardata 
     name = os.path.join(output_path, fn) + '.tbz' 

     with open(name, 'wb') as dst: dst.write(data) 
    return 

Наконец, есть настройки, чтобы поместить их всех вместе

import multiprocessing as mp 
import os 

def setup(): 
    fld = 'file/path' 

    # multiprocess setup 
    num_procs = mp.cpu_count() 

    # inbox and outbox queues 
    inbox = mp.Queue(4*num_procs) # limit size 
    outbox = mp.Queue() 

    # one process to read 
    reader = mp.Process(target = reader, args = (inbox, fld, num_procs)) 
    reader.start() 

    # n processes to compress 
    compressors = [mp.Process(target = compressHandler, args = (inbox, outbox)) 
        for i in range(num_procs)] 
    for c in compressors: c.start() 

    # one process to write 
    writer = mp.Process(target = writer, args=(outbox, fld, num_procs)) 
    writer.start() 
    writer.join() # wait for it to finish 
    print('done!')