2011-01-26 3 views
9

Описание проблемы, с которой я сталкиваюсь, немного сложнее, и я буду ошибаться на стороне предоставления более полной информации. Для нетерпеливого, здесь кратчайший способ я могу резюмировать:Эффективные методы буферизации и сканирования файлов для больших файлов в python

Что самое быстрое (наималейшее выполнения времени) способ разделить текстовый файл на ALL (перекрытие) подстрок размера N (связанно N , например 36) , выкидывая символы новой строки.

Я пишу модуль, который анализирует файлы в формате генома на основе ФАСИ. Эти файлы содержат так называемый человеческий справочный геном 'hg18', который вы можете скачать с UCSC genome browser (поймите пули!), Если хотите.

Как вы заметили, файлы генома состоят из chr [1..22] .fa и chr [XY] .fa, а также набор других небольших файлов, которые не используются в этом модуле.

Несколько модулей уже существуют для разбора файлов FASTA, таких как SeqIO BioPython. (К сожалению, я бы опубликовал ссылку, но пока у меня нет таких моментов). К сожалению, каждый модуль, который я смог найти, не выполняет определенную операцию, которую я пытаюсь сделать.

Моему модулю необходимо разделить данные генома (например, CAGTACGTCAGACTATACGGAGCTA) может быть строкой для каждой перекрывающейся подстроки N-длины. Позвольте мне привести пример, используя очень маленький файл (фактические хромосомные файлы между 355 и 20 миллионов символов) и N = 8

 
>>>import cStringIO 
>>>example_file = cStringIO.StringIO("""\ 
>header 
CAGTcag 
TFgcACF 
""") 
>>>for read in parse(example_file): 
... print read 
... 
CAGTCAGTF 
AGTCAGTFG 
GTCAGTFGC 
TCAGTFGCA 
CAGTFGCAC 
AGTFGCACF 

функция, которую я нашел имел абсолютную максимальную производительность методов I Could думать так:


def parse(file): 
    size = 8 # of course in my code this is a function argument 
    file.readline() # skip past the header 
    buffer = '' 
    for line in file: 
    buffer += line.rstrip().upper() 
    while len(buffer) >= size: 
     yield buffer[:size] 
     buffer = buffer[1:] 

Это работает, но, к сожалению, до сих пор занимает около 1,5 часов (смотрите примечание ниже) для анализа генома человека таким образом. Возможно, это самое лучшее, что я увижу с помощью этого метода (полный рефакторинг кода может быть в порядке, но я бы хотел его избежать, поскольку этот подход имеет некоторые особые преимущества в других областях кода), но я подумал, что я передам это сообществу.

Спасибо!

  • Примечание. На этот раз включает в себя множество дополнительных вычислений, таких как вычисление противовесной строки и выполнение поиска хеш-таблиц на хеше размером примерно 5G.

пост-ответ вывод: Оказывается, что с помощью fileobj.read(), а затем манипулировать результирующую строку (string.replace() и т.д.) занимает относительно немного времени и памяти по сравнению с остальной частью программы, и поэтому я использовал этот подход. Всем спасибо!

ответ

3

Некоторые классические изменения, связанные с IO.

  • Используйте операцию чтения более низкого уровня, например os.read, и читайте в большом фиксированном буфере.
  • Использовать многопоточность/многопроцессорную обработку, где читаются и буферизуются и другие процессы.
  • Если у вас есть несколько процессоров/машин, используйте многопроцессорную обработку/mq, чтобы разделить обработку на процессорах ala map-reduce.

Использование операции чтения более низкого уровня не будет таким переписанным. Остальные будут довольно большими переписываниями.

+1

Os.read, похоже, не имеет существенного отличия (результаты в моем ответе). –

+0

Спасибо за ответ. Я рассматриваю рефакторинг проблемы для подхода с уменьшением карты, но я должен признать, что мой нерешительность проистекает из того, что он не видел алгоритм в действии и, таким образом, немного наивен. Не могли бы вы порекомендовать мне какую-нибудь конкретную литературу? – eblume

+0

Я тоже добавлю это - я на самом деле уже распараллеливаюсь в очень высокой степени, что показывает значительные преимущества IO. После того, как я задал этот вопрос, я начинаю подозревать, что моя проблема на самом деле не IO вообще, а преобразования, которые я делаю в этом IO. – eblume

3

Я подозреваю, что проблема в том, что у вас есть столько данных, хранятся в виде строки, которая действительно расточительно для случая использования, что вы бежите из оперативной памяти и обмолота подкачки. 128 GB должен быть достаточно, чтобы избежать этого ... :)

Поскольку вы указали в комментариях, что вам нужно хранить дополнительную информацию в любом случае, отдельный класс, который ссылается родитель строка будет мой выбор.Я провел короткий тест, используя chr21.fa от chromFa.zip от hg18; файл составляет около 48 МБ и чуть меньше 1 М строк. У меня только 1 ГБ памяти, поэтому я просто отбрасываю объекты после этого. Этот тест, таким образом, не будет показывать проблемы с фрагментацией, кэш, или связанной с ними, но я думаю, что она должна быть хорошей отправной точкой для измерения пропускной способности парсинга:

import mmap 
import os 
import time 
import sys 

class Subseq(object): 
    __slots__ = ("parent", "offset", "length") 

    def __init__(self, parent, offset, length): 
    self.parent = parent 
    self.offset = offset 
    self.length = length 

    # these are discussed in comments: 
    def __str__(self): 
    return self.parent[self.offset:self.offset + self.length] 

    def __hash__(self): 
    return hash(str(self)) 

    def __getitem__(self, index): 
    # doesn't currently handle slicing 
    assert 0 <= index < self.length 
    return self.parent[self.offset + index] 

    # other methods 

def parse(file, size=8): 
    file.readline() # skip header 
    whole = "".join(line.rstrip().upper() for line in file) 
    for offset in xrange(0, len(whole) - size + 1): 
    yield Subseq(whole, offset, size) 

class Seq(object): 
    __slots__ = ("value", "offset") 
    def __init__(self, value, offset): 
    self.value = value 
    self.offset = offset 

def parse_sep_str(file, size=8): 
    file.readline() # skip header 
    whole = "".join(line.rstrip().upper() for line in file) 
    for offset in xrange(0, len(whole) - size + 1): 
    yield Seq(whole[offset:offset + size], offset) 

def parse_plain_str(file, size=8): 
    file.readline() # skip header 
    whole = "".join(line.rstrip().upper() for line in file) 
    for offset in xrange(0, len(whole) - size + 1): 
    yield whole[offset:offset+size] 

def parse_tuple(file, size=8): 
    file.readline() # skip header 
    whole = "".join(line.rstrip().upper() for line in file) 
    for offset in xrange(0, len(whole) - size + 1): 
    yield (whole, offset, size) 

def parse_orig(file, size=8): 
    file.readline() # skip header 
    buffer = '' 
    for line in file: 
    buffer += line.rstrip().upper() 
    while len(buffer) >= size: 
     yield buffer[:size] 
     buffer = buffer[1:] 

def parse_os_read(file, size=8): 
    file.readline() # skip header 
    file_size = os.fstat(file.fileno()).st_size 
    whole = os.read(file.fileno(), file_size).replace("\n", "").upper() 
    for offset in xrange(0, len(whole) - size + 1): 
    yield whole[offset:offset+size] 

def parse_mmap(file, size=8): 
    file.readline() # skip past the header 
    buffer = "" 
    for line in file: 
    buffer += line 
    if len(buffer) >= size: 
     for start in xrange(0, len(buffer) - size + 1): 
     yield buffer[start:start + size].upper() 
     buffer = buffer[-(len(buffer) - size + 1):] 
    for start in xrange(0, len(buffer) - size + 1): 
    yield buffer[start:start + size] 

def length(x): 
    return sum(1 for _ in x) 

def duration(secs): 
    return "%dm %ds" % divmod(secs, 60) 


def main(argv): 
    tests = [parse, parse_sep_str, parse_tuple, parse_plain_str, parse_orig, parse_os_read] 
    n = 0 
    for fn in tests: 
    n += 1 
    with open(argv[1]) as f: 
     start = time.time() 
     length(fn(f)) 
     end = time.time() 
     print "%d %-20s %s" % (n, fn.__name__, duration(end - start)) 

    fn = parse_mmap 
    n += 1 
    with open(argv[1]) as f: 
    f = mmap.mmap(f.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ) 
    start = time.time() 
    length(fn(f)) 
    end = time.time() 
    print "%d %-20s %s" % (n, fn.__name__, duration(end - start)) 


if __name__ == "__main__": 
    sys.exit(main(sys.argv)) 

1 parse     1m 42s 
2 parse_sep_str   1m 42s 
3 parse_tuple   0m 29s 
4 parse_plain_str  0m 36s 
5 parse_orig   0m 45s 
6 parse_os_read   0m 34s 
7 parse_mmap   0m 37s 

Первые четыре мой код, а orig - ваш, а последние два - из других ответов.

Определенные пользователем объекты гораздо более дороги для создания и сбора, чем кортежи или простые строки! Это не должно быть так удивительно, но я не понял, что это сильно изменит ситуацию (сравните # 1 и # 3, которые действительно отличаются только в пользовательском классе vs tuple). Вы сказали, что хотите сохранить дополнительную информацию, такую ​​как offset, со строкой в ​​любом случае (как в случаях синтаксического анализа и parse_sep_str), поэтому вы можете рассмотреть возможность внедрения этого типа в модуль расширения C. Посмотрите на Cython и сопутствуйте, если вы не хотите писать C напрямую.

Ожидается случай №1 и №2: указав на родительскую строку, я пытался сохранить память, а не время обработки, но этот тест не измеряет это.

+0

Спасибо за ответ, но я должен не согласиться на два момента. Во-первых, этот метод использует очень мало памяти.Предполагая, что результат возврата не сохраняется долго (это, но см. Пункт 2), нет объектов, которые будут придерживаться без сбора. Он близок к фиксированной памяти. Пункт 2 заключается в том, что я тщательно контролирую использование ОЗУ, и я хорошо разбираюсь в проводке. (Машина, на которой я запущена, - это зверь с 128 гигабайтами оперативной памяти, и процесс использует только ~ 5G, а не из этого кода.) – eblume

+0

@Fred: ... Я смущен целью создания экземпляра новый объект (Subseq) каждый раз через этот цикл. Почему бы не просто «получить целое [смещение: смещение + размер]» – Gerrat

+0

@eblume: достаточно справедливо; Я действительно думал, что вы сохранили результат, и именно здесь я хотел сохранить память, а не в самой функции. Вы все еще можете рассмотреть это, так как это использует меньше памяти (так может быть лучше в кеше), и это приведет к уменьшению фрагментации кучи и т. Д. Геном огромен, но даже в этом случае я нахожу 1,5 часа, учитывая спецификации вашей машины; адаптация чего-то в соответствии с тем, что у меня есть в функции синтаксического анализа, может помочь с некоторыми из них. –

4

Не могли бы вы воспроизвести файл и начать клевать через него с помощью скользящего окна? Я написал глупую небольшую программу, которая работает довольно небольшой:

USER  PID %CPU %MEM VSZ RSS TTY  STAT START TIME COMMAND 
sarnold 20919 0.0 0.0 33036 4960 pts/2 R+ 22:23 0:00 /usr/bin/python ./sliding_window.py 

Работа через 636229 байт Fasta файл (найден через http://biostar.stackexchange.com/questions/1759) занял .383 секунды.

#!/usr/bin/python 

import mmap 
import os 

    def parse(string, size): 
    stride = 8 
    start = string.find("\n") 
    while start < size - stride: 
     print string[start:start+stride] 
     start += 1 

fasta = open("small.fasta", 'r') 
fasta_size = os.stat("small.fasta").st_size 
fasta_map = mmap.mmap(fasta.fileno(), 0, mmap.MAP_PRIVATE, mmap.PROT_READ) 
parse(fasta_map, fasta_size) 
+0

Поскольку весь файл обрабатывается, вы не закончили бы вытеснение памяти? – dietbuddha

+0

@ dietbuddha, то есть риск, на 32-битной машине самый большой файл, который вы могли бы обработать, был бы где-то около двух гигабайт, на 64-битной машине вы могли бы, конечно, намного увеличить. Надеемся, что алгоритм замены страницы ОС может обрабатывать последовательное чтение через mmapped сегмент, чтобы уменьшить размер резидентного набора. – sarnold

+0

@Fred Nurk, я забыл, что у меня есть гораздо больший файл fasta, сидящий вокруг: 242838416 байт в 2m23.355s. 1,9 млн. Строк продукции каждую секунду. – sarnold

1

У меня есть функция для обработки текстового файла и использования буфера при чтении и записи и параллельных вычислениях с помощью асинхронного пула рабочих процессов. У меня есть AMD 2 ядра, 8 ГБ оперативной памяти, с gnu/linux и может обрабатывать 300000 строк менее чем за 1 секунду, 1000000 строк примерно за 4 секунды и приблизительно 4500000 строк (более 220 МБ) примерно за 20 секунд:

# -*- coding: utf-8 -*- 
import sys 
from multiprocessing import Pool 

def process_file(f, fo="result.txt", fi=sys.argv[1]): 
    fi = open(fi, "r", 4096) 
    fo = open(fo, "w", 4096) 
    b = [] 
    x = 0 
    result = None 
    pool = None 
    for line in fi: 
     b.append(line) 
     x += 1 
     if (x % 200000) == 0: 
      if pool == None: 
       pool = Pool(processes=20) 
      if result == None: 
       result = pool.map_async(f, b) 
      else: 
       presult = result.get() 
       result = pool.map_async(f, b) 
       for l in presult: 
        fo.write(l) 
      b = [] 
    if not result == None: 
     for l in result.get(): 
      fo.write(l) 
    if not b == []: 
     for l in b: 
      fo.write(f(l)) 
    fo.close() 
    fi.close() 

Первый аргумент - это функция, которая возвращает одну строку, результат процесса и результата для записи в файл, следующий - файл вывода, а последний - файл ввода (вы не можете использовать последний аргумент, если вы получаете в качестве первого параметра в файле сценария ввода).

Смежные вопросы