2015-11-05 5 views
5

Итак, что я пытаюсь сделать со следующим кодом, это прочитать список списков и поместить их через функцию checker, а затем обработать log_result с результатом функции checker. Я пытаюсь сделать это с помощью многопоточности, потому что имя переменной rows_to_parse в действительности имеет миллионы строк, поэтому использование нескольких ядер должно ускорить этот процесс на значительную сумму.Многопроцессорная запись в pandas dataframe

Код в настоящий момент не работает и сбрасывает Python.

Проблемы и вопросы, у меня есть:

  1. Хочет существующий ФР, который проводится в переменной df для поддержания индекса в течение процесса, так как в противном случае log_result получит спутать, какой ряд нуждается в обновлении.
  2. Я вполне уверен, что apply_async не является подходящей функцией многопроцессорности для выполнения этой обязанности, поскольку я считаю, что порядок , на котором компьютер считывает и записывает df, может испортить его ???
  3. Я думаю, что может потребоваться настройка очереди для записи и чтения df , но я не уверен, как я буду это делать.

Благодарим за любую помощь.

import pandas as pd 
import multiprocessing 
from functools import partial 

def checker(a,b,c,d,e): 
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)] 
    index_of_match = match.index.tolist() 
    if len(index_of_match) == 1: #one match in df 
     return index_of_match 
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__: 
     return [index_of_match[0]] 
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df 
     return [a,b,c,d,e] 



def log_result(result, dataf): 
    if len(result) == 1: # 
     dataf.loc[result[0]]['e'] += 1 
    else: #append new row to exisiting df 
     new_row = pd.DataFrame([result],columns=cols) 
     dataf = dataf.append(new_row,ignore_index=True) 


def apply_async_with_callback(parsing_material, dfr): 
    pool = multiprocessing.Pool() 
    for var_a, var_b, var_c, var_d, var_e in parsing_material: 
     pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr)) 
    pool.close() 
    pool.join() 



if __name__ == '__main__': 
    #setting up main dataframe 
    cols = ['a','b','c','d','e'] 
    existing_data = [["YES","A","16052011","13031999",3], 
        ["NO","Q","11022003","15081999",3], 
        ["YES","A","22082010","03012001",9]] 

    #main dataframe 
    df = pd.DataFrame(existing_data,columns=cols) 

    #new data 
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5], 
        ['YES', 'W', '17061992', '26032012', 6], 
        ['YES', 'G', '01122006', '07082014', 2], 
        ['YES', 'N', '06081992', '21052008', 9], 
        ['YES', 'Y', '18051995', '24011996', 6], 
        ['NO', 'Q', '11022003', '15081999', 3], 
        ['NO', 'O', '20112004', '28062008', 0], 
        ['YES', 'R', '10071994', '03091996', 8], 
        ['NO', 'C', '09091998', '22051992', 1], 
        ['YES', 'Q', '01051995', '02012000', 3], 
        ['YES', 'Q', '26022015', '26092007', 5], 
        ['NO', 'F', '15072002', '17062001', 8], 
        ['YES', 'I', '24092006', '03112003', 2], 
        ['YES', 'A', '22082010', '03012001', 9], 
        ['YES', 'I', '15072016', '30092005', 7], 
        ['YES', 'Y', '08111999', '02022006', 3], 
        ['NO', 'V', '04012016', '10061996', 1], 
        ['NO', 'I', '21012003', '11022001', 6], 
        ['NO', 'P', '06041992', '30111993', 6], 
        ['NO', 'W', '30081992', '02012016', 6]] 


    apply_async_with_callback(rows_to_parse, df) 
+0

Что еще: #no match, дать ему аргументы для записи в df, который должен выполняться? Я думаю, что если вы вернете [a, b, c, d, e], ваш код на самом деле завершится, но у вас будут другие проблемы, вы также никогда не будете использовать dataf где-нибудь –

+0

спасибо, что указали это, я внесла поправки в код. поэтому '[a, b, c, d, e]' записывается в df в функции 'log_result'. – user3374113

+0

'partial (log_result, dataf = dfr)' не соответствует сигнатуре 'log_results' – mdurant

ответ

8

Обновление DataFrames, как это в MultiProcessing не будет работать:

dataf = dataf.append(new_row,ignore_index=True) 

С одной стороны это очень неэффективно (O (п) для каждого добавления так O (N^2) в Всего предпочтительным способом является объединение нескольких объектов за один проход.

Для другой, и что более важно, dataf не блокирует каждое обновление, поэтому нет никакой гарантии, что две операции не будут конфликтовать (я предполагаю, это сбой python).

Наконец, append не действует на месте, поэтому переменная dataf отбрасывается после завершения обратного вызова! и никакие изменения не будут внесены родителям dataf.


Мы могли бы использовать MultiProcessing list или dict. если вы не заботитесь о заказе или dict, если вы это сделаете (например, перечисление), так как вы должны заметить, что значения возвращаются не в четко определенном порядке от async.
(или мы можем создать объект, который реализует замыкаться см Eli Bendersky.)
Так следующие изменения:

df = pd.DataFrame(existing_data,columns=cols) 
# becomes 
df = pd.DataFrame(existing_data,columns=cols) 
d = MultiProcessing.list([df]) 

dataf = dataf.append(new_row,ignore_index=True) 
# becomes 
d.append(new_row) 

Теперь, когда асинхронная Закончился у вас есть MultiProcessing.list из DataFrames. Вы можете Concat эти (и ignore_index), чтобы получить желаемый результат:

pd.concat(d, ignore_index=True) 

Если сделать трюк.


Примечание: создание NewRow DataFrame на каждом этапе также менее эффективным, что выпускающие панды анализировать список списков непосредственно к DataFrame в один присест. Надеюсь, это пример игрушек, действительно, вы хотите, чтобы ваши куски были достаточно большими, чтобы получить выигрыши с MultiProcessing (я слышал, что 50kb как правило большого пальца ...), строка за раз никогда не будет победить здесь.


стороны: Вы должны избегать использования глобал (например, DF) в вашем коде, это гораздо чище, чтобы передать их вокруг в своих функциях (в данном случае, в качестве аргумента Checker).

Смежные вопросы