2012-01-18 5 views
2

Я пишу скрипты для обработки (очень больших) файлов путем многократного разбрасывания объектов до EOF. Я хотел бы разбить файл и разделить отдельные процессы (в облаке) и обработать отдельные части.Unpickling mid-stream (python)

Однако мой разделитель не является интеллектуальным, он не знает границ между маринованными объектами в файле (поскольку эти границы зависят от типов маринованных объектов и т. Д.).

Есть ли способ отсканировать файл для «запущенного маринованного объекта»? Наивный способ заключается в попытке рассыпания при последовательных смещениях байта до тех пор, пока объект не будет успешно маринован, но это приведет к непредвиденным ошибкам. Похоже, что для некоторых комбинаций ввода неакционер выпадает из синхронизации и ничего не возвращает для остальной части файла (см. Код ниже).

import cPickle 
import os 

def stream_unpickle(file_obj): 
    while True: 
     start_pos = file_obj.tell() 
     try: 
      yield cPickle.load(file_obj) 
     except (EOFError, KeyboardInterrupt): 
      break 
     except (cPickle.UnpicklingError, ValueError, KeyError, TypeError, ImportError): 
      file_obj.seek(start_pos+1, os.SEEK_SET) 

if __name__ == '__main__': 
    import random 
    from StringIO import StringIO 

    # create some data 
    sio = StringIO() 
    [cPickle.dump(random.random(), sio, cPickle.HIGHEST_PROTOCOL) for _ in xrange(1000)] 
    sio.flush() 

    # read from subsequent offsets and find discontinuous jumps in object count 
    size = sio.tell() 
    last_count = None 
    for step in xrange(size): 
     sio.seek(step, os.SEEK_SET) 
     count = sum(1 for _ in stream_unpickle(file_obj)) 
     if last_count is None or count == last_count - 1: 
      last_count = count 
     elif count != last_count: 
      # if successful, these should never print (but they do...) 
      print '%d elements read from byte %d' % (count, step) 
      print '(%d elements read from byte %d)' % (last_count, step-1) 
      last_count = count 
+0

Можете ли вы изменить программу, которая создает файл, написать каждый маринад в отдельный файл? –

ответ

0

Извините, что ответите на мой вопрос и спасибо @ Раймонд Хеттингер за идею добавления часовых.

Вот что сработало для меня. Я создал читателей и писателей, которые используют отправитель '#S', за которым следует длина блока данных в начале каждой записи. Писатель должен позаботиться о том, чтобы найти записи '#' в записанных данных и удвоить их (в '##'). Затем читатель использует регулярное выражение look-behind для поиска часовых, отличных от любых совпадающих значений, которые могут быть в исходном потоке, а также проверяет количество байтов между этим сигналом и последующим.

RecordWriter - менеджер контекста (поэтому при необходимости несколько вызовов write() могут быть инкапсулированы в одну запись). RecordReader - это генератор.

Не знаете, как это работает. Любые более быстрые/элегантные решения приветствуются.

import re 
import cPickle 
from functools import partial 
from cStringIO import StringIO 

SENTINEL = '#S' 

# when scanning look for #S, but NOT ##S 
sentinel_pattern = '(?<!#)#S' # uses negative look-behind 
sentinel_re = re.compile(sentinel_pattern) 
find_sentinel = sentinel_re.search 

# when writing replace single # with double ## 
write_pattern = '#' 
write_re = re.compile(write_pattern) 
fix_write = partial(write_re.sub, '##') 

# when reading, replace double ## with single # 
read_pattern = '##' 
read_re = re.compile(read_pattern) 
fix_read = partial(read_re.sub, '#') 

class RecordWriter(object): 
    def __init__(self, stream): 
     self._stream = stream 
     self._write_buffer = None 

    def __enter__(self): 
     self._write_buffer = StringIO() 
     return self 

    def __exit__(self, et, ex, tb): 
     if self._write_buffer.tell(): 
      self._stream.write(SENTINEL) # start 
      cPickle.dump(self._write_buffer.tell(), self._stream, cPickle.HIGHEST_PROTOCOL) # byte length of user's original data 
      self._stream.write(fix_write(self._write_buffer.getvalue())) 
      self._write_buffer = None 
     return False 

    def write(self, data): 
     if not self._write_buffer: 
      raise ValueError("Must use StreamWriter as a context manager") 
     self._write_buffer.write(data) 

class BadBlock(Exception): pass 

def verify_length(block): 
    fobj = StringIO(block) 
    try: 
     stated_length = cPickle.load(fobj) 
    except (ValueError, IndexError, cPickle.UnpicklingError): 
     raise BadBlock 
    data = fobj.read() 
    if len(data) != stated_length: 
     raise BadBlock 
    return data 

def RecordReader(stream): 
    ' Read one record ' 
    accum = StringIO() 
    seen_sentinel = False 
    data = '' 
    while True: 
     m = find_sentinel(data) 
     if not m: 
      if seen_sentinel: 
       accum.write(data) 
      data = stream.read(80) 
      if not data: 
       if accum.tell(): 
        try: yield verify_length(fix_read(accum.getvalue())) 
        except BadBlock: pass 
       return 
     else: 
      if seen_sentinel: 
       accum.write(data[:m.start()]) 
       try: yield verify_length(fix_read(accum.getvalue())) 
       except BadBlock: pass 
       accum = StringIO() 
      else: 
       seen_sentinel = True 
      data = data[m.end():] # toss 

if __name__ == '__main__': 
    import random 

    stream = StringIO() 
    data = [str(random.random()) for _ in xrange(3)] 
    # test with a string containing sentinel and almost-sentinel 
    data.append('abc12#jeoht38#SoSooihetS#') 
    count = len(data) 
    for i in data: 
     with RecordWriter(stream) as r: 
      r.write(i) 

    size = stream.tell() 
    start_pos = random.random() * size 
    stream.seek(start_pos, os.SEEK_SET) 
    read_data = [s for s in RecordReader(stream)] 
    print 'Original data: ', data 
    print 'After seeking to %d, RecordReader returned: %s' % (start_pos, read_data) 
1

Модуль pickletools имеет дис функцию, которая показывает опкоды. Это показывает, что существует СТОП опкод, что вы можете сканировать:

>>> import pickle, pickletools, StringIO 
>>> s = StringIO.StringIO() 
>>> pickle.dump('abc', s) 
>>> p = s.getvalue() 
>>> pickletools.dis(p) 
    0: S STRING  'abc' 
    7: p PUT  0 
    10: . STOP 
highest protocol among opcodes = 0 

Примечание, с помощью СТОП опкод немного сложнее, так как коды переменной длины, но она может служить полезным намеком о том, где обрезаются.

Если вы контролируете шаг травления на другом конце, то вы можете исправить ситуацию, добавив свой собственный однозначный альтернативный разделитель:

>>> sep = '\xDE\xAD\xBE\xEF' 
>>> s = StringIO.StringIO() 
>>> pickle.dump('abc', s) 
>>> s.write(sep) 
>>> pickle.dump([10, 20], s) 
>>> s.write(sep) 
>>> pickle.dump('def', s) 
>>> s.write(sep) 
>>> pickle.dump([30, 40], s) 
>>> p = s.getvalue() 

Перед распаковкой, разделить на отдельные солений с использованием известного сепаратора:

>>> for pick in p.split(sep): 
     print pickle.loads(pick) 

abc 
[10, 20] 
def 
[30, 40] 
+0

Я пробовал с различными объектами (включая вложенные структуры), и я только что видел один код операции STOP в самом конце маринованной строки. – jcollado

+0

Я вижу несколько STOPS, по одному для каждого маринования. Например, следующая строка содержит три рассола, каждая из которых заканчивается символом «.». символ: '' S'abc '\ np0 \ n. (lp0 \ nI10 \ naI20 \ na. (lp0 \ nI30 \ naI40 \ na.''. Это наблюдение подтверждается исходным кодом для Lib/pickle.py on line 103: '' STOP = '.' # Каждый развод заканчивается с помощью STOP''. –

+0

Я пробовал сканирование для STOP, используя 'while file_obj.read (1)! = '.': Pass', если я читаю на точка, отличная от начала файла, но я все еще получаю ошибки. Я предполагаю, что код остановки stop в двоичном файле - это то, что не сравнивается с «.», или «.» появляется в другом месте в рассоле. – vsekhar

0

В маринованном файле некоторые коды операций имеют аргумент - значение данных, которое следует за кодом операции. Значения данных различаются по длине и могут содержать байты, идентичные кодам операций. Поэтому, если вы начнете читать файл с произвольной позиции, вы не сможете узнать, ищете ли вы код операции или в середине аргумента. Вы должны прочитать файл с начала и проанализировать коды операций.

Я приготовил эту функцию, которая пропускает один маринад из файла, т. Е. Читает его и анализирует коды операций, но не создает объекты. Кажется немного быстрее, чем cPickle.loads в некоторых файлах, которые у меня есть. Вы можете переписать это на C для большей скорости. (после надлежащего тестирования)

Затем вы можете сделать один проход по всему файлу, чтобы получить позицию поиска каждого рассола.

from pickletools import code2op, UP_TO_NEWLINE, TAKEN_FROM_ARGUMENT1, TAKEN_FROM_ARGUMENT4 
from marshal import loads as mloads 

def skip_pickle(f): 
    """Skip one pickle from file. 

    'f' is a file-like object containing the pickle. 

    """ 
    while True: 
     code = f.read(1) 
     if not code: 
      raise EOFError 
     opcode = code2op[code] 
     if opcode.arg is not None: 
      n = opcode.arg.n 
      if n > 0: 
       f.read(n) 
      elif n == UP_TO_NEWLINE: 
       f.readline() 
      elif n == TAKEN_FROM_ARGUMENT1: 
       n = ord(f.read(1)) 
       f.read(n) 
      elif n == TAKEN_FROM_ARGUMENT4: 
       n = mloads('i' + f.read(4)) 
       f.read(n) 
     if code == '.': 
      break   
Смежные вопросы