2016-10-14 4 views
2

В рамках моего исследования я ищу хороший дизайн для хранения данных панели. Я использую pandas для всех операций с памятью. Я рассмотрел следующие два вопроса/вклады: Large Data Work flows using Pandas и Query HDF5 Pandas, поскольку они приближаются к моей настройке. Однако у меня осталось пару вопросов. Прежде всего позвольте мне определить свои данные и некоторые требования:Pandas + HDF5 Пакетное хранение данных для больших данных

  1. Размер: У меня около 800 дат, 9000 идентификаторов и до 200 переменных. Следовательно, выравнивание панели (по датам и идентификаторам) соответствует строкам 7.2mio и 200 столбцам. Это может все поместиться в память или нет, допустим, это не так. Дисковое пространство не является проблемой.

  2. Переменные, как правило, вычисляются один раз, но обновления/изменения, вероятно, происходят время от времени. Как только обновления происходят, старые версии больше не имеют значения.

  3. Время от времени добавляются новые переменные, в основном по одному.

  4. Новые строки не добавляются.

  5. Выполнение запроса происходит. Например, часто мне нужно выбрать только определенный диапазон дат, например date>start_date & date<end_date. Но некоторые запросы должны учитывать условия рангов по датам. Например, получите все данные (например, столбцы), где rank(var1)>500 & rank(var1)<1000, где рангом является дата.

Целью является быстрое считывание/запрос данных. Написание данных не так критично.

я думал о следующей конструкции HDF5:

  1. Следуйте groups_map подход (от 1) для хранения переменных в различных таблицах. Ограничьте количество столбцов для каждой группы до 10 (чтобы избежать больших нагрузок памяти при обновлении отдельных переменных, см. Пункт 3).

  2. Каждая группа представляет собой одну таблицу, в которой я использую мультииндекс на основе дат & идентификаторов для каждой сохраненной таблицы.

  3. Создать функцию обновления, чтобы обновить переменные. Функции загружают таблицу со всеми (10) столбцами в память как df, удаляют таблицу на диске, заменяют обновленную переменную в df и сохраняют таблицу из памяти обратно на диск.

  4. Создайте функцию добавления, добавьте var1 в группу с менее чем 10 столбцами или при необходимости создайте новую группу. Сохранение аналогично тому, как в 3. Загрузка текущей группы в память, удаление таблицы на диске, добавление нового столбца и сохранение ее на диске.

  5. Рассчитать ранги по дате для соответствующих переменных и добавить их на дисковое хранилище в качестве rank_var1, что должно уменьшить запрос на просто rank_var1 > 500 & rank_var1<1000.

У меня есть следующие вопросы:

  1. Обновление HDFTable, я полагаю, я должен удалить всю таблицу, чтобы обновить один столбец?

  2. Когда использовать «data_columns» или просто присваивать True в HDFStore.append()?

  3. Если я хочу получить запрос на основании условия rank_var1 > 500 & rank_var1<1000, но мне нужны столбцы из других групп. Могу ли я ввести индекс, полученный из условия rank_var1, в запрос, чтобы получить другие столбцы на основе этого индекса (индекс - это мультииндекс с датой и идентификатором)? Или мне нужно будет зацикливать этот индекс по дате, а затем разбить идентификаторы, подобные предложенным в 2, и повторить процедуру для каждой группы, в которой я нуждаюсь. В качестве альтернативы, (a) я мог бы добавить к столбцам рангов таблицы таблицы групп, но он кажется крайне неэффективным с точки зрения дискового хранилища. Обратите внимание, что число переменных, в которых важна фильтрация ранжирования, ограничено (скажем, 5). Или (б) я мог бы просто использовать df_rank, полученный из запроса rank_var1, и использовать операции с оперативной памятью через df_rank.merge(df_tmp, left_index=True, right_index=True, how='left') и перебирать группы (df_tmp), где я выбираю нужные столбцы.

  4. Скажем, у меня есть данные на разных частотах. Полагаю, что разные карты group_maps (или разные хранилища) для разных частот - это путь?

  5. Копии хранилища могут использоваться в системах win/ux. Я предполагаю, что он абсолютно совместим, что-то для рассмотрения здесь?

  6. Планирую использовать pd.HDFStore(str(self.path), mode='a', complevel=9, complib='blosc'). Любые проблемы в отношении дополнения или соответствия?

Я начал писать код, как только у меня есть что показать, я отредактирую и добавлю его при желании. Пожалуйста, дайте мне знать, если вам нужна дополнительная информация.

EDIT Я здесь первая версия моего класса хранения, пожалуйста, отрегулируйте путь внизу. Извините за длину кода, комментарии приветствуются

import pandas as pd 
import numpy as np 
import string 

class LargeDFStorage(): 

    # TODO add index features to ensure correct indexes 
    # index_names = ('date', 'id') 

    def __init__(self, h5_path, groups_map): 
     """ 

     Parameters 
     ---------- 
     h5_path: str 
      hdf5 storage path 
     groups_map: dict 
      where keys are group_names and values are dict, with at least key 
      'columns' where the value is list of column names. 
      A special group_name is reserved for group_name/key "query", which 
      can be used as queering and conditioning table when getting data, 
      see :meth:`.get`. 
     """ 

     self.path = str(h5_path) 
     self.groups_map = groups_map 
     self.column_map = self._get_column_map() 
     # if desired make part of arguments 
     self.complib = 'blosc' 
     self.complevel = 9 

    def _get_column_map(self): 
     """ Calc the inverse of the groups_map/ensures uniqueness of cols 

     Returns 
     ------- 
     dict: with cols as keys and group_names as values 
     """ 
     column_map = dict() 
     for g, value in self.groups_map.items(): 
      if len(set(column_map.keys()) & set(value['columns'])) > 0: 
       raise ValueError('Columns have to be unique') 
      for col in value['columns']: 
       column_map[col] = g 

     return column_map 

    @staticmethod 
    def group_col_names(store, group_name): 
     """ Returns all column names of specific group 

     Parameters 
     ---------- 
     store: pd.HDFStore 
     group_name: str 

     Returns 
     ------- 
     list: 
      of all column names in the group 
     """ 
     if group_name not in store: 
      return [] 

     # hack to get column names, straightforward way!? 
     return store.select(group_name, start=0, stop=0).columns.tolist() 

    @staticmethod 
    def stored_cols(store): 
     """ Collects all columns stored in HDF5 store 

     Parameters 
     ---------- 
     store: pd.HDFStore 

     Returns 
     ------- 
     list: 
      a list of all columns currently in the store 
     """ 
     stored_cols = list() 
     for x in store.items(): 
      group_name = x[0][1:] 
      stored_cols += LargeDFStorage.group_col_names(store, group_name) 

     return stored_cols 

    def _find_groups(self, columns): 
     """ Searches all groups required for covering columns 

     Parameters 
     ---------- 
     columns: list 
      list of valid columns 

     Returns 
     ------- 
     list: 
      of unique groups 
     """ 
     groups = list() 
     for column in columns: 
      groups.append(self.column_map[column]) 

     return list(set(groups)) 

    def add_columns(self, df): 
     """ Adds columns to storage for the first time. If columns should 
     be updated use(use :meth:`.update` instead) 

     Parameters 
     ---------- 
     df: pandas.DataFrame 
      with new columns (not yet stored in any of the tables) 

     Returns 
     ------- 

     """ 
     store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel, 
          complib=self.complib) 

     # check if any column has been stored already 
     if df.columns.isin(self.stored_cols(store)).any(): 
      store.close() 
      raise ValueError('Some cols are already in the store') 

     # find all groups needed to store the data 
     groups = self._find_groups(df.columns) 

     for group in groups: 
      v = self.groups_map[group] 

      # select columns of current group in df 
      select_cols = df.columns[df.columns.isin(v['columns'])].tolist() 
      tmp = df.reindex(columns=select_cols, copy=False) 

      # set data column to False only in case of query data 
      dc = None 
      if group=='query': 
       dc = True 

      stored_cols = self.group_col_names(store,group) 
      # no columns in group (group does not exists yet) 
      if len(stored_cols)==0: 
       store.append(group, tmp, data_columns=dc) 
      else: 
       # load current disk data to memory 
       df_grp = store.get(group) 
       # remove data from disk 
       store.remove(group) 
       # add new column(s) to df_disk 
       df_grp = df_grp.merge(tmp, left_index=True, right_index=True, 
             how='left') 
       # save old data with new, additional columns 
       store.append(group, df_grp, data_columns=dc) 

     store.close() 

    def _query_table(self, store, columns, where): 
     """ Selects data from table 'query' and uses where expression 

     Parameters 
     ---------- 
     store: pd.HDFStore 
     columns: list 
      desired data columns 
     where: str 
      a valid select expression 

     Returns 
     ------- 

     """ 

     query_cols = self.group_col_names(store, 'query') 
     if len(query_cols) == 0: 
      store.close() 
      raise ValueError('No data to query table') 
     get_cols = list(set(query_cols) & set(columns)) 
     if len(get_cols) == 0: 
      # load only one column to minimize memory usage 
      df_query = store.select('query', columns=query_cols[0], 
            where=where) 
      add_query = False 
     else: 
      # load columns which are anyways needed already 
      df_query = store.select('query', columns=get_cols, where=where) 
      add_query = True 

     return df_query, add_query 

    def get(self, columns, where=None): 
     """ Retrieve data from storage 

     Parameters 
     ---------- 
     columns: list/str 
      list of columns to use, or use 'all' if all columns should be 
      retrieved 
     where: str 
      a valid select statement 

     Returns 
     ------- 
     pandas.DataFrame 
      with all requested columns and considering where 
     """ 
     store = pd.HDFStore(str(self.path), mode='r') 

     # get all columns in stored in HDFStorage 
     stored_cols = self.stored_cols(store) 

     if columns == 'all': 
      columns = stored_cols 

     # check if all desired columns can be found in storage 
     if len(set(columns) - set(stored_cols)) > 0: 
      store.close() 
      raise ValueError('Column(s): {}. not in storage'.format(
       set(columns)- set(stored_cols))) 

     # get all relevant groups (where columns are taken from) 
     groups = self._find_groups(columns) 

     # if where query is defined retrieve data from storage, eventually 
     # only index of df_query might be used 
     if where is not None: 
      df_query, add_df_query = self._query_table(store, columns, where) 
     else: 
      df_query, add_df_query = None, False 

     # dd collector 
     df = list() 
     for group in groups: 
      # skip in case where was used and columns used from 
      if where is not None and group=='query': 
       continue 
      # all columns which are in group but also requested 
      get_cols = list(
       set(self.group_col_names(store, group)) & set(columns)) 

      tmp_df = store.select(group, columns=get_cols) 
      if df_query is None: 
       df.append(tmp_df) 
      else: 
       # align query index with df index from storage 
       df_query, tmp_df = df_query.align(tmp_df, join='left', axis=0) 
       df.append(tmp_df) 

     store.close() 

     # if any data of query should be added 
     if add_df_query: 
      df.append(df_query) 

     # combine all columns 
     df = pd.concat(df, axis=1) 

     return df 

    def update(self, df): 
     """ Updates data in storage, all columns have to be stored already in 
     order to be accepted for updating (use :meth:`.add_columns` instead) 

     Parameters 
     ---------- 
     df: pd.DataFrame 
      with index as in storage, and column as desired 


     Returns 
     ------- 

     """ 
     store = pd.HDFStore(self.path, mode='a' , complevel=self.complevel, 
          complib=self.complib) 

     # check if all column have been stored already 
     if df.columns.isin(self.stored_cols(store)).all() is False: 
      store.close() 
      raise ValueError('Some cols have not been stored yet') 

     # find all groups needed to store the data 
     groups = self._find_groups(df.columns) 
     for group in groups: 
      dc = None 
      if group=='query': 
       dc = True 
      # load current disk data to memory 
      group_df = store.get(group) 
      # remove data from disk 
      store.remove(group) 
      # update with new data 
      group_df.update(df) 
      # save updated df back to disk 
      store.append(group, group_df, data_columns=dc) 

     store.close() 


class DataGenerator(): 
    np.random.seed(1282) 

    @staticmethod 
    def get_df(rows=100, cols=10, freq='M'): 
     """ Simulate data frame 
     """ 
     if cols < 26: 
      col_name = list(string.ascii_lowercase[:cols]) 
     else: 
      col_name = range(cols) 
     if rows > 2000: 
      freq = 'Min' 
     index = pd.date_range('19870825', periods=rows, freq=freq) 
     df = pd.DataFrame(np.random.standard_normal((rows, cols)), 
          columns=col_name, index=index) 
     df.index.name = 'date' 
     df.columns.name = 'ID' 
     return df 

    @staticmethod 
    def get_panel(rows=1000, cols=500, items=10): 
     """ simulate panel data 
     """ 

     if items < 26: 
      item_names = list(string.ascii_lowercase[:cols]) 
     else: 
      item_names = range(cols) 
     panel_ = dict() 

     for item in item_names: 
      panel_[item] = DataGenerator.get_df(rows=rows, cols=cols) 

     return pd.Panel(panel_) 


def main(): 
    # Example of with DataFrame 
    path = 'D:\\fc_storage.h5' 
    groups_map = dict(
     a=dict(columns=['a', 'b', 'c', 'd', 'k']), 
     query=dict(columns=['e', 'f', 'g', 'rank_a']), 
    ) 
    storage = LargeDFStorage(path, groups_map=groups_map) 
    df = DataGenerator.get_df(rows=200000, cols=15) 
    storage.add_columns(df[['a', 'b', 'c', 'e', 'f']]) 
    storage.update(df[['a']]*3) 
    storage.add_columns(df[['d', 'g']]) 

    print(storage.get(columns=['a','b', 'f'], where='f<0 & e<0')) 

    # Example with panel and rank condition 
    path2 = 'D:\\panel_storage.h5' 
    storage_pnl = LargeDFStorage(path2, groups_map=groups_map) 
    panel = DataGenerator.get_panel(rows=800, cols=2000, items=24) 
    df = panel.to_frame() 
    df['rank_a'] = df[['a']].groupby(level='date').rank() 
    storage_pnl.add_columns(df[['a', 'b', 'c', 'e', 'f']]) 
    storage_pnl.update(df[['a']]*3) 
    storage_pnl.add_columns(df[['d', 'g', 'rank_a']]) 
    print(storage_pnl.get(columns=['a','b','e', 'f', 'rank_a'], 
          where='f>0 & e>0 & rank_a <100')) 


if __name__ == '__main__': 
    main() 

ответ

2

Это немного сложно ответить на эти вопросы без конкретных примеров ...

Обновление HDFTable, я полагаю, я должен удалить всю таблицу в чтобы обновить один столбец?

AFAIK yes, если вы не храните отдельные столбцы отдельно, но это будет сделано автоматически, вам просто нужно записать DF/Panel обратно в HDF Store.

Когда использовать 'data_columns', или я должен просто назначить Правда в HDFStore.append()?

data_columns=True - индекс все ваши колонки - IMO это пустая трата ресурсов, если вы не собираетесь использовать все столбцов в параметре где (то есть, если все столбцы должны быть проиндексированы). Я бы указал там только те столбцы, которые будут часто использоваться для поиска в статье where=. Рассмотрим эти столбцы как индексированные столбцы в таблице базы данных.

Если я хочу, чтобы запросить на основе состояния rank_var1> 500 & rank_var1 < 1000, но мне нужны столбцы из других групп.Могу ли я ввести индекс , полученный из условия rank_var1, в запрос, чтобы получить других столбцов на основе этого индекса (индекс - это мультииндекс с датой и идентификатором)?

Я думаю, что мы должны были бы некоторые воспроизводимые данные примеры и примеры запросов для того, чтобы дать разумный ответ ...

Копии хранения могут быть использованы в системах выигрыша/УБ. Я предполагаю, что это perferctly совместимый, что-нибудь, чтобы рассмотреть здесь?

Да, он должен быть полностью совместим

Я планирую использовать pd.HDFStore (ул (self.path), режим = 'а', complevel = 9, complib = 'blosc'). Любые проблемы в отношении дополнения или соответствия?

Test его данных - результаты могут зависеть от dtypes, количество уникальных значений, и т.д. Вы также можете рассмотреть lzo complib - это может быть быстрее в некоторых потребительных случаях. Проверьте this. Иногда высокий complevel не дает вам лучшего коэффициента сцепления, но будет медленнее (см. Результаты my old comparison)

+0

поэтому я добавил длинный пример кода (извините за это). Может быть, test_pnl exampel показывает, что я имею в виду с запросом на обработку данных по рангу. Если вам интересно, посмотрите на метод '.get()' для деталей. Функция 'get()', используемая с 'where', может быть не самая эффективная реализация с точки зрения управления памятью, но она довольно быстро работает на моем оборудовании. –

Смежные вопросы