2013-12-13 3 views
11

Я пытаюсь добавить multiprocessing в код, который содержит функции, которые я не могу изменить. Я хочу, чтобы эти функции выполнялись как задания для многопроцессорного пула асинхронно. Я делаю что-то похожее на код, показанный here. Однако я не уверен, как отслеживать результаты. Как я могу узнать, к какой прикладной функции соответствует возвращенный результат?Как отслеживать асинхронные результаты, возвращаемые из многопроцессорного пула

Важные моменты, которые следует подчеркнуть, это то, что я не могу изменять существующие функции (другие вещи полагаются на них, оставаясь такими, какими они есть), и что результаты могут быть возвращены в порядке, отличном от порядка, в котором применяются задания задания бассейн.

Спасибо за любые мысли!

EDIT: Некоторые пытаются код ниже:

import multiprocessing 
from multiprocessing import Pool 
import os 
import signal 
import time 
import inspect 

def multiply(multiplicand1=0, multiplicand2=0): 
    return multiplicand1*multiplicand2 

def workFunctionTest(**kwargs): 
    time.sleep(3) 
    return kwargs 

def printHR(object): 
    """ 
    This function prints a specified object in a human readable way. 
    """ 
    # dictionary 
    if isinstance(object, dict): 
     for key, value in sorted(object.items()): 
      print u'{a1}: {a2}'.format(a1=key, a2=value) 
    # list or tuple 
    elif isinstance(object, list) or isinstance(object, tuple): 
     for element in object: 
      print element 
    # other 
    else: 
     print object 

class Job(object): 
    def __init__(
     self, 
     workFunction=workFunctionTest, 
     workFunctionKeywordArguments={'testString': "hello world"}, 
     workFunctionTimeout=1, 
     naturalLanguageString=None, 
     classInstance=None, 
     resultGetter=None, 
     result=None 
     ): 
     self.workFunction=workFunction 
     self.workFunctionKeywordArguments=workFunctionKeywordArguments 
     self.workFunctionTimeout=workFunctionTimeout 
     self.naturalLanguageString=naturalLanguageString 
     self.classInstance=self.__class__.__name__ 
     self.resultGetter=resultGetter 
     self.result=result 
    def description(self): 
     descriptionString="" 
     for key, value in sorted(vars(self).items()): 
      descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value)) 
     return descriptionString 
    def printout(self): 
     """ 
     This method prints a dictionary of all data attributes. 
     """ 
     printHR(vars(self)) 

class JobGroup(object): 
    """ 
    This class acts as a container for jobs. The data attribute jobs is a list of job objects. 
    """ 
    def __init__(
     self, 
     jobs=None, 
     naturalLanguageString="null", 
     classInstance=None, 
     result=None 
     ): 
     self.jobs=jobs 
     self.naturalLanguageString=naturalLanguageString 
     self.classInstance=self.__class__.__name__ 
     self.result=result 
    def description(self): 
     descriptionString="" 
     for key, value in sorted(vars(self).items()): 
      descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value)) 
     return descriptionString 
    def printout(self): 
     """ 
     This method prints a dictionary of all data attributes. 
     """ 
     printHR(vars(self)) 

def initialise_processes(): 
    signal.signal(signal.SIGINT, signal.SIG_IGN) 

def execute(
     jobObject=None, 
     numberOfProcesses=multiprocessing.cpu_count() 
     ): 
     # Determine the current function name. 
    functionName=str(inspect.stack()[0][3]) 
    def collateResults(result): 
     """ 
     This is a process pool callback function which collates a list of results returned. 
     """ 
     # Determine the caller function name. 
     functionName=str(inspect.stack()[1][3]) 
     print("{a1}: result: {a2}".format(a1=functionName, a2=result)) 
     results.append(result) 
    def getResults(job): 
     # Determine the current function name. 
     functionName=str(inspect.stack()[0][3]) 
     while True: 
      try: 
       result=job.resultGetter.get(job.workFunctionTimeout) 
       break 
      except multiprocessing.TimeoutError: 
       print("{a1}: subprocess timeout for job".format(a1=functionName, a2=job.description())) 
     #job.result=result 
     return result 
    # Create a process pool. 
    pool1 = multiprocessing.Pool(numberOfProcesses, initialise_processes) 
    print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName, a2=str(pool1), a3=str(numberOfProcesses))) 
    # Unpack the input job object and submit it to the process pool. 
    print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName, a2=jobObject)) 
    if isinstance(jobObject, Job): 
     # If the input job object is a job, apply it to the pool with its associated timeout specification. 
     # Return a list of results. 
     job=jobObject 
     print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description())) 
     # Apply the job to the pool, saving the object pool.ApplyResult to the job object. 
     job.resultGetter=pool1.apply_async(
       func=job.workFunction, 
       kwds=job.workFunctionKeywordArguments 
     ) 
     # Get results. 
     # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result. 
     print("{a1}: getting results for job...".format(a1=functionName)) 
     job.result=getResults(job) 
     print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description())) 
     print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result)) 
     # Return the job result from execute. 
     return job.result 
     pool1.terminate() 
     pool1.join() 
    elif isinstance(jobObject, JobGroup): 
     # If the input job object is a job group, cycle through each job and apply it to the pool with its associated timeout specification. 
     for job in jobObject.jobs: 
      print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description())) 
      # Apply the job to the pool, saving the object pool.ApplyResult to the job object. 
      job.resultGetter=pool1.apply_async(
        func=job.workFunction, 
        kwds=job.workFunctionKeywordArguments 
      ) 
     # Get results. 
     # Cycle through each job and and append the result for the job to a list of results. 
     results=[] 
     for job in jobObject.jobs: 
      # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result. 
      print("{a1}: getting results for job...".format(a1=functionName)) 
      job.result=getResults(job) 
      print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description())) 
      #print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result)) 
      # Collate the results. 
      results.append(job.result) 
     # Apply the list of results to the job group data attribute results. 
     jobObject.results=results 
     print("{a1}: job group results: {a2}".format(a1=functionName, a2=jobObject.results)) 
     # Return the job result list from execute. 
     return jobObject.results 
     pool1.terminate() 
     pool1.join() 
    else: 
     # invalid input object 
     print("{a1}: invalid job object {a2}".format(a1=functionName, a2=jobObject)) 

def main(): 
    print('-'*80) 
    print("MULTIPROCESSING SYSTEM DEMONSTRATION\n") 

    # Create a job. 
    print("# creating a job...\n") 
    job1=Job(
      workFunction=workFunctionTest, 
      workFunctionKeywordArguments={'testString': "hello world"}, 
      workFunctionTimeout=4 
    ) 
    print("- printout of new job object:") 
    job1.printout() 
    print("\n- printout of new job object in logging format:") 
    print job1.description() 

    # Create another job. 
    print("\n# creating another job...\n") 
    job2=Job(
      workFunction=multiply, 
      workFunctionKeywordArguments={'multiplicand1': 2, 'multiplicand2': 3}, 
      workFunctionTimeout=6 
    ) 
    print("- printout of new job object:") 
    job2.printout() 
    print("\n- printout of new job object in logging format:") 
    print job2.description() 

    # Create a JobGroup object. 
    print("\n# creating a job group (of jobs 1 and 2)...\n") 
    jobGroup1=JobGroup(
      jobs=[job1, job2], 
    ) 
    print("- printout of new job group object:") 
    jobGroup1.printout() 
    print("\n- printout of new job group object in logging format:") 
    print jobGroup1.description() 

    # Submit the job group. 
    print("\nready to submit job group") 
    response=raw_input("\nPress Enter to continue...\n") 
    execute(jobGroup1) 

    response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n") 

    # Demonstrate timeout. 
    print("\n # creating a new job in order to demonstrate timeout functionality...\n") 
    job3=Job(
      workFunction=workFunctionTest, 
      workFunctionKeywordArguments={'testString': "hello world"}, 
      workFunctionTimeout=1 
    ) 
    print("- printout of new job object:") 
    job3.printout() 
    print("\n- printout of new job object in logging format:") 
    print job3.description() 
    print("\nNote the timeout specification of only 1 second.") 

    # Submit the job. 
    print("\nready to submit job") 
    response=raw_input("\nPress Enter to continue...\n") 
    execute(job3) 

    response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.") 
    print('-'*80) 

if __name__ == '__main__': 
    main() 

EDIT: Этот вопрос был помещен [на удержание] по следующей заявленной причине:

«Вопросы просят код должны продемонстрировать минимальное понимание решаемой проблемы. Включить попытки решения, почему они не работают и ожидаемые результаты. См. также: Stack Overflow question checklist «

Этот вопрос не требует кода; он просит мысли, общее руководство. Продемонстрировано минимальное понимание рассматриваемой проблемы (обратите внимание на правильное использование терминов «многопроцессорность», «пул» и «асинхронно» и отметьте the reference to prior code). Что касается попыток решения, я признаю, что попытки решения этих проблем были бы полезными. Я добавил такой код сейчас. Надеюсь, что я затронул поднятые проблемы, которые привели к статусу [на удержании].

ответ

14

Не видя фактического кода, я могу ответить только в общих чертах. Но есть два общих решения.

Прежде всего, вместо использования callback и игнорируя AsyncResult, храните их в какой-то коллекции. Тогда вы можете просто использовать эту коллекцию. Например, если вы хотите, чтобы иметь возможность посмотреть результаты для функции, используя эту функцию в качестве ключа, просто создать dict заклиненные с функциями:

def in_parallel(funcs): 
    results = {} 
    pool = mp.Pool() 
    for func in funcs: 
     results[func] = pool.apply_async(func) 
    pool.close() 
    pool.join() 
    return {func: result.get() for func, result in results.items()} 

В качестве альтернативы, вы можете изменить функцию обратного вызова для хранения результаты в вашей коллекции по ключевым словам. Например:

def in_parallel(funcs): 
    results = {} 
    pool = mp.Pool() 
    for func in funcs: 
     def callback(result, func=func): 
      results[func] = result 
     pool.apply_async(func, callback=callback) 
    pool.close() 
    pool.join() 
    return results 

Я использую саму функцию в качестве ключа. Но вместо этого вы хотите использовать индекс, это так же просто. Любое значение, которое у вас есть, можно использовать в качестве ключа.


Между тем, например, вы связаны действительно просто вызывает функцию на куче аргументов, ожидая их всех, чтобы закончить, и оставив результаты в некоторой итерации в произвольном порядке. Это именно то, что делает imap_unordered, но намного проще.Вы могли бы заменить всю сложную вещь из связанного кода с этим:

pool = mp.Pool() 
results = list(pool.imap_unordered(foo_pool, range(10))) 
pool.close() 
pool.join() 

И затем, если вы хотите, чтобы результаты в их первоначальном порядке, а не в произвольном порядке, вы можете просто переключиться на imap или map вместо этого. Итак:

pool = mp.Pool() 
results = pool.map(foo_pool, range(10)) 
pool.close() 
pool.join() 

Если вам нужно что-то подобное, но слишком сложное, чтобы вписаться в map парадигмы, concurrent.futures, вероятно, сделают вашу жизнь проще, чем multiprocessing. Если вы находитесь на Python 2.x, вам нужно будет установить the backport. Но тогда вы можете делать то, что гораздо сложнее сделать с AsyncResult или callback s (или map), как составление целого кучи фьючерсов в одно большое будущее. См. Примеры в связанных документах.


Последнее замечание:

важные моменты, подчеркивающие в том, что я не могу изменить существующие функции ...

Если вы не можете изменить функцию, вы всегда можете заверни это. Например, допустим, у меня есть функция, которая возвращает квадрат числа, но я пытаюсь построить арифметические числа отображения битов на свои квадраты, поэтому мне нужно также указать исходный номер как часть результата. Это просто:

def number_and_square(x): 
    return x, square(x) 

И теперь, я могу просто apply_async(number_and_square) вместо просто square, и получить результаты, я хочу.

Я не делал этого в приведенных выше примерах, потому что в первом случае я хранил ключ в коллекции от вызывающей стороны, а во-вторых, привязал его к функции обратного вызова. Но привязка его к оболочке вокруг функции так же просто, как и любая из них, и может быть подходящей, когда ни одна из них не является.

+0

Большое спасибо за ваши четкие предложения и рекомендации. Как вы предположили, я экспериментирую с обертыванием функции и трюком использования этой функции в качестве ключа в словаре результатов. ['' 'concurrent.futures'''] (http://docs.python.org/3/library/concurrent.futures.html) выглядит многообещающим, и я тоже скоро его рассмотрю. Еще раз спасибо. – d3pd

+2

Спасибо за такой хороший ответ. У меня очень похожая проблема, которая не решена. Что делать, если вам нужно установить тайм-аут для каждой задачи и хотите знать, какие входы привели к таймауту? – chrishiestand

Смежные вопросы