2013-08-16 2 views
1

Я пишу скрипт python (для сред cygwin и linux) для запуска регрессионного тестирования в программе, которая запускается из командной строки с использованием подпроцесса .Popen(). В принципе, у меня есть набор заданий, подмножество которых необходимо запускать в зависимости от потребностей разработчика (порядка от 10 до 1000). Каждая работа может занять от нескольких секунд до 20 минут.Задания динамического переопределения в многопроцессорном пуле в Python

У меня есть работа, выполняющаяся успешно на нескольких процессорах, но я пытаюсь экономить время, разумно упорядочивая задания (исходя из прошлой производительности), чтобы сначала запускать более длинные задания. Усложнение состоит в том, что некоторые работы (расчеты стационарного состояния) должны выполняться перед другими (переходные процессы на основе начальных условий, определяемых установившимся состоянием).

Мой текущий метод обработки этого заключается в том, чтобы возвращать родительское задание и все дочерние задания рекурсивно в один и тот же процесс, но некоторые задания имеют несколько длинных детей. Как только родительское задание будет завершено, я хотел бы добавить детей обратно в пул, чтобы перейти к другим процессам, но их нужно будет добавить в голову очереди. Я не уверен, что смогу это сделать с помощью многопроцессорной обработки. Я искал примеры с Менеджером, но все они основаны на сетевом взаимодействии, и это не особенно применимо. Любая помощь в виде кода или ссылок на хороший учебник по многопроцессорности (я googled ...) была бы высоко оценена. Вот скелет кода для того, что у меня до сих пор, прокомментировал, чтобы указать дочерние задания, которые я хотел бы создать на других процессорах.

import multiprocessing 
import subprocess 

class Job(object): 
    def __init__(self, popenArgs, runTime, children) 
    self.popenArgs = popenArgs #list to be fed to popen 
    self.runTime = runTime #Approximate runTime for the job 
    self.children = children #Jobs that require this job to run first 

def runJob(job): 
    subprocess.Popen(job.popenArgs).wait() 
    #################################################### 
    #I want to remove this, and instead kick these back to the pool 
    for j in job.children: 
    runJob(j) 
    #################################################### 

def main(jobs): 
    # This jobs argument contains only jobs which are ready to be run 
    # ie no children, only parent-less jobs 
    jobs.sort(key=lambda job: job.runTime, reverse=True) 
    multiprocessing.Pool(4).map(runJob, jobs) 
+0

Случайное примечание: я бы рекомендовал не использовать многопроцессорность здесь, поскольку это не то, для чего он предназначен. Вы можете получить тот же результат с потоками или даже традиционным способом, запускающим и ожидающим процессы (модуль 'subprocess' и' os.wait() '). –

ответ

0

Во-первых, позвольте мне повторить комментарий Армина Риго: здесь нет причин использовать несколько процессов вместо нескольких потоков. В процессе контроля вы тратите большую часть своего времени, ожидая завершения субпроцессов; у вас нет работы с интенсивным процессором для распараллеливания.

Использование потоков также упростит вашу основную проблему. Прямо сейчас вы сохраняете задания в атрибутах других заданий, неявный график зависимостей. Вам нужна отдельная структура данных, которая заказывает задания с точки зрения планирования. Кроме того, каждое дерево рабочих мест в настоящее время связано с одним рабочим процессом. Вы хотите отделить своих работников от структуры данных, которую вы используете для хранения заданий. Затем рабочие рисуют задания из одной очереди задач; после того, как работник заканчивает свою работу, он ставит детей в заложники, которые затем могут обрабатываться любым имеющимся рабочим.

Поскольку вы хотите, чтобы дочерние задания были вставлены в начале строки, когда их родительский контур завершен, стековый контейнер, по-видимому, будет соответствовать вашим потребностям; модуль Queue обеспечивает потокобезопасный класс LifoQueue, который вы можете использовать.

import threading 
import subprocess 
from Queue import LifoQueue 

class Job(object): 
    def __init__(self, popenArgs, runTime, children): 
    self.popenArgs = popenArgs 
    self.runTime = runTime 
    self.children = children 

def run_jobs(queue): 
    while True: 
    job = queue.get() 
    subprocess.Popen(job.popenArgs).wait() 
    for child in job.children: 
     queue.put(child) 
    queue.task_done() 

# Parameter 'jobs' contains the jobs that have no parent. 
def main(jobs): 
    job_queue = LifoQueue() 
    num_workers = 4 
    jobs.sort(key=lambda job: job.runTime) 
    for job in jobs: 
    job_queue.put(job) 
    for i in range(num_workers): 
    t = threading.Thread(target=run_jobs, args=(job_queue,)) 
    t.daemon = True 
    t.start() 
    job_queue.join() 

Несколько примечаний: (1) Мы не можем знать, когда вся работа осуществляется мониторинг рабочих потоков, так как они не следят за работой, чтобы сделать. Это задача очереди. Таким образом, основной поток контролирует объект очереди, чтобы знать, когда вся работа завершена (job_queue.join()). Таким образом, мы можем отметить рабочие потоки как потоки демона, поэтому процесс будет исчезать всякий раз, когда основной поток работает, не дожидаясь рабочих. Таким образом, мы избегаем необходимости общения между основным потоком и рабочими потоками, чтобы сообщить последнему, когда выходить из своих петель и останавливаться.

(2) Мы знаем, что вся работа выполняется, когда все задачи, которые были установлены в очередь, были отмечены как выполненные (в частности, когда task_done() было вызвано числом раз, равным количеству элементов, которые были выставлены в очередь). Неправильно было бы использовать пустую очередь в качестве условия выполнения всей работы; очередь может быть кратковременно и вводить в заблуждение между высказыванием задания из него и вложением детей этой работы.

+0

Я думаю, что это уже сделано. Позвольте мне закончить тестирование, и я вручу вам чек. Большое спасибо за Вашу помощь! – joshindc

Смежные вопросы