2014-09-10 3 views
1

Мой вопрос, надеюсь, достаточно конкретный, чтобы не относиться ни к одному из других, которые я читал. Я хочу использовать подпроцесс и многопроцессорную обработку, чтобы периодически генерировать множество заданий и возвращать код возврата ко мне. Проблема в том, что я не хочу ждать(), поэтому я могу создавать задания сразу, но я хочу знать, когда она закончится, чтобы я мог получить код возврата. У меня есть эта странная проблема, где, если я опросу() процесса, она не будет работать. Он просто зависает в мониторе активности без запуска (я нахожусь на Mac). Я думал, что могу использовать поток наблюдателей, но я вишу на q_out.get(), что заставляет меня поверить, что, возможно, я заполняю буфер и тупик. Я не уверен, как обойти это. В основном это мой код. Если у кого-нибудь есть лучшие идеи о том, как это сделать, я был бы счастлив полностью изменить свой подход.Код возврата подпроцесса Python без ожидания

def watchJob(p1,out_q): 
    while p1.poll() == None: 
     pass 
    print "Job is done" 
    out_q.put(p1.returncode) 

def runJob(out_q): 
    LOGFILE = open('job_to_run.log','w') 
    p1 = Popen(['../../bin/jobexe','job_to_run'], stdout = LOGFILE) 
    t = threading.Thread(target=watchJob, args=(p1,out_q)) 
    t.start() 

out_q= Queue() 
outlst=[] 
for i in range(len(nprocs)): 
    proc = Process(target=runJob, args=(out_q,)) 
    proc.start() 
    outlst.append(out_q.get()) # This hangs indefinitely 
    proc.join() 
+0

Какая-либо конкретная причина иметь как многопоточность, так и многопроцессорность? – 2014-09-10 19:15:32

+0

Почему вы трахаете задание «stdout», если у вас нет намерения читать содержимое? в зависимости от того, производят ли задания много вывода или нет, задания могут просто блокироваться при записи в стандартный вывод. – isedev

+0

Я бы посмотрел на [Многопроцессорные пулы] (https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool). У них есть несколько способов запуска процессов синхронно или асинхронно, а получение кода возврата (либо путем ожидания, либо проверки позже) довольно гибко. Кроме того, есть несколько способов подключения обратных вызовов, которые выполняются, когда ваш подпроцесс завершается. – skrrgwasme

ответ

2

Вам не нужно ни мультипроцессирование, ни продевал здесь. Вы можете запустить несколько процессов ребенка параллельно и собирать свои уставы все в одном потоке:

#!/usr/bin/env python3 
from subprocess import Popen 

def run(cmd, log_filename): 
    with open(log_filename, 'wb', 0) as logfile: 
     return Popen(cmd, stdout=logfile) 

# start several subprocesses 
processes = {run(['echo', c], 'subprocess.%s.log' % c) for c in 'abc'} 
# now they all run in parallel 
# report as soon as a child process exits 
while processes: 
    for p in processes: 
     if p.poll() is not None: 
      processes.remove(p) 
      print('{} done, status {}'.format(p.args, p.returncode)) 
      break 

p.args магазинов cmd в Python 3.3+, отслеживать cmd себя на более ранних версиях Python.

Смотрите также:

Чтобы ограничить количество параллельных заданий, которые ThreadPool могут быть использованы (как показано на the first link):

#!/usr/bin/env python3 
from multiprocessing.dummy import Pool # use threads 
from subprocess import Popen 

def run_until_done(args): 
    cmd, log_filename = args 
    try: 
     with open(log_filename, 'wb', 0) as logfile: 
      p = Popen(cmd, stdout=logfile) 
     return cmd, p.wait(), None 
    except Exception as e: 
     return cmd, None, str(e) 

commands = ((('echo', str(d)), 'subprocess.%03d.log' % d) for d in range(500)) 
pool = Pool(128) # 128 concurrent commands at a time 
for cmd, status, error in pool.imap_unordered(run_until_done, commands): 
    if error is None: 
     fmt = '{cmd} done, status {status}' 
    else: 
     fmt = 'failed to run {cmd}, reason: {error}' 
    print(fmt.format_map(vars())) # or fmt.format(**vars()) on older versions 

Пул потоков в примере имеет 128 потоков (не более, не менее). Он не может выполнять более 128 заданий одновременно. Как только любой из потоков освобождается (выполняется с заданием), он принимает другой и т. Д. Общее количество заданий, выполняемых одновременно, ограничено количеством потоков. Новая работа не ждет завершения всех 128 предыдущих заданий.Он запускается, когда выполнено все старых заданий.

+0

Это вызовет исключение, потому что размер 'процессов' изменяется во время повтора по нему. Вам придется перебирать копию «процессов». – dano

+0

@ дано: спасибо, что заметили. Я исправил это. – jfs

+0

Нет проблем. Из любопытства, почему вы также переключились с набора на список? 'Remove' более эффективен с' set', и я бы предположил, что 'set.copy()' is O (n), то же самое, что и 'list [:]'. Конечно, для iterables этот маленький это на самом деле не имеет большого значения ... – dano

1

Если вы собираетесь запускать watchJob в потоке, нет никаких оснований занятой петли с p1.poll; просто позвоните p1.wait(), чтобы завершить процесс до завершения процесса. Использование цикла «занято» требует, чтобы GIL постоянно выпускался/повторно приобретался, что замедляло основной поток, а также привязывало CPU, что еще больше ухудшало производительность.

Кроме того, если вы не используете stdout дочернего процесса, вы не должны отправить его PIPE, потому что это может привести к тупиковой ситуации, если процесс записывает достаточно данных в stdout буфер, чтобы заполнить его (который на самом деле может быть то, что происходит в вашем случае). Здесь также нет необходимости использовать multiprocessing; просто позвоните Popen в основной поток, а затем подождите watchJob, пока процесс завершится.

import threading 
from subprocess import Popen 
from Queue import Queue 

def watchJob(p1, out_q): 
    p1.wait() 
    out_q.put(p1.returncode) 

out_q = Queue() 
outlst=[] 
p1 = Popen(['../../bin/jobexe','job_to_run']) 
t = threading.Thread(target=watchJob, args=(p1,out_q)) 
t.start() 
outlst.append(out_q.get()) 
t.join() 

Edit:

Вот как выполнять несколько заданий одновременно таким образом:

out_q = Queue() 
outlst = [] 
threads = [] 
num_jobs = 3 
for _ in range(num_jobs): 
    p = Popen(['../../bin/jobexe','job_to_run']) 
    t = threading.Thread(target=watchJob, args=(p1, out_q)) 
    t.start() 
    # Don't consume from the queue yet. 

# All jobs are running, so now we can start 
# consuming results from the queue. 
for _ in range(num_jobs): 
    outlst.append(out_q.get()) 
    t.join() 
+0

Извините, что здесь нет полного кода. Он требует многопроцессорности, потому что когда он будет завершен, он будет создавать множество заданий для нескольких процессоров на HPC. Я пробовал код с p1.wait(), а stdout перешел в файл журнала и не направлял его вообще. Он по-прежнему просто зависает после завершения программы на outlst.append (out_q.get()). Таким образом, ваше решение технически работает, если мне не нужно создавать несколько заданий, но просто цикл вокруг Popen заставит программу ждать. Вот почему у меня был поток наблюдателей, потому что я надеялся, что это не заставит программу ждать. – fatalaccidents

+0

@fatalaccidents Разве он зависает с моим кодом выше или с вашим кодом, который использует «multiprocessing.Process»? Какую «очередь» вы используете в коде, который вы используете? – dano

+0

Ваш код не виснет. Если вы добавите процесс (и очередь из многопроцессорности) в микс, вы можете заставить его не зависать, но если вы поместите его в цикл, он просто запустит одно задание, а затем следующее. Таким образом, он не запускает их параллельно. Сможете ли вы написать пример, который запускает задания одновременно? Спасибо за вашу помощь по этому поводу. – fatalaccidents

Смежные вопросы