1

это моя первая многопроцессорная реализация, я выполнил мой код в последовательном подходе, и мне потребовалась минута, чтобы обработать 30 секунд для обработки 20 записей. Но я создал словарь с каждым ключом, имеющим набор записей, и попытался применить эту функцию, используя pool.map для каждого ключа. Теперь для обработки требуется более 2 минут, хотя я рекомендую каждое ядро ​​для каждого процесса. Может ли кто-нибудь помочь мне оптимизировать это.python - многопроцессорность медленнее, чем последовательная

def f(values): 
    data1 = itertools.combinations(values,2) 
    tuple_attr =('Age', 'Workclass', 'Fnlwgt', 'Education', 'Education-num', 'marital-status', 'Occupation', 'Relationship', 'Race', 'Sex', 'Capital-gain', 'Capital-loss', 'Hours-per-week', 'Native country', 'Probability', 'Id') 
    new = ((tuple_attr[i] for i, t in enumerate(zip(*pair)) if t[0]!=t[1]) for pair in data1) 
    skt = set(frozenset(temp) for temp in new) 
    newset = set(s for s in skt if not any(p < s for p in skt)) 

    empty = frozenset(" ") 
    tr_x = set(frozenset(i) for i in empty) 
    tr = set(frozenset(i) for i in empty) 
    for e in newset: 
     tr.clear() 
     tr = tr.union(tr_x) 
     tr_x.clear() 
     for x in tr: 
      for a in e: 
       if x == empty: 
        tmp = frozenset(frozenset([a])) 
        tr_x = tr_x.union([tmp]) 
       else : 
        tmp = frozenset(frozenset([a]).union(x)) 
        tr_x = tr_x.union([tmp]) 
     tr.clear() 
     tr = tr.union(tr_x) 
     tr = set(l for l in tr if not any(m < l for m in tr)) 

    return tr 

def main(): 
    p = Pool(len(data)) #number of processes = number of CPUs 
    keys, values= zip(*data.items()) #ordered keys and values 
    processed_values= p.map(f, values) 
    result= dict(zip(keys, processed_values)) 
    p.close() # no more tasks 
    p.join() # wrap up current tasks 
    print(result) 


if __name__ == '__main__': 
    import csv 
    dicchunk = {*****} #my dictionary 
    main() 
+0

Попробуйте использовать более крупный набор данных? Это займет довольно много работы, прежде чем несколько потоков/процессов станут стоить накладных расходов на переключение контекста, форсирование и т. Д. –

+0

Это один грязный код, вам действительно нужно работать с именами переменных. И что @Corey сказал, когда я печатал свой комментарий - большой набор данных. Но не слишком много, так как это будет довольно дорого в памяти. –

+1

Можете ли вы предоставить образец данных? – dano

ответ

1

Я создал тестовую программу, чтобы запустить этот раз с multiprocessing, и один раз без:

def main(data): 
    p = Pool(len(data)) #number of processes = number of CPUs 
    keys, values= zip(*data.items()) #ordered keys and values 
    start = time.time() 
    processed_values= p.map(f, values) 
    result= dict(zip(keys, processed_values)) 
    print("multi: {}".format(time.time() - start)) 
    p.close() # no more tasks 
    p.join() # wrap up current tasks 

    start = time.time() 
    processed_values = map(f, values) 
    result2 = dict(zip(keys, processed_values)) 
    print("non-multi: {}".format(time.time() - start)) 
    assert(result == result2) 

Вот результат:

multi: 191.249588966 
non-multi: 225.774535179 

multiprocessing быстрее, но не так много как и следовало ожидать. Причиной этого является то, что некоторые из подписок принимают много (несколько минут) дольше, чем другие. Вы никогда не будете быстрее, чем когда бы то ни было, чтобы обработать самый большой список.

Я добавил некоторую трассировку к функции рабочего, чтобы продемонстрировать это. Я сэкономил время в начале рабочего, и распечатал его в конце. Вот результат:

<Process(PoolWorker-4, started daemon)> is done. Took 0.940237998962 seconds 
<Process(PoolWorker-2, started daemon)> is done. Took 1.28068685532 seconds 
<Process(PoolWorker-1, started daemon)> is done. Took 42.9250118732 seconds 
<Process(PoolWorker-3, started daemon)> is done. Took 193.635578156 seconds 

Как вы можете видеть, рабочие делают очень неравные объемы работы, так что вы только экономить около 44 секунд против быть последовательным.

+0

Отлично. Я понял, что я вижу на своем компьютере. Есть ли способ уравновесить это? Есть ли способ сбалансировать его таким образом, что рабочий, который быстрее завершает работу, выполняет следующую доступную задачу? И любая другая возможность оптимизировать мой код, потому что я не знаю, почему занимает много времени только для определенных подписок. Это всего лишь 20 записей, я собираюсь обрабатывать записи 50 тыс. Далее .. :( –

+0

Если вы закончите передачу более чем 'cpu_count()' элементов в 'map', тогда работники будут захватывать следующий элемент из итерабельного, пока есть некоторые проблемы. Попытка сбалансировать проблему с неравными под-списками, вероятно, будет сложной. Вам нужно найти способ разделить работу, которую 'f' делает, чтобы вы могли распараллелить ее части, затем объедините их вместе. Я действительно не понимаю, что вы делаете в этом методе достаточно хорошо, чтобы сказать вам лучший способ сделать это (или, если это возможно). – dano

+0

Хорошо.Но я не могу разделить работу, которую я делаю внутри функции, она должна выполняться последовательно. В любом случае, спасибо –

Смежные вопросы