2016-06-22 2 views
0

У меня есть код, который мне удалось paralelize поблагодарить this question:Multiprocessing - Предел использования CPU

1| def function(name, params): 
2| results = fits.open(name) 
3| <do something more to results> 
4| return results 
5| 
6| def function_wrapper(args): 
7|  return function(*args) 
8| 
9| params = [...,...,..., etc]  
10| 
11| p = multiprocessing..Pool(processes=(max([2, mproc.cpu_count() // 10]))) 
12| args_generator = ((name, params) for name in names) 
13| 
14| dictionary = dict(zip(names, p.map(function_wrapper, args_generator))) 

Если я правильно понял, как pool СМР, ПНР, Numer процессов, указанных в строке должна быть максимальное количество процессов, которые порождаются в данный момент времени. Таким образом, это должно ограничивать использование моего процессора, не так ли? Я имею в виду, как я это понимаю, как указано в строке 11, максимальное количество используемых процессов/процессоров должно быть не более [2, number_of_cpus/10].

Тем не менее, когда я запускаю свой код, я вижу, что вскоре после запуска всех процессоров на 100%. Я что-то упускаю?

ПРИМЕЧАНИЕ: Для контекста мне необходимо ограничить использование моего процессора максимальным количеством ядер, так как я буду использовать общий сервер.

ОБНОВЛЕНИЕ: добавьте исправленную версию моего кода. Вместо открытия файла fits я создаю шумную гауссову кривую, подобную моему спектру (хотя и лучше себя ...).

Обрезка помогла решить проблему. Внутренняя функция fnBootstrapInstance была выполнена на двухмерном массиве (в основном, эшелевом спектре), который я повторил с помощью for loop. По какой-то причине удаление цикла, решение проблемы, и было использовано только количество ядер, которые я указал. Я предполагаю, что по какой-то причине цикл for породил серию подпроцессов (так оно появилось на htop). Итерация по одному порядку спектров эшелелей в то время решала проблему.

# Imports 
#%matplotlib inline 
import sys 
import numpy as np 
import matplotlib.pyplot as mplt 
import numpy.random as rnd 
import scipy.optimize as opt 
import multiprocessing as mproc 

# Functions ================================================== 
def fnBootstrapInstance(XXX = None, YYY= None, function= None, lenght=None, fitBounds= None, initParams=None, **kwargs): 

    # define samples 
    indexes = sorted(rnd.choice(range(len(XXX)), size=lenght, replace=True)) 
    samplesXXX = XXX[indexes] 
    samplesYYY = YYY[indexes] 

    fitBounds = ([-np.inf,-np.inf,0,-np.inf],[np.inf,np.inf,np.inf,np.inf]) 

    params, cov = opt.curve_fit(function, samplesXXX.ravel(), samplesYYY.ravel(), p0=initParams, 
           bounds = fitBounds, 
           ) 

    return params 

def wrapper_fnBootstrapInstance(args): 
    return fnBootstrapInstance(**args) 

def fnGaussian(dataXXX, Amp, mean, FWHM, B): 
    return B - Amp * np.exp(-4 * np.log(2) * (((dataXXX - mean)/FWHM) ** 2)) 
# Functions ================================================== 


# Noise Parameters 
arrLen = 1000 
noiseAmp = 0. 
noiseSTD = .25 

# Gaussian Data Parameters 
amp = 1. 
mean = 10 
FWHM = 30. 
B = 1. 

# generate random gauss data 
arrGaussXXX = np.linspace(-50, 60,num = arrLen) 
arrGaussNoise = rnd.normal(noiseAmp,noiseSTD, arrLen) 
arrGaussYYY = fnGaussian(arrGaussXXX, amp, mean, FWHM, B) + arrGaussNoise 

# multiprocessing bit 
numIterations = 1000 

mprocPool = mproc.Pool(processes=(max([2, mproc.cpu_count() // 10]))) 

initParams = [max(arrGaussYYY) - min(arrGaussYYY), np.median(arrGaussXXX), 
         max(arrGaussXXX) - min(arrGaussXXX), max(arrGaussYYY)] 

args_generator = [{'XXX':arrGaussXXX, 'YYY':arrGaussYYY, 'function':fnGaussian, 'initParams':initParams, 
        'lenght':200} for n in range(numIterations)] 

fitParams = [] 
for results in mprocPool.imap(wrapper_fnBootstrapInstance, args_generator): 
    fitParams.append([results[0],results[1],results[2],results[3]]) 



bootParams = [(np.nanmedian(param),np.nanstd(param)) for param in np.array(fitParams).T] 
print '\n'.join('{:.2f}+-{:.2f} ({:.1f}%)'.format(param[0],param[1], param[1]/param[0]*100) for param in bootParams) 

mplt.figure(figsize=(20,10)) 
mplt.plot(arrGaussXXX, arrGaussYYY,'+') 
for params in fitParams: 
    mplt.plot(arrGaussXXX,fnGaussian(arrGaussXXX,*params),'r', alpha = .5) 
mplt.show() 


mprocPool.close() 

Спасибо всем!

+2

Это должно действительно (ограничить количество используемых ЦП). Пример вашего кода даже не синтаксически недействителен, но гораздо менее возможен, поэтому невозможно сказать, что может быть неправильным. – torek

+0

Что такое вывод 'mproc.cpu_count()'? Что такое вывод 'mproc.cpu_count() // 10'? – proinsias

+1

Этот код ограничивает количество процессов в пуле, которые вы можете проверить независимо (например, 'ps x' на linux). Но это также зависит от того, что делают эти процессы. Если они также отжимают многопроцессы или звонят во что-то, что создает много потоков (возможно, «pandas'), вы все равно будете использовать весь процессор. – tdelaney

ответ

1

Рассмотрите возможность использования multiprocessing.pool.ThreadPool. Он предоставляет тот же API, что и multiprocessing.Pool, но вместо этого абстрагирует рабочую нагрузку на коллекцию потоков. Обратите внимание: если ваш процессор поддерживает Hyper-threading, он скорее всего распределит рабочую нагрузку по физическим ядрам.

+0

Спасибо, я посмотрю. – jorgehumberto

Смежные вопросы