2013-08-21 7 views
5

Как ограничить количество параллельных потоков в Python?Как ограничить количество параллельных потоков в Python?

Например, у меня есть каталог со многими файлами, и я хочу обработать все из них, но только по 4 параллельно.

Вот то, что я до сих пор:

def process_file(fname): 
     # open file and do something                        

def process_file_thread(queue, fname): 
    queue.put(process_file(fname)) 

def process_all_files(d): 
    files=glob.glob(d + '/*') 
    q=Queue.Queue() 
    for fname in files: 
     t=threading.Thread(target=process_file_thread, args=(q, fname)) 
     t.start() 
    q.join() 

def main(): 
    process_all_files('.') 
    # Do something after all files have been processed 

Как я могу изменить код таким образом, что только 4 нити выполняются в то время?

Обратите внимание, что я хочу подождать, пока все файлы будут обработаны, а затем продолжите работу и обработайте обработанные файлы.

+2

Вы пробовали [нескольких процессов] (http://docs.python.org/2/library/multiprocessing.html # модуль-многопроцессорность) Бассейны? На Python 3 вы также можете использовать [фьючерсы] (http://docs.python.org/dev/library/concurrent.futures.html). – javex

+2

Вы также можете использовать ['futures'] (https://pypi.python.org/pypi/futures) в Python 2, вам просто нужно установить backport. – abarnert

+0

concurrent.futures действительно лучший способ сделать это. – JBernardo

ответ

7

Например, у меня есть каталог со многими файлами, и я хочу обрабатывать все из них, но только по 4 параллельно.

Это именно то, что делает пул потоков: вы создаете задания, а пул работает 4 параллельно. Вы можете сделать вещи еще проще, используя исполнителя, где вы просто передаете его функции (или другие вызовы), и он возвращает вам фьючерсы на результаты. Вы можете построить все это самостоятельно, но вам не обязательно. *

Модуль stdlib concurrent.futures - это самый простой способ сделать это. (Для Python 3.1 и ранее см. backport.) Фактически, one of the main examples очень близок к тому, что вы хотите сделать. Но давайте адаптировать его к точному прецеденту:

def process_all_files(d): 
    files = glob.glob(d + '/*') 
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 
     fs = [executor.submit(process_file, file) for file in files] 
     concurrent.futures.wait(fs) 

Если вы хотите process_file вернуть что-то, что почти так же легко:

def process_all_files(d): 
    files = glob.glob(d + '/*') 
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: 
     fs = [executor.submit(process_file, file) for file in files] 
     for f in concurrent.futures.as_completed(fs): 
      do_something(f.result()) 

И если вы хотите, чтобы обрабатывать исключения тоже ... хорошо, просто посмотрите на примере; это всего лишь try/except вокруг звонка result().


* Если вы хотите построить их самостоятельно, это не так сложно. Источник в multiprocessing.pool хорошо написан и прокомментирован, и не так сложно, и большая часть жесткого материала не имеет отношения к потоковому использованию; источник до concurrent.futures еще проще.

0

Я использовал эту технику несколько раз, я думаю, что это немного некрасиво мысль:

import threading 

def process_something(): 
    something = list(get_something) 

    def worker(): 
     while something: 
      obj = something.pop() 
      # do something with obj 

    threads = [Thread(target=worker) for i in range(4)] 
    [t.start() for t in threads] 
    [t.join() for t in threads]