В рамках моего исследования я ищу хороший дизайн для хранения данных панели. Я использую pandas для всех операций с памятью. Я рассмотрел следующие два вопроса/вклады: Large Data Work flows using Pandas и Query HDF5 Pandas, поскольку они приближаются к моей настройке. Однако у меня осталось пару вопросов. Прежде всего позвольте мне определить свои данные и некоторые требования:Pandas + HDF5 Пакетное хранение данных для больших данных
Размер: У меня около 800 дат, 9000 идентификаторов и до 200 переменных. Следовательно, выравнивание панели (по датам и идентификаторам) соответствует строкам 7.2mio и 200 столбцам. Это может все поместиться в память или нет, допустим, это не так. Дисковое пространство не является проблемой.
Переменные, как правило, вычисляются один раз, но обновления/изменения, вероятно, происходят время от времени. Как только обновления происходят, старые версии больше не имеют значения.
Время от времени добавляются новые переменные, в основном по одному.
Новые строки не добавляются.
Выполнение запроса происходит. Например, часто мне нужно выбрать только определенный диапазон дат, например
date>start_date & date<end_date
. Но некоторые запросы должны учитывать условия рангов по датам. Например, получите все данные (например, столбцы), гдеrank(var1)>500 & rank(var1)<1000
, где рангом является дата.
Целью является быстрое считывание/запрос данных. Написание данных не так критично.
я думал о следующей конструкции HDF5:
Следуйте groups_map подход (от 1) для хранения переменных в различных таблицах. Ограничьте количество столбцов для каждой группы до 10 (чтобы избежать больших нагрузок памяти при обновлении отдельных переменных, см. Пункт 3).
Каждая группа представляет собой одну таблицу, в которой я использую мультииндекс на основе дат & идентификаторов для каждой сохраненной таблицы.
Создать функцию обновления, чтобы обновить переменные. Функции загружают таблицу со всеми (10) столбцами в память как df, удаляют таблицу на диске, заменяют обновленную переменную в df и сохраняют таблицу из памяти обратно на диск.
Создайте функцию добавления, добавьте var1 в группу с менее чем 10 столбцами или при необходимости создайте новую группу. Сохранение аналогично тому, как в 3. Загрузка текущей группы в память, удаление таблицы на диске, добавление нового столбца и сохранение ее на диске.
Рассчитать ранги по дате для соответствующих переменных и добавить их на дисковое хранилище в качестве rank_var1, что должно уменьшить запрос на просто
rank_var1 > 500 & rank_var1<1000
.
У меня есть следующие вопросы:
Обновление HDFTable, я полагаю, я должен удалить всю таблицу, чтобы обновить один столбец?
Когда использовать «data_columns» или просто присваивать True в HDFStore.append()?
Если я хочу получить запрос на основании условия
rank_var1 > 500 & rank_var1<1000
, но мне нужны столбцы из других групп. Могу ли я ввести индекс, полученный из условия rank_var1, в запрос, чтобы получить другие столбцы на основе этого индекса (индекс - это мультииндекс с датой и идентификатором)? Или мне нужно будет зацикливать этот индекс по дате, а затем разбить идентификаторы, подобные предложенным в 2, и повторить процедуру для каждой группы, в которой я нуждаюсь. В качестве альтернативы, (a) я мог бы добавить к столбцам рангов таблицы таблицы групп, но он кажется крайне неэффективным с точки зрения дискового хранилища. Обратите внимание, что число переменных, в которых важна фильтрация ранжирования, ограничено (скажем, 5). Или (б) я мог бы просто использовать df_rank, полученный из запроса rank_var1, и использовать операции с оперативной памятью черезdf_rank.merge(df_tmp, left_index=True, right_index=True, how='left')
и перебирать группы (df_tmp), где я выбираю нужные столбцы.Скажем, у меня есть данные на разных частотах. Полагаю, что разные карты group_maps (или разные хранилища) для разных частот - это путь?
Копии хранилища могут использоваться в системах win/ux. Я предполагаю, что он абсолютно совместим, что-то для рассмотрения здесь?
Планирую использовать
pd.HDFStore(str(self.path), mode='a', complevel=9, complib='blosc')
. Любые проблемы в отношении дополнения или соответствия?
Я начал писать код, как только у меня есть что показать, я отредактирую и добавлю его при желании. Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.
EDIT Я здесь первая версия моего класса хранения, пожалуйста, отрегулируйте путь внизу. Извините за длину кода, комментарии приветствуются
import pandas as pd
import numpy as np
import string
class LargeDFStorage():
# TODO add index features to ensure correct indexes
# index_names = ('date', 'id')
def __init__(self, h5_path, groups_map):
"""
Parameters
----------
h5_path: str
hdf5 storage path
groups_map: dict
where keys are group_names and values are dict, with at least key
'columns' where the value is list of column names.
A special group_name is reserved for group_name/key "query", which
can be used as queering and conditioning table when getting data,
see :meth:`.get`.
"""
self.path = str(h5_path)
self.groups_map = groups_map
self.column_map = self._get_column_map()
# if desired make part of arguments
self.complib = 'blosc'
self.complevel = 9
def _get_column_map(self):
""" Calc the inverse of the groups_map/ensures uniqueness of cols
Returns
-------
dict: with cols as keys and group_names as values
"""
column_map = dict()
for g, value in self.groups_map.items():
if len(set(column_map.keys()) & set(value['columns'])) > 0:
raise ValueError('Columns have to be unique')
for col in value['columns']:
column_map[col] = g
return column_map
@staticmethod
def group_col_names(store, group_name):
""" Returns all column names of specific group
Parameters
----------
store: pd.HDFStore
group_name: str
Returns
-------
list:
of all column names in the group
"""
if group_name not in store:
return []
# hack to get column names, straightforward way!?
return store.select(group_name, start=0, stop=0).columns.tolist()
@staticmethod
def stored_cols(store):
""" Collects all columns stored in HDF5 store
Parameters
----------
store: pd.HDFStore
Returns
-------
list:
a list of all columns currently in the store
"""
stored_cols = list()
for x in store.items():
group_name = x[0][1:]
stored_cols += LargeDFStorage.group_col_names(store, group_name)
return stored_cols
def _find_groups(self, columns):
""" Searches all groups required for covering columns
Parameters
----------
columns: list
list of valid columns
Returns
-------
list:
of unique groups
"""
groups = list()
for column in columns:
groups.append(self.column_map[column])
return list(set(groups))
def add_columns(self, df):
""" Adds columns to storage for the first time. If columns should
be updated use(use :meth:`.update` instead)
Parameters
----------
df: pandas.DataFrame
with new columns (not yet stored in any of the tables)
Returns
-------
"""
store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel,
complib=self.complib)
# check if any column has been stored already
if df.columns.isin(self.stored_cols(store)).any():
store.close()
raise ValueError('Some cols are already in the store')
# find all groups needed to store the data
groups = self._find_groups(df.columns)
for group in groups:
v = self.groups_map[group]
# select columns of current group in df
select_cols = df.columns[df.columns.isin(v['columns'])].tolist()
tmp = df.reindex(columns=select_cols, copy=False)
# set data column to False only in case of query data
dc = None
if group=='query':
dc = True
stored_cols = self.group_col_names(store,group)
# no columns in group (group does not exists yet)
if len(stored_cols)==0:
store.append(group, tmp, data_columns=dc)
else:
# load current disk data to memory
df_grp = store.get(group)
# remove data from disk
store.remove(group)
# add new column(s) to df_disk
df_grp = df_grp.merge(tmp, left_index=True, right_index=True,
how='left')
# save old data with new, additional columns
store.append(group, df_grp, data_columns=dc)
store.close()
def _query_table(self, store, columns, where):
""" Selects data from table 'query' and uses where expression
Parameters
----------
store: pd.HDFStore
columns: list
desired data columns
where: str
a valid select expression
Returns
-------
"""
query_cols = self.group_col_names(store, 'query')
if len(query_cols) == 0:
store.close()
raise ValueError('No data to query table')
get_cols = list(set(query_cols) & set(columns))
if len(get_cols) == 0:
# load only one column to minimize memory usage
df_query = store.select('query', columns=query_cols[0],
where=where)
add_query = False
else:
# load columns which are anyways needed already
df_query = store.select('query', columns=get_cols, where=where)
add_query = True
return df_query, add_query
def get(self, columns, where=None):
""" Retrieve data from storage
Parameters
----------
columns: list/str
list of columns to use, or use 'all' if all columns should be
retrieved
where: str
a valid select statement
Returns
-------
pandas.DataFrame
with all requested columns and considering where
"""
store = pd.HDFStore(str(self.path), mode='r')
# get all columns in stored in HDFStorage
stored_cols = self.stored_cols(store)
if columns == 'all':
columns = stored_cols
# check if all desired columns can be found in storage
if len(set(columns) - set(stored_cols)) > 0:
store.close()
raise ValueError('Column(s): {}. not in storage'.format(
set(columns)- set(stored_cols)))
# get all relevant groups (where columns are taken from)
groups = self._find_groups(columns)
# if where query is defined retrieve data from storage, eventually
# only index of df_query might be used
if where is not None:
df_query, add_df_query = self._query_table(store, columns, where)
else:
df_query, add_df_query = None, False
# dd collector
df = list()
for group in groups:
# skip in case where was used and columns used from
if where is not None and group=='query':
continue
# all columns which are in group but also requested
get_cols = list(
set(self.group_col_names(store, group)) & set(columns))
tmp_df = store.select(group, columns=get_cols)
if df_query is None:
df.append(tmp_df)
else:
# align query index with df index from storage
df_query, tmp_df = df_query.align(tmp_df, join='left', axis=0)
df.append(tmp_df)
store.close()
# if any data of query should be added
if add_df_query:
df.append(df_query)
# combine all columns
df = pd.concat(df, axis=1)
return df
def update(self, df):
""" Updates data in storage, all columns have to be stored already in
order to be accepted for updating (use :meth:`.add_columns` instead)
Parameters
----------
df: pd.DataFrame
with index as in storage, and column as desired
Returns
-------
"""
store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel,
complib=self.complib)
# check if all column have been stored already
if df.columns.isin(self.stored_cols(store)).all() is False:
store.close()
raise ValueError('Some cols have not been stored yet')
# find all groups needed to store the data
groups = self._find_groups(df.columns)
for group in groups:
dc = None
if group=='query':
dc = True
# load current disk data to memory
group_df = store.get(group)
# remove data from disk
store.remove(group)
# update with new data
group_df.update(df)
# save updated df back to disk
store.append(group, group_df, data_columns=dc)
store.close()
class DataGenerator():
np.random.seed(1282)
@staticmethod
def get_df(rows=100, cols=10, freq='M'):
""" Simulate data frame
"""
if cols < 26:
col_name = list(string.ascii_lowercase[:cols])
else:
col_name = range(cols)
if rows > 2000:
freq = 'Min'
index = pd.date_range('19870825', periods=rows, freq=freq)
df = pd.DataFrame(np.random.standard_normal((rows, cols)),
columns=col_name, index=index)
df.index.name = 'date'
df.columns.name = 'ID'
return df
@staticmethod
def get_panel(rows=1000, cols=500, items=10):
""" simulate panel data
"""
if items < 26:
item_names = list(string.ascii_lowercase[:cols])
else:
item_names = range(cols)
panel_ = dict()
for item in item_names:
panel_[item] = DataGenerator.get_df(rows=rows, cols=cols)
return pd.Panel(panel_)
def main():
# Example of with DataFrame
path = 'D:\\fc_storage.h5'
groups_map = dict(
a=dict(columns=['a', 'b', 'c', 'd', 'k']),
query=dict(columns=['e', 'f', 'g', 'rank_a']),
)
storage = LargeDFStorage(path, groups_map=groups_map)
df = DataGenerator.get_df(rows=200000, cols=15)
storage.add_columns(df[['a', 'b', 'c', 'e', 'f']])
storage.update(df[['a']]*3)
storage.add_columns(df[['d', 'g']])
print(storage.get(columns=['a','b', 'f'], where='f<0 & e<0'))
# Example with panel and rank condition
path2 = 'D:\\panel_storage.h5'
storage_pnl = LargeDFStorage(path2, groups_map=groups_map)
panel = DataGenerator.get_panel(rows=800, cols=2000, items=24)
df = panel.to_frame()
df['rank_a'] = df[['a']].groupby(level='date').rank()
storage_pnl.add_columns(df[['a', 'b', 'c', 'e', 'f']])
storage_pnl.update(df[['a']]*3)
storage_pnl.add_columns(df[['d', 'g', 'rank_a']])
print(storage_pnl.get(columns=['a','b','e', 'f', 'rank_a'],
where='f>0 & e>0 & rank_a <100'))
if __name__ == '__main__':
main()
поэтому я добавил длинный пример кода (извините за это). Может быть, test_pnl exampel показывает, что я имею в виду с запросом на обработку данных по рангу. Если вам интересно, посмотрите на метод '.get()' для деталей. Функция 'get()', используемая с 'where', может быть не самая эффективная реализация с точки зрения управления памятью, но она довольно быстро работает на моем оборудовании. –