2016-11-18 4 views
3

Я студент PhD, изучающий микроструктуру рынка. Мне нужно иметь дело с очень большими наборами данных (миллисекунды данных, которые составляют сотни ГБ). Я использую SAS, который довольно хорош для обработки больших данных в формате фрейма данных. Однако это дорого. Я бы хотел использовать Python для изучения/исследования. У меня есть некоторые, но не продвинутые навыки в Python. Я слышал о Pandas, который достаточно эффективен в обработке кадров данных, но ограничен RAM, что не очень хорошо для моей цели.Ошибка памяти при чтении больших файлов csv в словаре

То, что я пробовал: Я пытался перебирать строку данных по линии, обрабатывать их и хранить в словарях, но это имеет ограничение памяти. У меня ошибка памяти, и я вижу, как Python жевал всю RAM (у меня 32gb). Этот набор данных по-прежнему очень мал (500 мб) по сравнению с тем, с чем я буду иметь дело позже (50 ~ 100 гб). Кроме того, есть вещи, которые трудно сделать по строкам, такие как регрессии, графики и т. Д. Итак, мой вопрос заключается в том, как обрабатывать и хранить такие данные?

Входные данные выглядит следующим образом:

#RIC Date[L]  Time[L] Type Price Volume Bid Price Ask Price 
TPI.AX 20140820 00:11.7 Quote        0.91 
TPI.AX 20140820 00:11.7 Trade 0.91 10000  
TPI.AX 20140820 00:21.5 Quote        0.91 
TPI.AX 20140820 00:22.1 Quote     0.905 
TPI.AX 20140820 00:42.2 Quote     0.905 
TPI.AX 20140820 00:42.6 Trade 0.9075 117  
TPI.AX 20140820 00:43.1 Trade 0.9075 495  
TPI.AX 20140820 00:49.6 Quote     0.905 
TPI.AX 20140820 00:57.6 Quote     0.905 
TPI.AX 20140820 00:57.6 Quote     0.905 
TPI.AX 20140820 00:58.3 Quote     0.905 
TPI.AX 20140820 01:02.6 Quote        0.91 
TPI.AX 20140820 01:02.6 Quote        0.91 
TPI.AX 20140820 01:02.6 Quote     0.905 
TPI.AX 20140820 01:02.6 Trade 0.91 9365   
TPI.AX 20140820 01:02.6 Trade 0.91 9041   

Это мой код:

def spread_calculation(input_file_list, output_file): 
    """This function calculates the spreads for securities in input_file_list 
    input: trade and quote data from TRTH 
    2 parameters: 1. list of file names, 2.output file name 
    output: csv file contains spreads""" 
    # Set variables: 
    date = None 
    exchange_bbo = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float))))) 
    effective_spread = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(float))))) 
    time_bucket = [i * 100000.0 for i in range(0, (16 * 60 * 60 * 1000) * 1000/100000)] 
    for file in input_file_list: 
     file_to_open = '%s.csv' % file 
     reader = csv.DictReader(open(file_to_open, 'rb')) 
     for i in reader: 
      if not bool(date): 
       date = i['Date[L]'][0:4] + "-" + i['Date[L]'][4:6] + "-" + i['Date[L]'][6:8] 
      if i['Type'] == 'Quote' and (time_to_milli(i['Time[L]']) <= (16*60*60*1000)*1000): 
       security = i['#RIC'].split('.')[0] 
       exchange = i['#RIC'].split('.')[1] 
       timestamp = float(time_to_milli(i['Time[L]'])) 
       bucket = ceil(float(time_to_milli(i['Time[L]']))/100000.0) * 100000.0 
       if i['Bid Price'] == "": 
        bid = 0.0 
       else: 
        bid = float(i['Bid Price']) 
       if i['Ask Price'] == "": 
        ask = 0.0 
       else: 
        ask = float(i['Ask Price']) 
       if bid < ask < 199999.99: 
        if not bool(exchange_bbo[security][exchange][date][bucket]['ask']): 
         exchange_bbo[security][exchange][date][bucket]['ask'] = ask 
         exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp 
        elif exchange_bbo[security][exchange][date][bucket]['diff_ask'] > bucket - timestamp: 
         exchange_bbo[security][exchange][date][bucket]['ask'] = ask 
         exchange_bbo[security][exchange][date][bucket]['diff_ask'] = bucket - timestamp 
        if not bool(exchange_bbo[security][exchange][date][bucket]['bid']): 
         exchange_bbo[security][exchange][date][bucket]['bid'] = bid 
         exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp 
        elif exchange_bbo[security][exchange][date][bucket]['diff_bid'] > bucket - timestamp: 
         exchange_bbo[security][exchange][date][bucket]['bid'] = bid 
         exchange_bbo[security][exchange][date][bucket]['diff_bid'] = bucket - timestamp 
      if i['Type'] == 'Trade' and i['Price'] != "" and i['Price'] != 0.0: 
       timestamp = float(time_to_milli(i['Time[L]'])) 
       bucket = ceil(float(time_to_milli(i['Time[L]']))/100000.0) * 100000.0 
       security = i['#RIC'].split('.')[0] 
       exchange = i['#RIC'].split('.')[1] 
       price = float(i['Price']) 
       volume= float(i['Volume']) 
       if not bool(exchange_bbo[security][exchange][date][bucket]['price']): 
        exchange_bbo[security][exchange][date][bucket]['price'] = price 
        exchange_bbo[security][exchange][date][bucket]['volume'] = volume 
        exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp 
       elif exchange_bbo[security][exchange][date][bucket]['time_diff'] > bucket - timestamp and price != 0.0: 
        exchange_bbo[security][exchange][date][bucket]['price'] = price 
        exchange_bbo[security][exchange][date][bucket]['volume'] = volume 
        exchange_bbo[security][exchange][date][bucket]['time_diff'] = bucket - timestamp 

     # Fill the empty buckets - exchange level 
     for security in exchange_bbo: 
      for exchange in exchange_bbo[security]: 
       for date in exchange_bbo[security][exchange]: 
        for bucket in time_bucket: 
         previous = bucket - 100000.0 
         # best offer 
         bo_t = exchange_bbo[security][exchange][date][bucket]['ask'] 
         bo_t1 = exchange_bbo[security][exchange][date][previous]['ask'] 
         if bo_t == 0.0 and bo_t1 != 0.0: 
          exchange_bbo[security][exchange][date][bucket]['ask'] = bo_t1 
         # best bid 
         bb_t = exchange_bbo[security][exchange][date][bucket]['bid'] 
         bb_t1 = exchange_bbo[security][exchange][date][previous]['bid'] 
         if bb_t == 0.0 and bb_t1 != 0.0: 
          exchange_bbo[security][exchange][date][bucket]['bid'] = bb_t1 

     for security in exchange_bbo: 
      for exchange in exchange_bbo[security]: 
       for date in exchange_bbo[security][exchange]: 
        for bucket in exchange_bbo[security][exchange][date]: 
         if not bool(exchange_bbo[security][exchange][date][bucket]['price']): 
          nbo = exchange_bbo[security][exchange][date][bucket]['ask'] 
          nbb = exchange_bbo[security][exchange][date][bucket]['bid'] 
          midpoint = (nbo + nbb)/2.0 
          price = exchange_bbo[security][exchange][date][bucket]['price'] 
          volume= exchange_bbo[security][exchange][date][bucket]['volume'] 
          # print security, exchange, bucket, price, midpoint 
          if price > 0.0 and midpoint != 0.0: 
           effective_spread[security][exchange][date][bucket]['espread_bps'] = 2.0 * abs(price - midpoint)/midpoint 
           effective_spread[security][exchange][date][bucket]['volume']=volume 
           effective_spread[security][exchange][date]['count'] += 1.0 

     data_writer = csv.DictWriter(open(output_file, 'wb'), 
            fieldnames=['security', 'exchange', 'date', 'bucket' 'espread_bps', 'volume', 'count']) 

     data_writer.writeheader() 

     for security in effective_spread: 
      for exchange in effective_spread[security]: 
       for date in effective_spread[security][exchange]: 
        for bucket in effective_spread[security][exchange][date]: 
         espread_bps = effective_spread[security][exchange][date][bucket]['espread_bps'] 
         volume = effective_spread[security][exchange][date][bucket]['volume'] 
         count = effective_spread[security][exchange][date][bucket]['count'] 
         data_writer.writerow({'security': security, 'exchange': exchange, 'date': date, 'bucket': bucket, 
               'espread_bps': espread_bps, 'volume': volume, 'count': count}) 

input_files = ['ScandinavianTAQ'] 

Спасибо большое

ответ

0

100 GB не так много данных , База данных SQL и Pandas должны быть все, что вам нужно. Вам нужно научиться писать SQL-запросы, и я бы рекомендовал захватить копию Wes McKinney's book. Я не смотрел ваш код, но мне кажется, что самая большая проблема в том, что вы делаете все подряд, а не группируете свои операции.
Кроме того, проверьте Dask

0

Я бы проверить elastic, если вы собираетесь иметь дело с большим количеством dicts для внешнего хранения. Хорошо работает с большими данными и имеет среднюю кривую обучения.

Для больших файлов с памятью вы можете посмотреть memmap и lazy reading, если линия за линию является приемлемой. Обычно итерация является принятым методом.

Операции группировки также помогают в вашем контексте, например, подумайте, есть ли независимые операции, которые могут выполняться параллельно. Для этого проверьте некоторые примеры сообщений SO, например this. Вам будет полезно поговорить с экспертами в вашей области об оптимизации вычислений.

Также у вас есть доступ к внешнему серверу? Если вы это сделаете и ваша распределенная система, ваши варианты еще больше.

Смежные вопросы