2016-02-10 3 views
0

У меня есть огромный текстовый файл, для которого я хочу создать словарь (счетчик). В настоящее время я делаю это с помощью следующего кода:Создание словаря из очень ОГРОМНОГО текстового файла многопоточным способом

with open(file_name) as input_doc: 
for line in input_doc: 
    for word in line.strip().split(): 
     vocab[word] += 1 

, но, так как файл огромен, он занимает много времени. Итак, я ищу более быстрый способ сделать это.

Наиболее прямолинейное решение, которое приходит в голову, заключается в хранении пучка строк в списке (небольшие партии) и обрабатывать каждую партию отдельно (параллельно с другими партиями), а в конце - слияние результатов. Таким образом, мы можем сэкономить много времени и можем обрабатывать ранее просмотренные партии (параллельно), в то время как основной поток читает следующую партию строк из файла.

что-то вроде:

buffer_size = 1000 
buff = [] 
vocab = Counter() 
number_of_sentences = 1 
with open(file_name) as input_doc: 
    for line in input_doc: 
     if number_of_sentences % buffer_size == 0: 
      vocab += update_dictionary(buff) ### Here I should create and call a new thread to work on the new batch 
      buff = [] 
     else 
      buff.append(line) 
      number_of_sentences += 1 

Здесь метод update_dictionary() считывает все предложения в данном списке и обновляет свой локальный словарь. Как только это будет сделано, его локальный словарь следует объединить с глобальным. Я пробовал пару часов, но, к сожалению, поскольку я никогда не реализовал многопоточный код на Python, мне не удалось заставить его работать. Не могли бы вы помочь мне реализовать эту идею?

спасибо.

+1

До тех пор, пока вы используете многопоточность 'cpython', вам не поможет. Замок Global Interpreter Lock (GIL) позволяет выполнять только один поток за раз. Есть и другие параллельные версии python, которые могут помочь. Вы можете немного ускориться, заменив часть 'for word in ...' на 'counter.update (слово в слово в line.strip(). Split())' и открыв файл с большим буфер. – tdelaney

+0

[Великий ресурс] (http://stackoverflow.com/questions/11196367/processing-single-file-from-multiple-processes-in-python) – Kevin

+0

Вы просмотрели библиотеку многопроцессорности. каждый процесс может использоваться для подсчета слов в части файла и будет иметь свой собственный посреднический словарь. вы можете затем использовать очередь Multiprocessing для обмена данными с основным процессом –

ответ

0

Модуль concurrent futures использует процессы вместо потоков, чтобы обойти GIL issue. Вы можете отправлять задания в пул, который будет обрабатываться параллельно. Когда вы отправляете задачу в threadpool, она возвращает объект, который представляет запущенную задачу (это называется будущим). Вы можете запустить несколько задач таким образом. Когда вы будете готовы получить результат задания, вы можете вызвать future.result(). Вот пример, который получает суммарную длину всех строк в списке параллельно:

from concurrent.futures import ThreadPoolExecutor 
from collections import defaultdict 

def runTask(lines): 
    counts = defaultdict(int) 
    for line in lines: 
     for word in line.split(): 
      counts[word] += 1 

    return counts 

pool = ThreadPoolExecutor(4) 
futures = [] 
chunkSize = 4 
lines = [] 

with open("test.txt") as f: 
    for line in f: 
     if len(lines) == chunkSize: 
      futures.append(pool.submit(runTask, lines)) 
      lines = [] 
     else: 
      lines.append(line) 

    if len(lines) > 0: 
     futures.append(pool.submit(runTask, lines)) 

# Sum up totals 
finalCount = defaultdict(int) 
for f in futures: 
    result = f.result() 
    for k in result: 
     finalCount[k] += result[k] 

for word in finalCount: 
    print("{0}: {1}".format(word, finalCount[word])) 

Это первая попытка помочь вам начать работу.

+0

Проводятся ли аргументы, переданные по трубам в подпроцессы? Если это так, это просто заменяет строки чтения из файла на чтение строк из канала. И вообще, Хакиму нужен словарь, созданный в его процессе верхнего уровня, поэтому я не думаю, что это поможет, так же круто, как есть. –

+0

Я не уверен. В качестве улучшения я бы начал с передачи смещений файла в подпроцесс, чтобы процесс верхнего уровня не должен был читать все строки. – tmajest

+0

Спасибо, tmajest. Я реализовал ваш подход, но не только он не улучшает производительность, но даже делает его худшим. Я заметил, что, хотя я установил количество потоков в 8, загрузка процессора составляет всего 140%, и, похоже, в управлении потоками много перегрузок. Таким образом, сравнивая время двух подходов (serial vs parallel), я заметил, что требуемое время значительно увеличилось (2 миллиона против 5 минут, соответственно в последовательных и параллельных настройках). – Hakim

0

Это звучит как пример канонического примера слов из всей литературы Map-Reduce. Если это что-то другое, кроме одноразового анализа, и ваш входной файл действительно огромен (как в Big Data), вы можете рассмотреть возможность использования Hadoop или Spark.

Самый первый пример на Спарк example page есть то, что вы могли бы копировать, почти дословно:

text_file = sc.textFile("file:///path/to/your/input/file") 
counts = text_file.flatMap(lambda line: line.strip().split()) \ 
        .map(lambda word: (word, 1)) \ 
        .reduceByKey(lambda a, b: a + b) 
vocab = dict(counts.collect()) 

Скачать Спарк и получить его работу на местном уровне, а затем масштабировать проблему в ЭМИ (с S3 для файла система), при необходимости.

Смежные вопросы