3

Я новичок в модуле multiprocessing, поэтому, пожалуйста, со мной. После кода я использую для параллельной обработки данных в формате CSV:Измерение прогресса с многопроцессорной обработкой python Пул и функция отображения

#!/usr/bin/env python 

import csv 
from time import sleep 
from multiprocessing import Pool 
from multiprocessing import cpu_count 
from multiprocessing import current_process 
from pprint import pprint as pp 

def init_worker(x): 
    sleep(.5) 
    print "(%s,%s)" % (x[0],x[1]) 
    x.append(int(x[0])**2) 
    return x 

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1): 
    # OPEN FH FOR READING INPUT FILE 
    inputFH = open(inputFile, "rt") 
    csvReader = csv.reader(inputFH, delimiter=separator) 

    # SKIP HEADERS 
    for skip in xrange(skipRows): 
    csvReader.next() 

    # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE 
    try: 
    p = Pool(processes = cpuCount) 
    results = p.map(init_worker, csvReader, chunksize = 10) 
    p.close() 
    p.join() 
    except KeyboardInterrupt: 
    p.close() 
    p.join() 
    p.terminate() 

    # CLOSE FH FOR READING INPUT 
    inputFH.close() 

    # OPEN FH FOR WRITING OUTPUT FILE 
    outputFH = open(outputFile, "wt") 
    csvWriter = csv.writer(outputFH, lineterminator='\n') 

    # WRITE HEADER TO OUTPUT FILE 
    csvWriter.writerow(header) 

    # WRITE RESULTS TO OUTPUT FILE 
    [csvWriter.writerow(row) for row in results] 

    # CLOSE FH FOR WRITING OUTPUT 
    outputFH.close() 

    print pp(results) 
    # print len(results) 

def main(): 
    inputFile = "input.csv" 
    outputFile = "output.csv" 
    parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count()) 

if __name__ == '__main__': 
    main() 

Я хотел бы, чтобы каким-то образом измерить прогресс скрипта (просто текст не любой фантазии ASCII искусство). Единственный вариант, который приходит мне на ум, - сравнить строки, которые были успешно обработаны init_worker ко всем строкам в input.csv, и распечатать фактическое состояние, например. каждую секунду, можете ли вы указать мне правильное решение? Я нашел несколько статей с похожими проблемами, но я не смог адаптировать их к моим потребностям, потому что ни один из них не использовал класс Pool и метод map. Я также хотел бы спросить о методах p.close(), p.join(), p.terminate(), я видел их в основном с Process не Pool класс, они необходимы с классом Pool и я использую их правильно? Использование p.terminate() предназначалось для уничтожения процесса с помощью ctrl + c, но это different рассказ, который еще не доволен. Спасибо.

PS: Мой input.csv выглядит следующим образом, если это имеет значение:

0,0 
1,3 
2,6 
3,9 
... 
... 
48,144 
49,147 

PPS: как я сказал, что я новичок в multiprocessing и код, который я поставил вместе просто работает. Единственный недостаток, который я вижу, - это то, что весь csv хранится в памяти, поэтому, если вы, ребята, лучше понимаете, не стесняйтесь поделиться им.

Редактировать

в ответ на @JFSebastian

Вот мой реальный код на основе ваших предложений:

#!/usr/bin/env python 

import csv 
from time import sleep 
from multiprocessing import Pool 
from multiprocessing import cpu_count 
from multiprocessing import current_process 
from pprint import pprint as pp 
from tqdm import tqdm 

def do_job(x): 
    sleep(.5) 
    # print "(%s,%s)" % (x[0],x[1]) 
    x.append(int(x[0])**2) 
    return x 

def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1): 

    # OPEN FH FOR READING INPUT FILE 
    inputFH = open(inputFile, "rb") 
    csvReader = csv.reader(inputFH, delimiter=separator) 

    # SKIP HEADERS 
    for skip in xrange(skipRows): 
    csvReader.next() 

    # OPEN FH FOR WRITING OUTPUT FILE 
    outputFH = open(outputFile, "wt") 
    csvWriter = csv.writer(outputFH, lineterminator='\n') 

    # WRITE HEADER TO OUTPUT FILE 
    csvWriter.writerow(header) 

    # PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE 
    try: 
    p = Pool(processes = cpuCount) 
    # results = p.map(do_job, csvReader, chunksize = 10) 
    for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)): 
     csvWriter.writerow(result) 
    p.close() 
    p.join() 
    except KeyboardInterrupt: 
    p.close() 
    p.join() 

    # CLOSE FH FOR READING INPUT 
    inputFH.close() 

    # CLOSE FH FOR WRITING OUTPUT 
    outputFH.close() 

    print pp(result) 
    # print len(result) 

def main(): 
    inputFile = "input.csv" 
    outputFile = "output.csv" 
    parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count()) 

if __name__ == '__main__': 
    main() 

Вот выход tqdm:

1 [elapsed: 00:05, 0.20 iters/sec] 

что d oes этот выход означает? На странице вы упомянутый tqdm используются в цикле следующим образом:

>>> import time 
>>> from tqdm import tqdm 
>>> for i in tqdm(range(100)): 
...  time.sleep(1) 
... 
|###-------| 35/100 35% [elapsed: 00:35 left: 01:05, 1.00 iters/sec] 

Этот вывод имеет смысл, но что это значит мой выход? Также не кажется, что проблема ctrl + c исправлена: после нажатия ctrl + c скрипт выдает некоторый Traceback, если я снова нажму Ctrl + C, я получу новый Traceback и так далее. Единственный способ убить его посылает его на фоне (Ctr + г), а затем убить его (убить% 1)

+0

unrelated: используйте режим '' rb'' для файлов csv на Python 2. – jfs

+0

также, имя 'init_worker' вводит в заблуждение. 'init_worker' может выполняться несколько раз в том же рабочем процессе в вашем случае. – jfs

+0

несвязанный: бессмысленно называть 'p.terminate()' после 'p.join()'. – jfs

ответ

11

Чтобы показать прогресс, заменить pool.map с pool.imap_unordered:

from tqdm import tqdm # $ pip install tqdm 

for result in tqdm(pool.imap_unordered(init_worker, csvReader, chunksize=10)): 
    csvWriter.writerow(result) 

tqdm часть является необязательной см Text Progress Bar in the Console

нечаянно, он фиксирует ваш «весь CSV хранится в памяти» и «KeyboardInterrupt не поднимается» проблемы.

Вот полный пример кода:

#!/usr/bin/env python 
import itertools 
import logging 
import multiprocessing 
import time 

def compute(i): 
    time.sleep(.5) 
    return i**2 

if __name__ == "__main__": 
    logging.basicConfig(format="%(asctime)-15s %(levelname)s %(message)s", 
         datefmt="%F %T", level=logging.DEBUG) 
    pool = multiprocessing.Pool() 
    try: 
     for square in pool.imap_unordered(compute, itertools.count(), chunksize=10): 
      logging.debug(square) # report progress by printing the result 
    except KeyboardInterrupt: 
     logging.warning("got Ctrl+C") 
    finally: 
     pool.terminate() 
     pool.join() 

Вы должны увидеть результат в партиях каждые .5 * chunksize секунд.Если вы нажмете Ctrl+C; вы должны увидеть KeyboardInterrupt, поднятый в дочерних процессах и в основном процессе. В Python 3 основной процесс завершается немедленно. В Python 2 KeyboardInterrupt задерживается до следующей печати пакета (ошибка в Python).

+0

К сожалению, это не помогло, пожалуйста, проверьте отредактированный OP. Большое спасибо. –

+0

@WakanTanka: (1) если вы не понимаете вывод 'tqdm', просто отпустите его и распечатайте отчет о ходе на каждой итерации, как вам нравится. (2) используйте 'p.terminate()' в обработчике исключений. Поместите это * перед * 'join()' – jfs

+0

Привет @ J.F. Себастьян сожалеет о более позднем ответе. Я понимаю вывод tqdm, но проблема в том, что он сообщается в процессе не для задачи в целом. PS: Я поставил 'p.terminate()' между 'p.close()' и 'p.join()' в ветке 'except KeyboardInterrupt:', и я все равно получаю такое же поведение после нажатия 'ctrl + c'. Можете ли вы отправить весь код, который работает, поэтому я могу принять ответ. Спасибо. –

Смежные вопросы