2013-08-21 3 views
3

У меня есть файл, который содержит много данных. Каждая строка - это запись. И я пытаюсь сделать некоторую работу ETL против всего файла. Сейчас я использую стандартный ввод для чтения данных по строкам. Замечательная вещь об этом - ваш скрипт может быть очень гибким, чтобы интегрироваться с другими командами сценария и оболочки. Я пишу результат на стандартный вывод. Например.Python Threading stdin/stdout

$ cat input_file 
line1 
line2 
line3 
line4 
... 

Мой текущий код питона выглядит следующим образом - parse.py

import sys 
for line in sys.stdin: 
    result = ETL(line) # ETL is some self defined function which takes a while to execute. 
    print result 

ниже код, как он работает прямо сейчас:

cat input_file | python parse.py > output_file 

Я посмотрел на модуль Threading Python, и мне интересно, будет ли производительность значительно улучшена, если я буду использовать этот модуль.

Вопрос 1: Как я должен планировать квоты для каждой темы, почему?

... 
counter = 0 
buffer = [] 
for line in sys.stdin: 
    buffer.append(line) 
    if counter % 5 == 0: # maybe assign 5 rows to each thread? if not, is there a rule of thumb to determine 
     counter = 0 
     thread = parser(buffer) 
     buffer = [] 
     thread.start() 

Вопрос2: Множественные нити может распечатать результат обратно на стандартный вывод в то же время, как организовать их и избежать ситуации ниже?

import threading 
import time 

class parser(threading.Thread): 
    def __init__ (self, data_input): 
     threading.Thread.__init__(self) 
     self.data_input = data_input 

    def run(self): 
     for elem in self.data_input: 
      time.sleep(3) 
      print elem + 'Finished' 

work = ['a', 'b', 'c', 'd', 'e', 'f'] 

thread1 = parser(['a', 'b']) 
thread2 = parser(['c', 'd']) 
thread3 = parser(['e', 'f']) 

thread1.start() 
thread2.start() 
thread3.start() 

Выход действительно некрасиво, где одна строка содержит выходы из двух потоков.

aFinished 
cFinishedeFinished 

bFinished 
fFinished 
dFinished 
+0

Не могли бы вы связать «Threading module of Python». Во всяком случае, нить не очень хорошо иметь при доступе к файлу, ИМХО. Вам нужно определить, к какому ядру можно получить доступ к тому, что и когда происходит через блокировки и семафоры и работы. Возможно, вы не увидите большого повышения производительности, поскольку большая часть работы связана с работой ввода-вывода, а не с работой ЦП. –

ответ

4

Принимая на ваш второй вопрос, это то, что mutexes для. Вы можете получить более чистый выходной сигнал, который вы хотите с помощью блокировки для координации между анализаторами и гарантировать, что только один поток имеет доступ к выходному потоку в течение заданного периода времени:

class parser(threading.Thread): 
    output_lock = threading.Lock() 

    def __init__ (self, data_input): 
     threading.Thread.__init__(self) 
     self.data_input = data_input 

    def run(self): 
     for elem in self.data_input: 
      time.sleep(3) 
      with self.output_lock: 
       print elem + 'Finished' 

Что касается первого вопроса, обратите внимание, что, вероятно, многопоточность не принесет никакой пользы для вашей конкретной рабочей нагрузки. Это во многом зависит от того, связана ли работа с каждой строкой ввода (ваша функция ETL) в основном связана с ЦП или IO-привязкой. Если первое (вероятно, я подозреваю), нити не помогут, из-за global interpreter lock. В этом случае вы хотели бы использовать модуль multiprocessing для распределения работы между несколькими процессами вместо нескольких потоков.

Но вы можете получить те же результаты с более легким для выполнения рабочего процесса: Разделить входной файл на n штук (используя, например, команду split); вызывать скрипт extract-and-transform отдельно для каждого подфайла; затем объедините полученные выходные файлы.

Один nitpick: «используя стандартный ввод для чтения данных по строкам, потому что он не загружает весь файл в память» подразумевает неправильное представление. Вы можете прочитать файл построчно внутри Python, например, путем замены sys.stdin с файлового объекта в конструкции, как:

for line in sys.stdin: 

Смотрите также readline() метод файловых объектов, и обратите внимание, что read() может принимать в качестве параметра максимальное количество прочитанных байтов.

+0

Много замечательных вещей в вашем посте, Alp. Я очень заинтересован в ваших комментариях CPU-bound/IO-bound. Мне интересно, у вас есть способ определить, сколько времени и ресурсов занимает CPU/IO? Кстати, они считают, что я использую stdIO, потому что вы можете интегрировать свой скрипт с Shell Command, который делает его намного гибким и удобным. Благодарим за исправление в «Память взаимопонимания». –

0

Независимо от того, будет ли резьба полезной, вы сильно зависите от своей ситуации. В частности, если ваша функция ETL() связана с большим количеством доступа к диску, то потоки, вероятно, дадут вам значительное значительное улучшение скорости.

В ответ на ваш первый вопрос, я всегда находил, что это просто зависит. При определении идеального количества потоков существует множество факторов, и многие из них зависят от программы. Например, если вы делаете большой доступ к диску (который довольно медленный), то вам потребуется больше потоков, чтобы использовать преимущества простоя в ожидании доступа к диску. Если программа связана с ЦП, то тонны потоков могут оказаться не очень полезными. Таким образом, хотя может быть возможно проанализировать все факторы, чтобы придумать идеальное количество потоков, обычно бывает намного быстрее сделать первоначальное предположение, а затем отрегулировать оттуда.

В частности, при назначении определенного количества строк для каждого потока, вероятно, не лучший способ развернуть работу. Рассмотрим, например, если одна линия занимает особенно много времени для обработки. Было бы лучше, если бы один поток мог отработать на одной линии, а другие потоки могли бы сделать еще несколько строк за это время. Лучший способ справиться с этим - использовать очередь. Если вы нажимаете каждую строку в очередь, каждый поток может вывести строку из очереди, обрабатывать ее и повторять до тех пор, пока очередь не будет пуста. Таким образом, работа распределяется таким образом, что нить никогда не будет работать (до конца, конечно).

Теперь, второй вопрос. Вы определенно правы, что писать на stdout из нескольких потоков одновременно не является идеальным решением. В идеале, вы должны устраивать вещи так, чтобы запись в stdout происходила только в одном месте. Один из лучших способов сделать это - использовать Очередь. Если каждый поток записывает свой вывод в общую очередь, вы можете создать дополнительный поток, единственной задачей которого является вытащить элементы из этой очереди и распечатать их на stdout. Ограничивая печать только одним потоком, вы избегаете проблем, присущих нескольким потокам, которые пытаются распечатать сразу.