2016-11-22 4 views
1

Так что у меня две панды dataframes создан с помощьюKeyError с помощью `dask.merge()`

df1 = pd.read_cvs("first1.csv") 
df2 = pd.read_csv("second2.csv") 

Они оба имеют столбец column1. Для двойной проверки,

print(df1.columns) 
print(df2.columns) 

оба возвращают колонку 'column1'.

Итак, я хотел бы объединить эти два dataframes с DASK, используя 60 темы, локально (с помощью внешнего слияния):

dd1 = dd.merge(df1, df2, on="column1", how="outer", suffixes=("","_repeat")).compute(num_workers=60) 

Это терпит неудачу с KeyError, KeyError: 'column1'

Traceback (most recent call last): 
    File "INSTALLATIONPATH/python3.5/site-packages/pandas/indexes/base.py", line 2134, in get_loc 
    return self._engine.get_loc(key) 
    File "pandas/index.pyx", line 139, in pandas.index.IndexEngine.get_loc (pandas/index.c:4443) 
    File "pandas/index.pyx", line 161, in pandas.index.IndexEngine.get_loc (pandas/index.c:4289) 
    File "pandas/src/hashtable_class_helper.pxi", line 732, in pandas.hashtable.PyObjectHashTable.get_item (pandas/hashtable.c:13733) 
    File "pandas/src/hashtable_class_helper.pxi", line 740, in pandas.hashtable.PyObjectHashTable.get_item (pandas/hashtable.c:13687) 
KeyError: 'column1' 

I подумал бы, что это параллелизуемая задача, т. е. dd.merge(df1, df2, on='id')

Есть ли для этого операция «эквивалентная дак»? Я также попытался переиндексации панд dataframes на chr (т.е. df1 = df1.reset_index('chr')), а затем попытался присоединиться по индексу

dd.merge(df1, df2, left_index=True, right_index=True) 

Это не сработало, ту же ошибку.

http://dask.pydata.org/en/latest/dataframe-overview.html

+0

Вы вызываете dd.merge на 'pandas.DataFrame' или на' Dask.dataframe'? –

ответ

1

С вашей ошибки, я бы перепроверить свой первоначальный dataframe, чтобы убедиться, что у вас есть column1 в обоих (без лишних пробелов или что-нибудь) в качестве фактического столба, потому что он должен работать нормально (без ошибок в следующий код)

Кроме того, существует разница между слиянием вызовов по pandas.DataFrame или по телефону Dask.dataframe.

Вот несколько примеров данных:

df1 = pd.DataFrame(np.transpose([np.arange(1000), 
          np.arange(1000)]), columns=['column1','column1_1']) 

df2 = pd.DataFrame(np.transpose([np.arange(1000), 
          np.arange(1000, 2000)]), columns=['column1','column1_2']) 

И их dask эквивалент:

ddf1 = dd.from_pandas(df1, npartitions=100) 
ddf2 = dd.from_pandas(df2, npartitions=100) 

Использование pandas.DataFrame:

In [1]: type(dd.merge(df1, df2, on="column1", how="outer")) 

Out [1]: pandas.core.frame.DataFrame 

Так что это возвращает pandas.DataFrame, так что вы не может ll compute() на нем.

Использование dask.dataframe:

In [2]: type(dd.merge(ddf1, ddf2, on="column1", how="outer")) 
Out[2]: dask.dataframe.core.DataFrame 

Здесь Вы можете позвонить compute:

In [3]: dd.merge(ddf1,ddf2, how='outer').compute(num_workers=60) 

Out[3]: 
    column1 column1_1 column1_2 
0  0   0  1000 
1  400  400  1400 
2  100  100  1100 
3  500  500  1500 
4  300  300  1300 

Side Примечание: в зависимости от размера ваших данных и вашего оборудования, вы можете хочу проверить, не будет ли делать pandas.join быстрее:

df1.set_index('column1').join(df2.set_index('column1'), how='outer').reset_index() 

Используя размер (1 000 000, 2) для каждого DF это быстрее, чем DASK решения на моем оборудовании.

+0

«Используя размер (1 000 000, 2) для каждого df, это быстрее, чем решение dask на моем оборудовании« Почему это соединение будет быстрее слияния? – EB2127

+1

Потому что это другая реализация, кроме слияния. pd.join работает с индексом. Он последовательно быстрее, чем pd.merge (в этом случае коэффициент 4). Это покрыто множеством вопросов о SO. –

+1

Понял. Спасибо за помощь! – EB2127