2015-02-06 2 views
8

У меня есть функция, которая выполняет некоторую симуляцию, а возвращает массив в строчном формате.Многопроцессорность Python - отслеживание процесса работы pool.map

Я хочу запустить симуляцию (функцию) для различных значений входных параметров, более 10000 возможных входных значений, и записать результаты в один файл.

Я использую многопроцессорную обработку, в частности, функцию pool.map для параллельного запуска моделирования.

Поскольку весь процесс запуска функции моделирования более 10000 раз занимает очень много времени, я действительно хотел бы отслеживать процесс всей операции.

Я думаю, что проблема в моем текущем коде ниже заключается в том, что pool.map запускает функцию 10000 раз без отслеживания процесса во время этих операций. Как только параллельная обработка завершит выполнение 10000 симуляций (может быть от нескольких часов до нескольких дней.), Я продолжаю отслеживать, когда 10000 результатов моделирования сохраняются в файле. Так что это не отслеживает обработку операции pool.map.

Есть ли легкое исправление для моего кода, который позволит отслеживать процессы?

def simFunction(input): 
    # Does some simulation and outputs simResult 
    return str(simResult) 

# Parallel processing 

inputs = np.arange(0,10000,1) 

if __name__ == "__main__": 
    numCores = multiprocessing.cpu_count() 
    pool = multiprocessing.Pool(processes = numCores) 
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out: 
     print("Starting to simulate " + str(len(inputs)) + " input values...") 
     counter = 0 
     for i in t: 
      out.write(i + '\n') 
      counter = counter + 1 
      if counter%100==0: 
       print(str(counter) + " of " + str(len(inputs)) + " input values simulated") 
    print('Finished!!!!') 

ответ

7

Если вы используете итерированную функцию map, довольно легко отслеживать прогресс.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> def simFunction(x,y): 
... import time 
... time.sleep(2) 
... return x**2 + y 
... 
>>> x,y = range(100),range(-100,100,2) 
>>> res = Pool().imap(simFunction, x,y) 
>>> with open('results.txt', 'w') as out: 
... for i in x: 
...  out.write("%s\n" % res.next()) 
...  if i%10 is 0: 
...  print "%s of %s simulated" % (i, len(x)) 
... 
0 of 100 simulated 
10 of 100 simulated 
20 of 100 simulated 
30 of 100 simulated 
40 of 100 simulated 
50 of 100 simulated 
60 of 100 simulated 
70 of 100 simulated 
80 of 100 simulated 
90 of 100 simulated 

Или, вы можете использовать асинхронный map. Здесь я буду делать все по-другому, просто чтобы перепутать.

>>> import time 
>>> res = Pool().amap(simFunction, x,y) 
>>> while not res.ready(): 
... print "waiting..." 
... time.sleep(5) 
... 
waiting... 
waiting... 
waiting... 
waiting... 
>>> res.get() 
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899] 

Обратите внимание, что я использую pathos.multiprocessing вместо multiprocessing. Это всего лишь вилка multiprocessing, которая позволяет вам делать map функции с несколькими входами, имеет намного лучшую сериализацию и позволяет выполнять вызовы map в любом месте (не только в __main__). Вы можете использовать multiprocessing, чтобы сделать это, но код будет немного отличаться.

Либо повторный или асинхронный map позволит вам написать любой код, который вы хотите улучшить отслеживание процессов.Например, передайте уникальный «идентификатор» для каждого задания и посмотрите, какие они вернутся, или возвратите каждое задание его идентификатора процесса. Существует множество способов отслеживания прогресса и процессов ... но вышеприведенное должно дать вам начало.

Вы можете получить pathos здесь: https://github.com/uqfoundation

+0

большое вам спасибо! – user32147

3

Нет «легкого исправления». map - это все, что вам нужно. И в этом случае вы хотите подробнее. То есть, вещи становятся немного более сложными, по определению. Вам нужно изменить парадигму общения. Есть много способов сделать это.

Первый: создать очередь для сбора ваших результатов и позволить вашим работникам поместить результаты в эту очередь. Затем вы можете в рамках потока мониторинга или процесса смотреть в очередь и потреблять результаты по мере их поступления. При потреблении вы можете анализировать их и генерировать выходные данные журнала. Это может быть наиболее общий способ отслеживания прогресса: вы можете в любой момент реагировать на поступающие результаты, в режиме реального времени.

Более простой способ может заключаться в том, чтобы слегка изменить вашу рабочую функцию и создать там вывод журнала. Тщательно анализируя выход журнала с помощью внешних инструментов (таких как grep и wc), вы можете найти очень простые средства отслеживания.

+1

спасибо. не могли бы вы представить простой пример? – user32147

3

Я думаю, что вам нужно, это файл журнала.

Я бы порекомендовал вам использовать протокол регистрации , который является частью стандартной библиотеки Python. Но, к сожалению, протоколирование не является многопроцессорным. Поэтому вы не можете использовать его в своем приложении.

Таким образом, вам необходимо будет использовать обработчик протокола с многопроцессорной обработкой или реализовать вашу систему с помощью очереди или блокировки вместе с протоколом , регистрирующим.

Об этом в Stackoverflow. Это, например: How should I log while using multiprocessing in Python?

Если большая часть нагрузки CPU находится в функции моделирования и вы не собираетесь использовать ротацию журнала, вероятно, можно использовать простой механизм блокировки, как это:

import multiprocessing 
import logging 

from random import random 
import time 


logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s %(process)s %(levelname)s %(message)s', 
    filename='results.log', 
    filemode='a' 
) 


def simulation(a): 
    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Simulating with %s" % a) 

    # simulation 
    time.sleep(random()) 
    result = a*2 

    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Finished simulation with %s. Result is %s" % (a, result)) 

    return result 

if __name__ == '__main__': 

    logging.debug("Starting the simulation") 
    inputs = [x for x in xrange(100)] 
    num_cores = multiprocessing.cpu_count() 
    print "num_cores: %d" % num_cores 
    pool = multiprocessing.Pool(processes=num_cores) 
    t = pool.map(simulation, inputs) 
    logging.debug("The simulation has ended") 

Вы можете «tail -f» ваш файл журнала при запуске. Это то, что вы должны увидеть:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation 
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12 
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28 
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20 
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16 
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8 
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4 
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24 
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0 
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24 
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13 
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16 
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9 
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48 
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25 
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50 
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26 
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26 
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14 
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28 
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15 
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8 
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5 

Пробовал Windows и Linux.

Надеюсь, это поможет

+0

'multiprocessing.get_logger()' возвращает ограниченный функцией журнал регистрации, защищенный блокировками, см. Https://docs.python.org/2/library/multiprocessing.html#logging –

+0

Да, но это регистратор модуля ... так что вы можете использовать его, ваш журнал будет смешан с сообщениями на уровне модуля: попробуйте, и вы увидите такие сообщения: 2015-02-08 23: 47: 10,954 9288 DEBUG создал сальто с ручкой 448 –

+0

О, вы право, я никогда не использовал его на самом деле и слишком быстро просмотрел документы. –

Смежные вопросы