2010-06-13 4 views
243

Есть класс пула для рабочего темы, аналогично модулю многопроцессорной обработки Pool class?Пул потоков, похожий на многопроцессорный пул?

Мне нравится, например, простой способ параллелизовать регулировочной

def long_running_func(p): 
    c_func_no_gil(p) 

p = multiprocessing.Pool(4) 
xs = p.map(long_running_func, range(100)) 

однако я хотел бы сделать это без накладных расходов на создание новых процессов.

Я знаю о GIL. Тем не менее, в моей функции usecase, функция будет связанной с IO функцией C, для которой оболочка python освободит GIL до фактического вызова функции.

Должен ли я писать свой собственный пул потоков?

ответ

316

Я только что узнал, что там на самом деле является поток на основе бассейн интерфейса в multiprocessing модуля, однако он скрыт несколько и неправильно документированы.

Это могут быть импортированы с помощью

from multiprocessing.pool import ThreadPool 

Он реализован с использованием фиктивного класс процесса упаковки питона нить. Этот технологический класс, основанный на потоках, можно найти в multiprocessing.dummy, который кратко упоминается в docs. Этот фиктивный модуль предположительно обеспечивает весь интерфейс многопроцессорности на основе потоков.

+4

Это потрясающе. У меня возникла проблема с созданием ThreadPools вне основного потока, вы можете использовать их из дочернего потока, когда-то созданного. Я поставил для него проблему: http://bugs.python.org/issue10015 – Olson

+42

Я не понимаю, почему у этого класса нет документации. Такие вспомогательные классы так важны в наши дни. – Wernight

+8

@Wernight: он не является публичным в первую очередь потому, что никто не предложил патч, который предоставляет ему (или что-то подобное) как threading.ThreadPool, включая документацию и тесты. Это действительно хорошая батарея для включения в стандартную библиотеку, но этого не произойдет, если никто ее не напишет. Одно из достоинств этой существующей реализации в многопроцессорной обработке заключается в том, что она должна сделать любой такой патч для потоковой передачи * намного легче написать (http://docs.python.org/devguide/) – ncoghlan

2

Нет встроенного пула на основе потоков. Тем не менее, может быть очень быстро реализовать очередь производителей/потребителей с классом Queue.

От: https://docs.python.org/2/library/queue.html

from threading import Thread 
from Queue import Queue 
def worker(): 
    while True: 
     item = q.get() 
     do_work(item) 
     q.task_done() 

q = Queue() 
for i in range(num_worker_threads): 
    t = Thread(target=worker) 
    t.daemon = True 
    t.start() 

for item in source(): 
    q.put(item) 

q.join()  # block until all tasks are done 
+3

Это больше не относится к модулю 'concurrent.futures'. – Thanatos

+7

Я больше не думаю, что это правда. 'from multiprocessing.pool import ThreadPool' – ranman

2

Надразумной создания новых процессов минимален, особенно, когда это только 4 из них. Я сомневаюсь, что это высокая производительность вашего приложения. Держите его простым, оптимизируйте, где вам нужно, и где указывают результаты профилирования.

+3

Если вопросник находится под Windows (который, как я полагаю, не указан), я думаю, что процесс раскрутки процесса может быть значительным. По крайней мере, это касается проектов, которые я недавно делал. :-) –

33

Для чего-то очень простого и легкого (слегка видоизмененного от here):

from Queue import Queue 
from threading import Thread 


class Worker(Thread): 
    """Thread executing tasks from a given tasks queue""" 
    def __init__(self, tasks): 
     Thread.__init__(self) 
     self.tasks = tasks 
     self.daemon = True 
     self.start() 

    def run(self): 
     while True: 
      func, args, kargs = self.tasks.get() 
      try: 
       func(*args, **kargs) 
      except Exception, e: 
       print e 
      finally: 
       self.tasks.task_done() 


class ThreadPool: 
    """Pool of threads consuming tasks from a queue""" 
    def __init__(self, num_threads): 
     self.tasks = Queue(num_threads) 
     for _ in range(num_threads): 
      Worker(self.tasks) 

    def add_task(self, func, *args, **kargs): 
     """Add a task to the queue""" 
     self.tasks.put((func, args, kargs)) 

    def wait_completion(self): 
     """Wait for completion of all the tasks in the queue""" 
     self.tasks.join() 

if __name__ == '__main__': 
    from random import randrange 
    from time import sleep 

    delays = [randrange(1, 10) for i in range(100)] 

    def wait_delay(d): 
     print 'sleeping for (%d)sec' % d 
     sleep(d) 

    pool = ThreadPool(20) 

    for i, d in enumerate(delays): 
     pool.add_task(wait_delay, d) 

    pool.wait_completion() 

Для поддержки обратных вызовов по завершению задачи вы можете просто добавить функцию обратный вызова для целевого кортежа.

+0

Каким образом нити могут соединяться, если они безоговорочно замкнуты? –

117

В Python 3 можно использовать concurrent.futures.ThreadPoolExecutor, т.е .:

executor = ThreadPoolExecutor(max_workers=10) 
a = executor.submit(my_function) 

Смотрите docs для получения дополнительной информации и примеров.

+0

Что это значит, что других ответов нет? –

+26

@AustinHenley - более чистый, более документированный, более канонический API. –

+19

Он также был отправлен обратно на Python 2.5-2.7 https://pypi.python.org/pypi/futures – crusaderky

39

Да, и у него есть (более или менее) тот же API.

import multiprocessing 

def worker(lnk): 
    ....  
def start_process(): 
    ..... 
.... 

if(PROCESS): 
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process) 
else: 
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
              initializer=start_process) 

pool.map(worker, inputs) 
.... 
+6

Путь импорта для' ThreadPool' отличается от 'Pool'. Правильный импорт - это 'from multiprocessing.pool import ThreadPool'. – Marigold

+0

Странно, что это не документированный API, а multiprocessing.pool кратко упоминается как предоставление AsyncResult. Но он доступен в версиях 2.x и 3.x. – Marvin

3

Привет использовать пул потоков в Python вы можете использовать эту библиотеку:

from multiprocessing.dummy import Pool as ThreadPool 

, а затем использовать это библиотека сделать так:

pool = ThreadPool(threads) 
results = pool.map(service, tasks) 
pool.close() 
pool.join() 
return results 

нити являются количество потоков, которые вы хотите, и задачи - это список задач, большинство из которых относятся к службе.

+0

Спасибо, это отличное предложение! Из документов: multiprocessing.dummy реплицирует API многопроцессорности, но не более чем обертка вокруг модуля потоковой передачи. Одна коррекция - я думаю, вы хотите сказать, что пул api (функция, итерабельность) – layser

Смежные вопросы