2013-07-25 4 views
0

У меня есть 8 ядер процессора и 200 задач. Каждая задача изолирована. Нет необходимости ждать или делиться результатами. Я ищу способ запуска 8 задач/процессов за раз (Максимум) и когда один из них закончен. Остальная задача будет автоматически запускаться.задача планирования многопроцессорности python

Как узнать, когда был выполнен дочерний процесс, и начать новый дочерний процесс. Сначала я пытаюсь использовать процесс (многопроцессорность), и его сложно понять. Затем я пытаюсь использовать пул и лицо с проблемой рассола, потому что мне нужно использовать динамический экземпляр.

Edited: Добавление мой код Бассейн

class Collectorparallel(): 

def fire(self,obj): 
    collectorController = Collectorcontroller() 
    collectorController.crawlTask(obj) 

def start(self): 
    log_to_stderr(logging.DEBUG) 
    pluginObjectList = [] 
    for pluginName in self.settingModel.getAllCollectorName(): 
     name = pluginName.capitalize() 
     #Get plugin class and instanitiate object 
     module = __import__('plugins.'+pluginName,fromlist=[name]) 
     pluginClass = getattr(module,name) 
     pluginObject = pluginClass() 
     pluginObjectList.append(pluginObject) 



    pool = Pool(8) 
    jobs = pool.map(self.fire,pluginObjectList) 
    pool.close() 

    print pluginObjectList 

pluginObjectList получил что-то вроде

[<plugins.name1.Name1 instance at 0x1f54290>, <plugins.name2.Name2 instance at 0x1f54f38>] 

PicklingError: Может не рассол: атрибут поиска .instancemethod встроенный не удалось

но Версия процесса работает нормально

+0

Решение заключается в использовании 'multiprocessing.Pool'. Как решить проблему окна травления * полностью * зависит от кода. ** Оставьте какой-нибудь код! ** Если вы можете написать * маленький * и * автономный * пример, который мы можем использовать для тестирования и который четко показывает ваш прецедент. – Bakuriu

+0

@Bakuriu Я добавил свой код – Runicer

ответ

0

Предупреждение Это субъективно для развертывания и ситуации, но моя текущая настройка выглядит следующим образом:

У меня есть рабочая программа, я запускаю 6 копий (у меня 6 ядер). Каждый работник делает следующее;

  1. Подключение к экземпляру Redis
  2. Try и поп некоторые работы определенного списка
  3. отодвигает информацию журнала
  4. либо бездельничает или оканчивающийся на отсутствие работы в «очереди»

Затем каждая программа по существу автономна, но при этом выполняет требуемую работу с отдельной системой очередей. Поскольку у вас нет взаимодействия между процессами, это может быть решением вашей проблемы.

0

Решение вашей проблемы тривиально. Прежде всего, обратите внимание, что методы не могут быть маринованными. На самом деле только тип, перечисленные в pickle's documentation можно маринованные:

  • None , True , and False
  • integers, long integers, floating point numbers, complex numbers
  • normal and Unicode strings
  • tuple s, list s, set s, and dict ionaries containing only picklable objects
  • functions defined at the top level of a module
  • built-in functions defined at the top level of a module
  • classes that are defined at the top level of a module
  • instances of such classes whose __dict__ or the result of calling __getstate__() is picklable (see section The pickle protocol for details).

[...]

Note that functions (built-in and user-defined) are pickled by “fully qualified” name reference, not by value. This means that only the function name is pickled, along with the name of the module the function is defined in. Neither the function’s code, nor any of its function attributes are pickled. Thus the defining module must be importable in the unpickling environment, and the module must contain the named object, otherwise an exception will be raised. [4]

Similarly, classes are pickled by named reference, so the same restrictions in the unpickling environment apply. Note that none of the class’s code or data is pickled[...]

Очевидно, что метод не является функцией, определенной на верхнем уровне модуля, следовательно, он не может быть маринованным (внимательно прочитать, что часть документации. чтобы в будущем избежать проблем с рассолом) Но это совершенно тривиальное заменить метод с глобальной функцией и передачами self в качестве дополнительного параметра:

import itertools as it 


def global_fire(argument): 
    self, obj = argument 
    self.fire(obj) 


class Collectorparallel(): 

    def fire(self,obj): 
     collectorController = Collectorcontroller() 
     collectorController.crawlTask(obj) 

    def start(self): 
     log_to_stderr(logging.DEBUG) 
     pluginObjectList = [] 
     for pluginName in self.settingModel.getAllCollectorName(): 
      name = pluginName.capitalize() 
      #Get plugin class and instanitiate object 
      module = __import__('plugins.'+pluginName,fromlist=[name]) 
      pluginClass = getattr(module,name) 
      pluginObject = pluginClass() 
      pluginObjectList.append(pluginObject) 



     pool = Pool(8) 
     jobs = pool.map(global_fire, zip(it.repeat(self), pluginObjectList)) 
     pool.close() 

     print pluginObjectList 

Обратите внимания, что, поскольку Pool.map вызывает данную функцию только один аргумент, мы должны «собрать вместе» и self, и фактический аргумент. Для этого у меня есть zip ped it.repeat(self) и оригинальный итерируемый.

Если вам не нужен порядок, в котором звонки завершены, то с помощью pool.imap_unorderedможет обеспечить лучшие показатели. Однако он возвращает итерируемый, а не список, поэтому, если вам нужен список результатов, вам нужно будет сделать jobs = list(pool.imap_unordered(...)).

0

Я считаю, что этот код устранит все проблемы с травлением.

class Collectorparallel(): 

def __call__(self,cNames): 
    for pluginName in cNames: 
     name = pluginName.capitalize() 
     #Get plugin class and instanitiate object 
     module = __import__('plugins.'+pluginName,fromlist=[name]) 
     pluginClass = getattr(module,name) 
     pluginObject = pluginClass() 
     pluginObjectList.append(pluginObject) 

    collectorController = Collectorcontroller() 
    collectorController.crawlTask(obj) 

def start(self): 
    log_to_stderr(logging.DEBUG) 
    pool = Pool(8) 
    jobs = pool.map(self,self.settingModel.getAllCollectorName()) 
    pool.close() 

Что здесь произошло то, что Collectorparallel был превращен в вызываемым. Список имен плагинов используется как итеративный для пула, фактическое определение плагинов и их создание выполняется в каждом из рабочих процессов, а объект экземпляра класса используется как вызываемый для каждого рабочего процесса.

Смежные вопросы