2014-10-09 2 views
1

Я попытался (безуспешно) распараллелить цикл с использованием многопроцессорности. Вот мой код Python:Многопроцессорность Python - ошибка при использовании нескольких процессов

from MMTK import * 
from MMTK.Trajectory import Trajectory, TrajectoryOutput, SnapshotGenerator 
from MMTK.Proteins import Protein, PeptideChain 
import numpy as np 

filename = 'traj_prot_nojump.nc' 

trajectory = Trajectory(None, filename) 
universe = trajectory.universe 
proteins = universe.objectList(Protein) 
chain = proteins[0][0] 

def calpha_2dmap_mult(t = range(0,len(trajectory))): 
    dist = [] 
    global trajectory 
    universe = trajectory.universe 
    proteins = universe.objectList(Protein) 
    chain = proteins[0][0] 
    traj = trajectory[t] 
    dt = 1000 # calculate distance every 1000 steps 
    for n, step in enumerate(traj): 
     if n % dt == 0: 
      universe.setConfiguration(step['configuration']) 
      for i in np.arange(len(chain)-1): 
       for j in np.arange(len(chain)-1): 
        dist.append(universe.distance(chain[i].peptide.C_alpha, 
                chain[j].peptide.C_alpha)) 
    return(dist) 

dist1 = calpha_2dmap_mult(range(1000,2000)) 
dist2 = calpha_2dmap_mult(range(2000,3000)) 

# Multiprocessing 
from multiprocessing import Pool, cpu_count 

pool = Pool(processes=2) 
dist_pool = [pool.apply(calpha_2dmap_mult, args=(t,)) for t in [range(1000,2000), range(2000,3000)]] 

print(dist_pool[0]==dist1) 
print(dist_pool[1]==dist2) 

Если я пытаюсь Pool(processes = 1), код работает, как ожидалось, но как только я прошу более одного процесса, код вылетает с этой ошибкой:

python: posixio.c:286: px_pgin: Assertion `*posp == ((off_t)(-1)) || *posp == lseek(nciop->fd, 0, 1)' failed. 

Если кто-то есть предложение, то он будет очень признателен ;-)

ответ

0

Я подозреваю, что это из-за этого:

trajectory = Trajectory(None, filename) 

Вы открываете файл только один раз, с самого начала. Вероятно, вы должны просто передать имя файла в целевую функцию многопроцессорности и открыть его там.

0

Если вы используете этот код для OS X или любой другой Unix-подобной системы, многопроцессор использует forking для создания подпроцессов.

При форсировании дескрипторы файлов совместно используются родительским процессом. Насколько я могу судить, объект траектории содержит ссылку на дескриптор файла.

Чтобы исправить это, вы должны поместить

trajectory = Trajectory(None, filename)

в calpha_2dmap_mult, чтобы гарантировать, что каждый подпроцесс открывает файл отдельно.

+0

Благодаря вашим комментариям (@John и @Wynand), я знаю, что можно использовать более одного процесса ... но производительность не улучшилась вообще! Новый скрипт написан в следующем ответе! – guillaume

0

Вот новый скрипт, который позволяет использовать более одного процесса (но не улучшение производительности):

from MMTK import * 
from MMTK.Trajectory import Trajectory, TrajectoryOutput, SnapshotGenerator 
from MMTK.Proteins import Protein, PeptideChain 
import numpy as np 
import time 

filename = 'traj_prot_nojump.nc' 


trajectory = Trajectory(None, filename) 
universe = trajectory.universe 
proteins = universe.objectList(Protein) 
chain = proteins[0][0] 

def calpha_2dmap_mult(trajectory = trajectory, t = range(0,len(trajectory))): 
    dist = [] 
    universe = trajectory.universe 
    proteins = universe.objectList(Protein) 
    chain = proteins[0][0] 
    traj = trajectory[t] 
    dt = 1000 # calculate distance every 1000 steps 
    for n, step in enumerate(traj): 
     if n % dt == 0: 
      universe.setConfiguration(step['configuration']) 
      for i in np.arange(len(chain)-1): 
       for j in np.arange(len(chain)-1): 
        dist.append(universe.distance(chain[i].peptide.C_alpha, 
                chain[j].peptide.C_alpha)) 
    return(dist) 

c0 = time.time() 
dist1 = calpha_2dmap_mult(trajectory, range(0,11001)) 
#dist1 = calpha_2dmap_mult(trajectory, range(0,11001)) 
c1 = time.time() - c0 
print(c1) 


# Multiprocessing 
from multiprocessing import Pool, cpu_count 

pool = Pool(processes=4) 
c0 = time.time() 
dist_pool = [pool.apply(calpha_2dmap_mult, args=(trajectory, t,)) for t in 
      [range(0,2001), range(3000,5001), range(6000,8001), 
       range(9000,11001)]] 
c1 = time.time() - c0 
print(c1) 


dist1 = np.array(dist1) 
dist_pool = np.array(dist_pool) 
dist_pool = dist_pool.flatten() 
print(np.all((dist_pool == dist1))) 

Время, затраченное для вычисления расстояния является «же» без (70.1s) или с многопроцессорность (70,2 с)! Возможно, я не ожидал улучшения в 4 раз, но я, по крайней мере, ожидал некоторых улучшений!

0

Похоже, что это может быть проблемой при чтении файла netCDF поверх NFS. Является traj_prot_nojump.nc на хранилище NFS? См. this Unidata mailing list post и this post to the IDL newsgroup. В последнем случае было предложено обходное решение, чтобы скопировать файл в локальное хранилище.

+0

Трюк состоял в том, чтобы использовать pool.apply_async вместо pool.apply, чтобы получить ожидаемую производительность. Для объяснения см. [Http://stackoverflow.com/questions/26356757/python-multiprocessing-no-performance-gain-with-multiple-processes]. – guillaume

Смежные вопросы