Я пишу скрипты для обработки (очень больших) файлов путем многократного разбрасывания объектов до EOF. Я хотел бы разбить файл и разделить отдельные процессы (в облаке) и обработать отдельные части.Unpickling mid-stream (python)
Однако мой разделитель не является интеллектуальным, он не знает границ между маринованными объектами в файле (поскольку эти границы зависят от типов маринованных объектов и т. Д.).
Есть ли способ отсканировать файл для «запущенного маринованного объекта»? Наивный способ заключается в попытке рассыпания при последовательных смещениях байта до тех пор, пока объект не будет успешно маринован, но это приведет к непредвиденным ошибкам. Похоже, что для некоторых комбинаций ввода неакционер выпадает из синхронизации и ничего не возвращает для остальной части файла (см. Код ниже).
import cPickle
import os
def stream_unpickle(file_obj):
while True:
start_pos = file_obj.tell()
try:
yield cPickle.load(file_obj)
except (EOFError, KeyboardInterrupt):
break
except (cPickle.UnpicklingError, ValueError, KeyError, TypeError, ImportError):
file_obj.seek(start_pos+1, os.SEEK_SET)
if __name__ == '__main__':
import random
from StringIO import StringIO
# create some data
sio = StringIO()
[cPickle.dump(random.random(), sio, cPickle.HIGHEST_PROTOCOL) for _ in xrange(1000)]
sio.flush()
# read from subsequent offsets and find discontinuous jumps in object count
size = sio.tell()
last_count = None
for step in xrange(size):
sio.seek(step, os.SEEK_SET)
count = sum(1 for _ in stream_unpickle(file_obj))
if last_count is None or count == last_count - 1:
last_count = count
elif count != last_count:
# if successful, these should never print (but they do...)
print '%d elements read from byte %d' % (count, step)
print '(%d elements read from byte %d)' % (last_count, step-1)
last_count = count
Можете ли вы изменить программу, которая создает файл, написать каждый маринад в отдельный файл? –