Итак, что я пытаюсь сделать со следующим кодом, это прочитать список списков и поместить их через функцию checker
, а затем обработать log_result
с результатом функции checker
. Я пытаюсь сделать это с помощью многопоточности, потому что имя переменной rows_to_parse
в действительности имеет миллионы строк, поэтому использование нескольких ядер должно ускорить этот процесс на значительную сумму.Многопроцессорная запись в pandas dataframe
Код в настоящий момент не работает и сбрасывает Python.
Проблемы и вопросы, у меня есть:
- Хочет существующий ФР, который проводится в переменной
df
для поддержания индекса в течение процесса, так как в противном случаеlog_result
получит спутать, какой ряд нуждается в обновлении. - Я вполне уверен, что
apply_async
не является подходящей функцией многопроцессорности для выполнения этой обязанности, поскольку я считаю, что порядок , на котором компьютер считывает и записывает df, может испортить его ??? - Я думаю, что может потребоваться настройка очереди для записи и чтения
df
, но я не уверен, как я буду это делать.
Благодарим за любую помощь.
import pandas as pd
import multiprocessing
from functools import partial
def checker(a,b,c,d,e):
match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
index_of_match = match.index.tolist()
if len(index_of_match) == 1: #one match in df
return index_of_match
elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__:
return [index_of_match[0]]
else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
return [a,b,c,d,e]
def log_result(result, dataf):
if len(result) == 1: #
dataf.loc[result[0]]['e'] += 1
else: #append new row to exisiting df
new_row = pd.DataFrame([result],columns=cols)
dataf = dataf.append(new_row,ignore_index=True)
def apply_async_with_callback(parsing_material, dfr):
pool = multiprocessing.Pool()
for var_a, var_b, var_c, var_d, var_e in parsing_material:
pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
pool.close()
pool.join()
if __name__ == '__main__':
#setting up main dataframe
cols = ['a','b','c','d','e']
existing_data = [["YES","A","16052011","13031999",3],
["NO","Q","11022003","15081999",3],
["YES","A","22082010","03012001",9]]
#main dataframe
df = pd.DataFrame(existing_data,columns=cols)
#new data
rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
['YES', 'W', '17061992', '26032012', 6],
['YES', 'G', '01122006', '07082014', 2],
['YES', 'N', '06081992', '21052008', 9],
['YES', 'Y', '18051995', '24011996', 6],
['NO', 'Q', '11022003', '15081999', 3],
['NO', 'O', '20112004', '28062008', 0],
['YES', 'R', '10071994', '03091996', 8],
['NO', 'C', '09091998', '22051992', 1],
['YES', 'Q', '01051995', '02012000', 3],
['YES', 'Q', '26022015', '26092007', 5],
['NO', 'F', '15072002', '17062001', 8],
['YES', 'I', '24092006', '03112003', 2],
['YES', 'A', '22082010', '03012001', 9],
['YES', 'I', '15072016', '30092005', 7],
['YES', 'Y', '08111999', '02022006', 3],
['NO', 'V', '04012016', '10061996', 1],
['NO', 'I', '21012003', '11022001', 6],
['NO', 'P', '06041992', '30111993', 6],
['NO', 'W', '30081992', '02012016', 6]]
apply_async_with_callback(rows_to_parse, df)
Что еще: #no match, дать ему аргументы для записи в df, который должен выполняться? Я думаю, что если вы вернете [a, b, c, d, e], ваш код на самом деле завершится, но у вас будут другие проблемы, вы также никогда не будете использовать dataf где-нибудь –
спасибо, что указали это, я внесла поправки в код. поэтому '[a, b, c, d, e]' записывается в df в функции 'log_result'. – user3374113
'partial (log_result, dataf = dfr)' не соответствует сигнатуре 'log_results' – mdurant