2015-09-02 1 views
0

Я хочу сделать кластеризацию на 10 000 моделей. До этого я должен вычислить коэффициент коррализации pearson, связанный с каждой из двух моделей. Это большой объем вычислений, поэтому я использую мультипроцессирование нереститься процессов, назначая вычислительную работу 16 кода cpus.My, как это:python: Зачем мне заставлять меня ждать?

import numpy as np 
from multiprocessing import Process, Queue 

def cc_calculator(begin, end, q): 
    index=lambda i,j,n: i*n+j-i*(i+1)/2-i-1 
    for i in range(begin, end): 
     for j in range(i, nmodel): 
      all_cc[i][j]=get_cc(i,j) 
      q.put((index(i,j,nmodel),all_cc[i][j])) 

def func(i): 
    res=(16-i)/16 
    res=res**0.5 
    res=int(nmodel*(1-res)) 
    return res 

nmodel=int(raw_input("Entering the number of models:")) 
all_cc=np.zeros((nmodel,nmodel)) 
ncc=int(nmodel*(nmodel-1)/2) 
condensed_cc=[0]*ncc 
q=Queue() 
mprocess=[] 

for ii in range(16): 
    begin=func(i) 
    end=func(i+1) 
    p=Process(target=cc_calculator,args=(begin,end,q)) 
    mprocess+=[p] 
    p.start() 

for x in mprocess: 
    x.join() 

while not q.empty(): 
    (ind, value)=q.get() 
    ind=int(ind) 
    condensed_cc[ind]=value 
np.save("condensed_cc",condensed_cc) 

где get_cc (I, J) вычисляет коэффициент corralation, связанный с моделью i и j. all_cc - верхняя треугольная матрица, а all_cc [i] [j] сохраняет значение cc. condensed_cc - еще одна версия all_cc. Я обработаю его, чтобы получить conded_dist, чтобы выполнить кластеризацию. Функция «func» помогает назначать каждому процессору почти те же самые вычисления.

Я успешно запускаю программу с помощью nmodel = 20. Однако, когда я пытаюсь запустить программу с nmodel = 10 000, кажется, что она никогда не заканчивается. Я жду около двух дней и использую верхнюю команду в другом окне терминала, процесс с командой «python» все еще работает. Но программа все еще работает, и нет выходного файла. Я использую Ctrl + C, чтобы заставить его остановиться, он указывает на строку: x.join(). nmodel = 40 быстро работал, но не справился с той же проблемой.

Возможно, эта проблема имеет какое-то отношение к q. Потому что, если я комментирую строку: q.put (...), он работает successfully.Or что-то вроде этого:

q.put(...) 
q.get() 

Он также ok.but два метода не даст правильный condensed_cc. Они не меняют all_cc или condensed_cc.

Другой пример только с одним подпроцесса:

from multiprocessing import Process, Queue 

def g(q): 
    num=10**2 
    for i in range(num): 
     print '='*10 
     print i 
     q.put((i,i+2)) 
     print "qsize: ", q.qsize() 

q=Queue() 
p=Process(target=g,args=(q,)) 
p.start() 
p.join() 

while not q.empty(): 
    q.get() 

Это нормально с NUM = 100, но не с NUM = 10000. Даже при num = 100 ** 2 они печатали все i и q.qsizes. Я не могу понять, почему. Кроме того, Ctrl + C возвращает трассировку назад к p.join().

Я хочу сказать больше о проблеме размера очереди. Documentation about Queue and its put method представляет Queue as Queue ([maxsize]), и он говорит о методе put: ... block, если необходимо, до тех пор, пока не будет доступен свободный слот. Все это заставляет думать, что подпроцесс заблокирован из-за истечения пробелов в очереди. Однако, как я уже упоминал ранее во втором примере, результат, напечатанный на экране, доказывает увеличение qsize, что означает, что очередь не заполнена. Добавить одну строку:

print q.full() 

после утверждения размера печати, всегда ложно для Num = 10000, а программа все еще застряли где-то. Подчеркните одно: верхняя команда в другом терминале не показывает процесс с командой python. Это меня озадачивает.

Я использую python 2.7.9.

+0

Вы начали каждый процесс в списке 'mprocess' перед тем, как присоединиться к ним? 'for x in mprocess: x.start()' – ozgur

+0

@ozgur Я сделал. Извините за отсутствие этого в моем вопросе. Я снова отредактировал вопрос. – dudu

+0

звучит так, будто ваша очередь заканчивается, вы, вероятно, должны ее пустить, поскольку она заполняется. вы также пытаетесь изменить глобальный список в процессе, который не будет работать. – bj0

ответ

0

Я считаю, что проблема, которую вы работаете в описано в руководствах многопроцессорная программирования: https://docs.python.org/2/library/multiprocessing.html#multiprocessing-programming

В частности, этот раздел:

Объединение процессов, которые используют очереди

Имейте в виду, что процесс который помещает предметы в очередь, будет ждать до завершения, пока все буферизованные элементы не будут подаваться нитью фидера в базовый канал. (Для предотвращения этого поведения дочерний процесс может вызвать метод cancel_join_thread() очереди.)

Это означает, что всякий раз, когда вы используете очередь, вы должны убедиться, что все элементы, которые были помещены в очередь, в конечном итоге будут удалены до присоединения процесса. В противном случае вы не можете быть уверены, что процессы, которые поместили элементы в очередь, будут завершены. Помните также, что не-демонические процессы будут соединены автоматически.

Пример, который тупиковой является следующее:

from multiprocessing import Process, Queue 

def f(q): 
    q.put('X' * 1000000) 

if __name__ == '__main__': 
    queue = Queue() 
    p = Process(target=f, args=(queue,)) 
    p.start() 
    p.join()     # this deadlocks 
    obj = queue.get() 

Исправление здесь было бы поменять местами две последние строки (или просто удалить строку p.join()).

Вы также можете ознакомиться с разделом «Избегайте общего состояния».

Похоже, вы используете .join, чтобы избежать состояния гонки q.empty(), возвращая True, прежде чем что-то будет добавлено к нему. Вы не должны полагаться на .empty() вообще при использовании многопроцессорности (или многопоточности). Вместо этого вы должны обработать это сигналом от рабочего процесса до основного процесса, когда это делается, добавив элементы в очередь. Обычно это делается путем размещения в очереди ценной стоимости, но есть и другие варианты.

Смежные вопросы