Я хочу запустить параллельное вычисление на некоторые входные данные, которые загружаются из файла. (Файл может быть действительно большим, поэтому я использую генератор для этого.)многопроцессорность - чтение больших входных данных - зависание программы
На определенное количество элементов мой код работает нормально, но над этим порогом зависает программа (некоторые из рабочих процессов не заканчиваются).
Любые предложения? (Я запускаю это с помощью python2.7, 8 процессоров, 5000 строк все еще в порядке, 7 500 не работает.)
Во-первых, вам нужен входной файл. Сформировать его в Баш:
for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done
Затем запустите это:
python2.7 main.py 100 counter.txt > run_log.txt
main.py: не
#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp
def eat_queue(job_queue, result_queue):
"""Eats input queue, feeds output queue
"""
proc_name = mp.current_process().name
while True:
try:
job = job_queue.get(block=False)
if job == None:
print(proc_name + " DONE")
return
result_queue.put(execute(job))
except Queue.Empty:
pass
def execute(x):
"""Does the computation on the input data
"""
return x*x
def save_result(result):
"""Saves results in a list
"""
result_list.append(result)
def load(ifilename):
"""Generator reading the input file and
yielding it row by row
"""
ifile = open(ifilename, "r")
for line in ifile:
line = line.strip()
num = int(line)
yield (num)
ifile.close()
print("file closed".upper())
def put_tasks(job_queue, ifilename):
"""Feeds the job queue
"""
for item in load(ifilename):
job_queue.put(item)
for _ in range(get_max_workers()):
job_queue.put(None)
def get_max_workers():
"""Returns optimal number of processes to run
"""
max_workers = mp.cpu_count() - 2
if max_workers < 1:
return 1
return max_workers
def run(workers_num, ifilename):
job_queue = mp.Queue()
result_queue = mp.Queue()
# decide how many processes are to be created
max_workers = get_max_workers()
print "processes available: %d" % max_workers
if workers_num < 1 or workers_num > max_workers:
workers_num = max_workers
workers_list = []
# a process for feeding job queue with the input file
task_gen = mp.Process(target=put_tasks, name="task_gen",
args=(job_queue, ifilename))
workers_list.append(task_gen)
for i in range(workers_num):
tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
args=(job_queue, result_queue))
workers_list.append(tmp)
for worker in workers_list:
worker.start()
for worker in workers_list:
worker.join()
print "worker %s finished!" % worker.name
if __name__ == '__main__':
result_list = []
args = sys.argv
workers_num = int(args[1])
ifilename = args[2]
run(workers_num, ifilename)
Спасибо! Теперь программа работает правильно. Спасибо за все предложения. (Я действительно прочитал эту часть документации во время работы с вводом и не понял этого в случае очереди вывода.) Почему я использовал 'blocks = false': я просто скомпилировал схему кода из различных фрагментов кода , Но я не уверен, что он на самом деле делает - я запутался (даже в документах в этом случае): если это «ложь», то ребенок, который должен прочитать ввод, не ждет следующего пункта? (Если так, что происходит?) – galapah
'block = False' означает, что вызов немедленно возвращается, независимо от того, было ли что-либо в очереди. В общем случае игнорируйте аргументы «block» и «timeout». Все эти вещи прекрасно используются без них для подавляющего большинства задач, и использование их * может привести вас к различным неприятностям. Требуется много опыта, чтобы знать, когда вам нужно * неблокирующий или тайм-аут, и даже тогда большинство людей этого не делают ;-) –