2014-09-06 2 views
20

У меня есть большой кадр данных (несколько миллионов строк).Как правильно перебирать последовательные куски данных DataFrame

Я хочу иметь возможность выполнять операцию groupby на нем, а просто группировать произвольные последовательные (предпочтительно равные) подмножества строк, а не использовать какое-либо конкретное свойство отдельных строк, чтобы решить, к какой группе они переходят ,

Вариант использования: Я хочу применить функцию к каждой строке через параллельную карту в IPython. Не имеет значения, какие строки попадают в какой-то back-end движок, поскольку функция вычисляет результат на основе одной строки за раз. (Концептуально, по крайней мере, в действительности это векторизация.)

Я придумал что-то вроде этого:

# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to 
max_idx = dataframe.index.max() 
tenths = ((10 * dataframe.index)/(1 + max_idx)).astype(np.uint32) 

# Use this value to perform a groupby, yielding 10 consecutive chunks 
groups = [g[1] for g in dataframe.groupby(tenths)] 

# Process chunks in parallel 
results = dview.map_sync(my_function, groups) 

Но это, кажется, очень многословно, и не гарантирует куски одинакового размера. Особенно, если индекс разрежен или нецелое или что-то еще.

Любые предложения по улучшению?

Спасибо!

ответ

20

На практике вы не можете гарантировать равные размеры блоков: количество строк может быть простым, в конце концов, и в этом случае вашими единственными параметрами chunkks будут куски размером 1 или один большой кусок. Я склонен передавать массив до groupby. Начиная от:

>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15) 
>>> df[0] = range(15) 
>>> df 
    0   1   2   3   4 
0 0 0.746300 0.346277 0.220362 0.172680 
0 1 0.657324 0.687169 0.384196 0.214118 
0 2 0.016062 0.858784 0.236364 0.963389 
[...] 
0 13 0.510273 0.051608 0.230402 0.756921 
0 14 0.950544 0.576539 0.642602 0.907850 

[15 rows x 5 columns] 

где я намеренно сделал индекс неинформативного, установив его на 0, мы просто решили на нашем размере (здесь 10) и целое число, разделить массив на него:

>>> df.groupby(np.arange(len(df))//10) 
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c> 
>>> for k,g in df.groupby(np.arange(len(df))//10): 
...  print(k,g) 
...  
0 0   1   2   3   4 
0 0 0.746300 0.346277 0.220362 0.172680 
0 1 0.657324 0.687169 0.384196 0.214118 
0 2 0.016062 0.858784 0.236364 0.963389 
[...] 
0 8 0.241049 0.246149 0.241935 0.563428 
0 9 0.493819 0.918858 0.193236 0.266257 

[10 rows x 5 columns] 
1  0   1   2   3   4 
0 10 0.037693 0.370789 0.369117 0.401041 
0 11 0.721843 0.862295 0.671733 0.605006 
[...] 
0 14 0.950544 0.576539 0.642602 0.907850 

[5 rows x 5 columns] 

Методы, основанные на разрезе DataFrame, могут завершиться неудачно, если индекс несовместим с этим, хотя вы всегда можете использовать .iloc[a:b], чтобы игнорировать значения индекса и получать доступ к данным по положению.

+0

Это то, что я имел в виду! Ну технически "df.groupby (np.arange (len (df)) // (len (df)/10))", чтобы получить фиксированное количество групп (1 на ядро) вместо фиксированного размера. По какой-то причине мне не пришло в голову, что ключ группировки вообще не должен быть связан с индексом ... –

+1

Стоит отметить, что для эффективности, вероятно, лучше прочитать исходный файл с помощью «итератора» (https://pandas.pydata.org/pandas-docs/stable/generated/pandas.read_csv.html) и «chunksize», так что функция read_csv выполняет чтение, и каждый фрагмент может быть передан в отдельный процесс, как описано в @Ryan –

19

Я не уверен, что это именно то, что вы хотите, но я нашел эти функции группы another SO thread достаточно полезными для создания многопроцессорного пула.

Вот небольшой пример из этого потока, который мог бы сделать что-то вроде того, что вы хотите:

import numpy as np 
import pandas as pds 

df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd']) 

def chunker(seq, size): 
    return (seq[pos:pos + size] for pos in xrange(0, len(seq), size)) 

for i in chunker(df,5): 
    print i 

, который дает вам что-то вроде этого:

  a   b   c   d 
0 0.860574 0.059326 0.339192 0.786399 
1 0.029196 0.395613 0.524240 0.380265 
2 0.235759 0.164282 0.350042 0.877004 
3 0.545394 0.881960 0.994079 0.721279 
4 0.584504 0.648308 0.655147 0.511390 
      a   b   c   d 
5 0.276160 0.982803 0.451825 0.845363 
6 0.728453 0.246870 0.515770 0.343479 
7 0.971947 0.278430 0.006910 0.888512 
8 0.044888 0.875791 0.842361 0.890675 
9 0.200563 0.246080 0.333202 0.574488 
      a   b   c   d 
10 0.971125 0.106790 0.274001 0.960579 
11 0.722224 0.575325 0.465267 0.258976 
12 0.574039 0.258625 0.469209 0.886768 
13 0.915423 0.713076 0.073338 0.622967 

Я надеюсь, что помогает.

РЕДАКТИРОВАТЬ

В этом случае, я использовал эту функцию с pool of processors в (приблизительно) таким образом:

from multiprocessing import Pool 

nprocs = 4 

pool = Pool(nprocs) 

for chunk in chunker(df, nprocs): 
    data = pool.map(myfunction, chunk) 
    data.domorestuff() 

Я полагаю, это должно быть очень похоже на использование IPython распределенных машин, но я убежище Не пробовал.

+0

Это, безусловно, сделало бы трюк. Я до сих пор предпочитаю какую-то опрятную группу с одним лайнером, но если ничего подобного не материализуется, вы получаете приз :-) –

7

Признаком хорошей среды много вариантов, поэтому я добавлю это от Anaconda Blaze, на самом деле, используя Odo

import blaze as bz 
import pandas as pd 

df = pd.DataFrame({'col1':[1,2,3,4,5], 'col2':[2,4,6,8,10]}) 

for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2): 
    # Do stuff with chunked dataframe 
6

Использование NumPy имеет этот встроенный: np.array_split()

import numpy as np 
import pandas as pd 

data = pd.DataFrame(np.random.rand(10, 3)) 
for chunk in np.array_split(data, 5): 
    assert len(chunk) == len(data)/5 
Смежные вопросы