Я новичок в модуле multiprocessing
, поэтому, пожалуйста, со мной. После кода я использую для параллельной обработки данных в формате CSV:Измерение прогресса с многопроцессорной обработкой python Пул и функция отображения
#!/usr/bin/env python
import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
def init_worker(x):
sleep(.5)
print "(%s,%s)" % (x[0],x[1])
x.append(int(x[0])**2)
return x
def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
# OPEN FH FOR READING INPUT FILE
inputFH = open(inputFile, "rt")
csvReader = csv.reader(inputFH, delimiter=separator)
# SKIP HEADERS
for skip in xrange(skipRows):
csvReader.next()
# PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
try:
p = Pool(processes = cpuCount)
results = p.map(init_worker, csvReader, chunksize = 10)
p.close()
p.join()
except KeyboardInterrupt:
p.close()
p.join()
p.terminate()
# CLOSE FH FOR READING INPUT
inputFH.close()
# OPEN FH FOR WRITING OUTPUT FILE
outputFH = open(outputFile, "wt")
csvWriter = csv.writer(outputFH, lineterminator='\n')
# WRITE HEADER TO OUTPUT FILE
csvWriter.writerow(header)
# WRITE RESULTS TO OUTPUT FILE
[csvWriter.writerow(row) for row in results]
# CLOSE FH FOR WRITING OUTPUT
outputFH.close()
print pp(results)
# print len(results)
def main():
inputFile = "input.csv"
outputFile = "output.csv"
parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())
if __name__ == '__main__':
main()
Я хотел бы, чтобы каким-то образом измерить прогресс скрипта (просто текст не любой фантазии ASCII искусство). Единственный вариант, который приходит мне на ум, - сравнить строки, которые были успешно обработаны init_worker
ко всем строкам в input.csv, и распечатать фактическое состояние, например. каждую секунду, можете ли вы указать мне правильное решение? Я нашел несколько статей с похожими проблемами, но я не смог адаптировать их к моим потребностям, потому что ни один из них не использовал класс Pool
и метод map
. Я также хотел бы спросить о методах p.close(), p.join(), p.terminate()
, я видел их в основном с Process
не Pool
класс, они необходимы с классом Pool
и я использую их правильно? Использование p.terminate()
предназначалось для уничтожения процесса с помощью ctrl + c, но это different рассказ, который еще не доволен. Спасибо.
PS: Мой input.csv выглядит следующим образом, если это имеет значение:
0,0
1,3
2,6
3,9
...
...
48,144
49,147
PPS: как я сказал, что я новичок в multiprocessing
и код, который я поставил вместе просто работает. Единственный недостаток, который я вижу, - это то, что весь csv хранится в памяти, поэтому, если вы, ребята, лучше понимаете, не стесняйтесь поделиться им.
Редактировать
в ответ на @JFSebastian
Вот мой реальный код на основе ваших предложений:
#!/usr/bin/env python
import csv
from time import sleep
from multiprocessing import Pool
from multiprocessing import cpu_count
from multiprocessing import current_process
from pprint import pprint as pp
from tqdm import tqdm
def do_job(x):
sleep(.5)
# print "(%s,%s)" % (x[0],x[1])
x.append(int(x[0])**2)
return x
def parallel_csv_processing(inputFile, outputFile, header=["Default", "header", "please", "change"], separator=",", skipRows = 0, cpuCount = 1):
# OPEN FH FOR READING INPUT FILE
inputFH = open(inputFile, "rb")
csvReader = csv.reader(inputFH, delimiter=separator)
# SKIP HEADERS
for skip in xrange(skipRows):
csvReader.next()
# OPEN FH FOR WRITING OUTPUT FILE
outputFH = open(outputFile, "wt")
csvWriter = csv.writer(outputFH, lineterminator='\n')
# WRITE HEADER TO OUTPUT FILE
csvWriter.writerow(header)
# PARALLELIZE COMPUTING INTENSIVE OPERATIONS - CALL FUNCTION HERE
try:
p = Pool(processes = cpuCount)
# results = p.map(do_job, csvReader, chunksize = 10)
for result in tqdm(p.imap_unordered(do_job, csvReader, chunksize=10)):
csvWriter.writerow(result)
p.close()
p.join()
except KeyboardInterrupt:
p.close()
p.join()
# CLOSE FH FOR READING INPUT
inputFH.close()
# CLOSE FH FOR WRITING OUTPUT
outputFH.close()
print pp(result)
# print len(result)
def main():
inputFile = "input.csv"
outputFile = "output.csv"
parallel_csv_processing(inputFile, outputFile, cpuCount = cpu_count())
if __name__ == '__main__':
main()
Вот выход tqdm
:
1 [elapsed: 00:05, 0.20 iters/sec]
что d oes этот выход означает? На странице вы упомянутый tqdm
используются в цикле следующим образом:
>>> import time
>>> from tqdm import tqdm
>>> for i in tqdm(range(100)):
... time.sleep(1)
...
|###-------| 35/100 35% [elapsed: 00:35 left: 01:05, 1.00 iters/sec]
Этот вывод имеет смысл, но что это значит мой выход? Также не кажется, что проблема ctrl + c исправлена: после нажатия ctrl + c скрипт выдает некоторый Traceback, если я снова нажму Ctrl + C, я получу новый Traceback и так далее. Единственный способ убить его посылает его на фоне (Ctr + г), а затем убить его (убить% 1)
unrelated: используйте режим '' rb'' для файлов csv на Python 2. – jfs
также, имя 'init_worker' вводит в заблуждение. 'init_worker' может выполняться несколько раз в том же рабочем процессе в вашем случае. – jfs
несвязанный: бессмысленно называть 'p.terminate()' после 'p.join()'. – jfs