2017-02-05 4 views
2

Я использую библиотеку DEAP в python для многоцелевой оптимизации. Я хотел бы использовать несколько процессоров для этой задачи; однако, у меня проблемы.Программирование DEAP со Scoop

Чтобы дать некоторый контекст, я использую networkx в сочетании с DEAP. Я также определяю функции функции фитнеса, кроссовера и мутации (которые я не буду показывать здесь по определенным причинам).

Это говорит here, что все, что мне нужно сделать, это установить Scoop и добавить строки

from scoop import futures 

toolbox.register("map", futures.map) 

Однако я, кажется, получаю сообщение об ошибке:

scoop._comm.scoopexceptions.ReferenceBroken: 'module' object has no attribute 'Chromosome' 

После выполнения некоторого рытья, я нашел что мне нужно переместить вызовы creator.create в основном модуле, как указано here.

После этого, я получаю другую ошибку:

scoop._comm.scoopexceptions.ReferenceBroken: This element could not be pickled: FutureId(worker='127.0.0.1:49663', rank=1):partial(<Chromosome representation of a solution here>)=None 

Я не совсем знаком с параллельными вычислениями, и я не совсем уверен, что это означает, что с помощью «не может быть маринованные». Полный код можно увидеть здесь с некоторыми изменениями:

def genetic(network, creator, no_sensors, sfpd, lambda1, lambda2, lambda3, k): 
    locations = network.graph.nodes() 
    #move creator.create calls to the main module 
    ######################################## 
    creator.create("FitnessMax", base.Fitness, weights=(lambda1, -lambda2, lambda3)) 
    creator.create("Chromosome", list, fitness=creator.FitnessMax) 
    ######################################## 

    toolbox = base.Toolbox() 
    toolbox.register("attr_item", random.sample, locations, no_sensors) 
    toolbox.register("chromosome", tools.initRepeat, creator.Chromosome, toolbox.attr_item, n=1) 
    toolbox.register("population", tools.initRepeat, list, toolbox.chromosome) 

    toolbox.register("map", futures.map) #######<-- this line ############## 

    def evaluate(chromosome): 
     #fitness function defined here 

    # Crossover 
    def crossover(chromosome1, chromosome2): # Uniform Crossover 
     #crossover is defined here 

    # Mutation 
    def mutation(chromosome): 
     #mutation is defined here 

    toolbox.register("evaluate", evaluate) 
    toolbox.register("mate", crossover) 
    toolbox.register("mutate", mutation) 
    toolbox.register("select", tools.selNSGA2) 

    random.seed(64) 
    pop = toolbox.population(n=MU) 
    hof = tools.ParetoFront() 
    stats = tools.Statistics(lambda ind: ind.fitness.values) 
    stats.register("avg", numpy.mean, axis=0) 
    stats.register("min", numpy.min, axis=0) 
    stats.register("max", numpy.max, axis=0) 

    algorithms.eaMuPlusLambda(pop, toolbox, MU, LAMBDA, CXPB, MUTPB, NGEN, stats, halloffame=hof) 

    return list(hof) 

Спасибо, и любое понимание будет очень ценным.

+0

Не могли бы вы предоставить рабочий минимальный пример с использованием встроенной функции карты (без использования совок)? – Ohjeah

+0

Я использую алгоритм eaMuPlusLambda, который использует встроенную функцию карты. Есть ли способ распараллеливать это? – meraxes

+0

Без минимального примера вы не можете рассчитывать на большую помощь. Альтернативой scoop являются dask.distributed, ipyparallel или multiprocessing (на укропе). Все они приходят с плюсами и минусами. – Ohjeah

ответ

1

Вот обходной путь с использованием joblib и укропа.

Первое: monkeypatch joblib, чтобы сделать его мариновать, используя укроп

import dill 
from dill import Pickler 
import joblib 
joblib.parallel.pickle = dill 
joblib.pool.dumps = dill.dumps 
joblib.pool.Pickler = Pickler 

from joblib.pool import CustomizablePicklingQueue 
from io import BytesIO 
from pickle import HIGHEST_PROTOCOL 


class CustomizablePickler(Pickler): 
    """Pickler that accepts custom reducers. 
    HIGHEST_PROTOCOL is selected by default as this pickler is used 
    to pickle ephemeral datastructures for interprocess communication 
    hence no backward compatibility is required. 
    `reducers` is expected expected to be a dictionary with key/values 
    being `(type, callable)` pairs where `callable` is a function that 
    give an instance of `type` will return a tuple `(constructor, 
    tuple_of_objects)` to rebuild an instance out of the pickled 
    `tuple_of_objects` as would return a `__reduce__` method. See the 
    standard library documentation on pickling for more details. 
    """ 

    # We override the pure Python pickler as its the only way to be able to 
    # customize the dispatch table without side effects in Python 2.6 
    # to 3.2. For Python 3.3+ leverage the new dispatch_table 
    # feature from http://bugs.python.org/issue14166 that makes it possible 
    # to use the C implementation of the Pickler which is faster. 

    def __init__(self, writer, reducers=None, protocol=HIGHEST_PROTOCOL): 
     Pickler.__init__(self, writer, protocol=protocol) 
     if reducers is None: 
      reducers = {} 
     # Make the dispatch registry an instance level attribute instead of 
     # a reference to the class dictionary under Python 2 
     self.dispatch = Pickler.dispatch.copy() 
     for type, reduce_func in reducers.items(): 
      self.register(type, reduce_func) 

    def register(self, type, reduce_func): 
     if hasattr(Pickler, 'dispatch'): 
      # Python 2 pickler dispatching is not explicitly customizable. 
      # Let us use a closure to workaround this limitation. 
      def dispatcher(self, obj): 
       reduced = reduce_func(obj) 
       self.save_reduce(obj=obj, *reduced) 
      self.dispatch[type] = dispatcher 
     else: 
      self.dispatch_table[type] = reduce_func 

joblib.pool.CustomizablePickler = CustomizablePickler 


def _make_methods(self): 
    self._recv = recv = self._reader.recv 
    racquire, rrelease = self._rlock.acquire, self._rlock.release 

    def get(): 
     racquire() 
     try: 
      return recv() 
     finally: 
      rrelease() 

    self.get = get 

    def send(obj): 
     buffer = BytesIO() 
     CustomizablePickler(buffer, self._reducers).dump(obj) 
     self._writer.send_bytes(buffer.getvalue()) 

    self._send = send 

    if self._wlock is None: 
     # writes to a message oriented win32 pipe are atomic 
     self.put = send 
    else: 
     wlock_acquire, wlock_release = (
      self._wlock.acquire, self._wlock.release) 

     def put(obj): 
      wlock_acquire() 
      try: 
       return send(obj) 
      finally: 
       wlock_release() 

     self.put = put 

CustomizablePicklingQueue._make_methods = _make_methods 

Второе:

from joblib import Parallel, delayed 

def mymap(f, *iters): 
    return Parallel(n_jobs=-1)(delayed(f)(*args) for args in zip(*iters)) 

И, наконец, просто зарегистрировать карту:

toolbox.register("map", mymap) 

Он прекрасно работает с примером вы связаны. Вы можете указать integrate dask и joblib, чтобы масштабировать это решение до кластера. Используйте dask-drmaa, и у вас почти такая же функциональность, что и у совок.

Пример кода можно найти here.

+0

Он работает! Спасибо! – meraxes

+0

Не могли бы вы рассказать мне немного о том, что здесь происходит? Я не слишком разбираюсь в параллельных вычислениях; все, что я знаю, это то, что это помогает ускорить процессы, запуская их на нескольких ядрах. Сколько процессов выполняется параллельно и как я могу это контролировать? – meraxes

+0

Итак, первый блок кода просто сообщает joblib использовать укроп вместо рассола для выполнения сериализации/десериализации объектов python.Если вы хотите узнать больше о joblib, вы должны посмотреть их [документацию] (https://pythonhosted.org/joblib/). Количество процессов/потоков контролируется параметром n_jobs. -1 означает использование всех доступных ядер. Вы увидите этот параметр в таких вещах, как sklearn. – Ohjeah

Смежные вопросы