2016-12-15 1 views
7

В ореховой скорлупеизменяющего различные объекты Python в параллельных процессах, соответственно

Я хочу изменить сложные объекты питона одновременно, причем каждый объект обрабатывается только один процесс. Как я могу это сделать (наиболее эффективно)? Будет ли какая-то помощь в помощи по травлению? Будет ли это эффективно?

Полная Проблема

У меня есть структура ArrayDict питон данных, которая в основном состоит из numpy массива и словарь и отображает произвольные индексы для строк в массиве. В моем случае все ключи являются целыми числами.

a = ArrayDict() 

a[1234] = 12.5 
a[10] = 3 

print(a[1234])        #12.5 
print(a[10])         # 3.0 

print(a[1234] == a.array[a.indexDict[1234]]) #true 

Теперь у меня есть несколько таких ArrayDict с и хочу, чтобы заполнить их в myMethod(arrayDict, params). Поскольку myMethod стоит дорого, я хочу запустить его параллельно. Обратите внимание: myMethod может добавить много строк до arrayDict. Каждый процесс изменяет свой собственный ArrayDict. Мне не нужен одновременный доступ к ArrayDict.

В myMethod, меняет записи в arrayDict (то есть изменить внутренний numpy массив), добавить записи к arrayDict (то есть, я добавить еще один индекс к словарю и записать новое значение во внутреннем массив). В конце концов, я хочу иметь возможность обмениваться arrayDict внутренним массивом numpy, когда он становится слишком маленьким. Это случается нечасто, и я могу выполнить это действие в непараллельной части моей программы, если не существует лучшего решения. Мои собственные попытки не были успешными даже без обмена массивами.

Я провел дни, исследуя общую память и модуль multiprocessing python. Поскольку я, наконец, буду работать над linux, задача была довольно простой: системный вызов fork() позволяет эффективно работать с копиями аргументов. Моя мысль заключалась в том, чтобы изменить каждый ArrayDict в своем собственном процессе, вернуть измененную версию объекта и перезаписать исходный объект. Чтобы сохранить память и сохранить работу для копирования, я дополнительно использовал sharedmem массивы для хранения данных в ArrayDict. Я знаю, что словарь все равно должен быть скопирован.

from sharedmem import sharedmem 
import numpy as np 

n = ...     # length of the data array 
myData = np.empty(n, dtype=object) 
myData[:] = [ArrayDict() for _ in range(n)] 
done = False 

while not done: 
    consideredData = ... # numpy boolean array of length 
          # n with True at the index of 
          # considered data 
    args = ...   # numpy array containing arguments 
          # for myMethod 

    with sharedmem.MapReduce() as pool: 
     results = pool.map(myMethod, 
          list(zip(myData[considered], 
            args[considered])), 
          star=True) 
     myData[considered] = results 

    done = ...    # depends on what happens in 
          # myMethod 

То, что я получаю, является ошибкой ошибки сегментации. Я смог обойти эту ошибку, создав глубокие копии ArrayDict s до myMethod и сохранив их в myData. Я действительно не понимаю, почему это необходимо, и часто копируя мои (потенциально очень большие) массивы (цикл while занимает много времени) не так эффективен для меня. Однако, по крайней мере, это работало в определенной степени. Тем не менее, моя программа имеет некоторые ошибки на 3-й итерации из-за разделяемой памяти. Поэтому я считаю, что мой путь не оптимален.

Я прочитал here и here, что можно сохранить aribtrary numpy массивы в общей памяти с помощью multiprocessing.Array. Тем не менее, мне все равно придется делиться всем ArrayDict, который включает, в частности, словарь, который, в свою очередь, не подбирается.

Как я мог эффективно достичь своих целей? Было бы возможно (и эффективно) сделать мой объект подбираемым каким-то образом?

Все решения должны работать с поддержкой python 3 и полной поддержкой numpy/scipy на 64-битной Linux.

Редактировать

Я нашел here, что это как-то можно разделить произвольные объекты с помощью Multiprocessing «Менеджер» классы и пользовательские прокси-классы. Будет ли это эффективно? Я хотел бы использовать, что мне не нужен одновременный доступ к объектам, хотя они не обрабатываются в основном процессе. Будет ли возможность создать менеджера для каждого объекта, который я хочу обработать? (Возможно, у меня все еще есть некоторые заблуждения относительно того, как работают манипуляторы.)

+0

Как вы изменяете или используете arrayDict в myMethod? (Я предполагаю, что вы имеете в виду 'myMethod' не' myFunc'?) – gauteh

+0

@gauteh: Спасибо, что предупредил опечатки. Я исправил это. Я также добавил описание того, как я изменяю arrayDict в myMethod. – Samufi

+0

Важно ли, чтобы ArrayDict мог брать произвольные ключевые типы? В противном случае класс может быть реструктурирован для использования типов, которые могут быть легко доступны для разных процессов без использования менеджера. Как и сейчас, использование менеджера кажется лучшим выбором, так как проблема несколько сложная. Потери производительности могут быть незначительными. – gauteh

ответ

3

Это похоже на довольно сложный класс, и я не могу полностью ожидать, будет ли это решение работать в вашем случае. Простым компромиссом для такого сложного класса является использование ProcessPoolExecutor.

Если это не ответит на ваш вопрос, тогда это будет хорошо с минимальным рабочим примером.

from concurrent.futures import ProcessPoolExecutor 

import numpy as np 

class ArrayDict(): 
    keys = None 
    vals = None 

    def __init__ (self): 
    self.keys = dict() 
    self.vals = np.random.rand (1000) 

    def __str__ (self): 
    return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean()) 

def myMethod (ad, args): 
    print ("starting:", ad) 


if __name__ == '__main__': 
    l  = [ArrayDict() for _ in range (5)] 
    args = [2, 3, 4, 1, 3] 

    with ProcessPoolExecutor (max_workers = 2) as ex: 

    d = ex.map (myMethod, l, args) 

Объекты cloned при отправке дочернего процесса, вам необходимо вернуть результат (как изменения к объекту не будет распространяться обратно в основной процесс) и обрабатывать, как вы хотите хранить их.

Обратите внимание, что изменения в переменных класса будут распространяться на другие объекты в в том же процессе, например. если у вас больше задач, чем процессов, изменения в классах будут разделяться между экземплярами, запущенными в том же процессе. Это обычно нежелательное поведение.

Это высокоуровневый интерфейс для распараллеливания. ProcessPoolExecutor использует модуль multiprocessing и может использоваться только с pickable objects. Я подозреваю, что ProcessPoolExecutor имеет производительность, аналогичную "sharing state between processes". Под капотом ProcessPoolExecutoris using multiprocessing.Process, и должен иметь аналогичную производительность, как Pool (за исключением случаев использования very long iterables с картой). ProcessPoolExecutor, похоже, предназначен для будущего API для параллельных задач в python.

Если вы можете, то, как правило, быстрее использовать ThreadPoolExecutor (который может быть заменен только на ProcessPoolExecutor). В этом случае объект равен, который совместно используется между процессами, а обновление до одного будет распространяться обратно в основной поток.

Как упоминалось, самым быстрым вариантом, вероятно, является переструктурирование ArrayDict, так что оно использует объекты, которые могут быть представлены multiprocessing.Value или Array.

Если ProcessPoolExecutor не работает, и вы не можете оптимизировать ArrayDict, возможно, вы застряли с помощью Manager. Есть хорошие примеры того, как это сделать here.

Наибольшее усиление производительности часто встречается в myMethod. И, как я уже упоминал, накладные расходы на использование потоков is less, чем на процессы.

+0

Спасибо за ответ! Это хорошо работает и позволяет мне обменять внутренний массив ArrayDict. Не могли бы вы добавить некоторые комментарии, что происходит под капотом? В чем разница с multiprocessing.pool? Скопированы ли переменные? Я заметил, что целочисленные свойства ArrayDict не изменялись одновременно, но записи dict были. Как я могу понять и предсказать это поведение? Кроме того, я увидел, что ваше решение работает в простых случаях под Windows и Linux, но с полным ArrayDict только в Linux. Зачем? Являются ли накладные расходы, созданные в инструкции «с» или в методе «карта»? – Samufi

+0

Операция 'with' не создает дополнительных накладных расходов. Что вы имеете в виду не изменились одновременно? Это было бы полезно с примером здесь. Это может быть связано с неизменяемыми и изменяемыми типами. Ответ был несколько изменен на производительность. – gauteh

+1

Переменные копируются (но память разделяется до тех пор, пока она не будет изменена). Вы должны вернуть объект из рабочего процесса и заменить исходный объект. В противном случае используйте диспетчер. – gauteh

Смежные вопросы