2014-11-06 4 views
12

Я пытаюсь использовать многопроцессорную обработку с помощью фрейма данных pandas, который разделяет dataframe на 8 частей. примените некоторую функцию к каждой части, используя apply (каждая часть обрабатывается в другом процессе).pandas multiprocessing apply

EDIT: Вот решение, которое я наконец-то нашел:

import multiprocessing as mp 
import pandas.util.testing as pdt 

def process_apply(x): 
    # do some stuff to data here 

def process(df): 
    res = df.apply(process_apply, axis=1) 
    return res 

if __name__ == '__main__': 
    p = mp.Pool(processes=8) 
    split_dfs = np.array_split(big_df,8) 
    pool_results = p.map(aoi_proc, split_dfs) 
    p.close() 
    p.join() 

    # merging parts processed by different processes 
    parts = pd.concat(pool_results, axis=0) 

    # merging newly calculated parts to big_df 
    big_df = pd.concat([big_df, parts], axis=1) 

    # checking if the dfs were merged correctly 
    pdt.assert_series_equal(parts['id'], big_df['id']) 
+0

есть пробел в 'res = df.apply (process apply, axis = 1)', это правильно? –

+1

@yemu, чего вы точно пытаетесь достичь этим кодом? – Dalek

+0

в настоящее время применяется только для насыщения одного ядра процессора. Я хочу использовать многопроцессор и использовать все ядра для уменьшения времени обработки – yemu

ответ

3

Поскольку у меня нет много вашего сценария данных, это предположение, но я предлагаю использовать p.map вместо apply_async с Перезвони.

p = mp.Pool(8) 
pool_results = p.map(process, np.array_split(big_df,8)) 
p.close() 
p.join() 
results = [] 
for result in pool_results: 
    results.extend(result) 
+0

@yemu выполнил эту работу для вас? –

+0

Мне пришлось поместить вызов внутрь, если __name__ == '__main__'. и с другими небольшими изменениями мне удалось заставить его работать, однако я не уверен, вернутся ли результаты данных в результаты пула в том же порядке, в каком они были разделены. Я должен проверить это. – yemu

+0

см. Здесь для решения с 'dask' https://stackoverflow.com/questions/37979167/how-to-parallelize-many-fuzzy-string-comparisons-using-apply-in-pandas –

0

Я также столкнулся с той же проблемой, когда я использую multiprocessing.map() применить функцию к различному куску большого dataframe.

Я просто хочу добавить несколько пунктов на всякий случай, если другие люди столкнутся с той же проблемой, что и я.

  1. забудьте добавить if __name__ == '__main__':
  2. выполнить файл в .py файл, если вы используете ipython/jupyter notebook, то вы не можете запустить multiprocessing (это верно для моего случая, хотя я понятия не имею,)