2015-07-21 7 views
2

Итак, я установил короткий скрипт, который выполняет внешнюю программу (написанную на языке Fortran 77). Я хочу запускать несколько экземпляров программы и так как у меня 8 ядер на моем компьютере, самое простое решением я нашел:Запуск нескольких внешних программ

import subprocess 


import os 


i = n 

while(i<n): 
    dire = "dir/Run"+str(i)+"/" 
    os.chdir(dire) 
    p1 = subprocess.Popen(['./mej']) 
    dire = "dir/Run"+str(i+1)+"/" 
    os.chdir(dire) 
    p2 = subprocess.Popen(['./mej']) 
    dire = "dir/Run"+str(i+2)+"/" 
    os.chdir(dire) 
    p3 = subprocess.Popen(['./mej']) 
    dire = "dir/Run"+str(i+3)+"/" 
    os.chdir(dire) 
    p4 = subprocess.Popen(['./mej']) 
    dire = "dir/Run"+str(i+4)+"/" 
    os.chdir(dire) 
    p5 = subprocess.Popen(['./mej']) 
    dire = "dir/Run"+str(i+5)+"/" 
    os.chdir(dire) 
    p6 = subprocess.Popen(['./mej']) 
    dire = "dir/Run"+str(i+6)+"/" 
    os.chdir(dire) 
    p7 = subprocess.Popen(['./mej']) 
    dire = "dir/Run"+str(i+7)+"/" 
    os.chdir(dire) 
    p8 = subprocess.Popen(['./mej']) 
    dire = "/Run"+str(i+8)+"/" 
    os.chdir(dire) 
    p3 = subprocess.Popen(['./mej']) 
    exit_codes = [p.wait() for p in p1, p2, p3, p4, p5, p6, p7, p8] 
    i = i + 8 



print "Job's done!" 

Теперь это работало в основном штраф в первом, но я просто изменил к переменному шагу времени и при этом время каждой интеграции выполняется значительно. Теперь проблема в том, что скрипт будет ждать, пока самый медленный будет закончен, прежде чем запускать новый набор интеграций. Как я могу написать его, чтобы у меня всегда было 8 экземпляров?

ответ

1

Вы можете использовать пул потоков, чтобы держать все процессоры заняты:

#!/usr/bin/env python 
import os 
import subprocess 
from multiprocessing.pool import ThreadPool 

def run(i): 
    working_dir = "dir/Run/" + str(i + 1) 
    return i, subprocess.call(os.path.join(working_dir, 'mej'), cwd=working_dir) 

results = ThreadPool().map(run, range(n)) 

Как только заканчивается один mej процесс, следующий начинается. Одновременно выполняется не более os.cpu_count() одновременных рабочих процессов.

+0

Далеко короче и лучше, чем мое предложение – innoSPG

+0

Ничего себе, какое чистое решение. Раньше я должен был попасть в Python. Благодаря! –

0

Хотя время выполнения для данного пробега может значительно различаться, часто можно предположить, что время, затрачиваемое, например, 10 последовательных прогонов будут иметь гораздо меньшую дисперсию.

Таким образом, простое решение A должно запустить 8 процессов, каждый из которых вызывает внешнюю программу 10 раз, а затем дождаться завершения этих процессов. Вам все равно придется ждать самого медленного процесса, но накладные расходы будут значительно меньше.

Конечно, есть очевидное решение B: создать пул ожидающих прогонов с 8 процессами, которые собирают новый прогон из пула, как только они закончат свой текущий прогон. Это действительно минимизирует ovehead, но здесь вам придется иметь дело с примитивами syncronization.

Вот небольшая иллюстрация из этих 3-х подходов (тот, который вы используете, и два я говорю):

enter image description here

Маленькие красные квадраты показывают, где возможности для улучшения есть. В принципе, подход A позволяет избежать остановки каждого потока, но один после каждого прогона. Подход B идет еще дальше и позволяет нитку, которая закончила все ее прогоны, взять один из другого потока.

+1

Я получил хорошее решение от другого плаката, но спасибо за идеи/иллюстрации, я буду помнить об этом в будущем. –

0

Вы можете написать что-то похожее. Определите общее количество прогонов и количество доступных ядер, а также задержку, чтобы проверить, выполнено ли это. Для задержки просто укажите количество секунд, которое является разумным. Если один процесс выполняется за 10 минут в среднем, 60 секунд задержки или меньше может быть достаточно хорошим.

import subprocess 
import time 
import os 

def runIt(rootDir, prog, i): 
    dire = "dir/Run/" + str(i + 1) 
    os.chdir(dire) 
    return subprocess.Popen(['./mej']) 

n=16 #total number of runs 
nProc = 8 # number of cores 
i = 0 
delay = 2 #delays in second to check if one has returned 

pList = [runIt(p) for p in range(min(nProc, n))] 
i = len(pList) 
while(i<n): 
    time.sleep(delay) # delays for delay seconds 
    for j in range(len(pList)): 
     pList[j].poll() 
     if pList[j].returncode is not None and i<n: 
      pList[j] = runIt(i) 
      i = i+1 
print "Job's done!" 
Смежные вопросы