2012-01-22 2 views
2

У меня есть приложение с привязкой к процессору, которое я хочу ускорить, используя многопроцессорную + поточную передачу вместо использования чистой потоковой версии. Я написал простое приложение для проверки производительности моего подхода и был удивлен, увидев, что многопроцессорные и многопроцессорные + потоковые версии работают хуже, чем поточные и последовательные версии.Устранение неполадок приложения с использованием многопроцессорной обработки + потоки в Python

В моем приложении у меня есть рабочая очередь, в которой хранится вся работа. Затем потоки выводят один рабочий элемент за раз, а затем обрабатывают его либо напрямую (потоковая версия), либо передавая его в процесс. Затем поток должен ждать, пока результат не появится, прежде чем продолжить следующую итерацию. Причина, по которой мне нужно выталкивать один рабочий элемент за раз, заключается в том, что работа динамична (не в случае с приложенным ниже приложением кода прототипа), и я не могу предварительно разбить работу и передать ее каждому потоку/процессу во время создания ,

Я хотел бы знать, что я делаю неправильно, и как я могу ускорить свое приложение.

Вот время выполнения, когда я работал на 16-основной машине:

Version  : 2.7.2 
Compiler  : GCC 4.1.2 20070925 (Red Hat 4.1.2-33) 
Platform  : Linux-2.6.24-perfctr-x86_64-with-fedora-8-Werewolf 
Processor : x86_64 
Num Threads/Processes: 8 ; Num Items: 16000 
mainMultiprocessAndThreaded exec time: 3505.97214699 ms 
mainPureMultiprocessing exec time: 2241.89805984 ms 
mainPureThreaded exec time: 309.767007828 ms 
mainSerial exec time: 52.3412227631 ms 
Terminating 

и вот код, который я использовал:

import threading 
import multiprocessing 
import time 
import platform 

class ConcurrentQueue: 
    def __init__(self): 
     self.data = [] 
     self.lock = threading.Lock() 

    def push(self, item): 
     self.lock.acquire() 
     try: 
      self.data.append(item) 
     finally: 
      self.lock.release() 
     return 

    def pop(self): 
     self.lock.acquire() 
     result = None 
     try: 
      length = len(self.data) 
      if length > 0: 
       result = self.data.pop() 
     finally: 
      self.lock.release() 
     return result 

    def isEmpty(self, item): 
     self.lock.acquire() 
     result = 0 
     try: 
      result = len(self.data) 
     finally: 
      self.lock.release() 
     return result != 0 


def timeFunc(passedFunc): 
    def wrapperFunc(*arg): 
     startTime = time.time() 
     result = passedFunc(*arg) 
     endTime = time.time() 
     elapsedTime = (endTime - startTime) * 1000 
     print passedFunc.__name__, 'exec time:', elapsedTime, " ms" 
     return result 
    return wrapperFunc 

def checkPrime(candidate): 
    # dummy process to do some work 
    for k in xrange(3, candidate, 2): 
     if candidate % k: 
      return False 
    return True 

def fillQueueWithWork(itemQueue, numItems): 
    for item in xrange(numItems, 2 * numItems): 
     itemQueue.push(item) 


@timeFunc 
def mainSerial(numItems): 
    jobQueue = ConcurrentQueue() 
    fillQueueWithWork(jobQueue, numItems) 

    while True: 
     dataItem = jobQueue.pop() 
     if dataItem is None: 
      break 
     # do work with dataItem 
     result = checkPrime(dataItem) 
    return 

# Start: Implement a pure threaded version 
def pureThreadFunc(jobQueue): 
    curThread = threading.currentThread() 
    while True: 
     dataItem = jobQueue.pop() 
     if dataItem is None: 
      break 
     # do work with dataItem 
     result = checkPrime(dataItem) 
    return 

@timeFunc 
def mainPureThreaded(numThreads, numItems): 
    jobQueue = ConcurrentQueue() 
    fillQueueWithWork(jobQueue, numItems) 

    workers = [] 
    for index in xrange(numThreads): 
     loopName = "Thread-" + str(index) 
     loopThread = threading.Thread(target=pureThreadFunc, name=loopName, args=(jobQueue,)) 
     loopThread.start() 
     workers.append(loopThread) 

    for worker in workers: 
     worker.join() 

    return 
# End: Implement a pure threaded version 

# Start: Implement a pure multiprocessing version 
def pureMultiprocessingFunc(jobQueue, resultQueue): 
    while True: 
     dataItem = jobQueue.get() 
     if dataItem is None: 
      break 
     # do work with dataItem 
     result = checkPrime(dataItem) 
     resultQueue.put_nowait(result) 
    return 

@timeFunc 
def mainPureMultiprocessing(numProcesses, numItems): 
    jobQueue = ConcurrentQueue() 
    fillQueueWithWork(jobQueue, numItems) 

    workers = [] 
    queueSize = (numItems/numProcesses) + 10 
    for index in xrange(numProcesses): 
     jobs = multiprocessing.Queue(queueSize) 
     results = multiprocessing.Queue(queueSize) 
     loopProcess = multiprocessing.Process(target=pureMultiprocessingFunc, args=(jobs, results,)) 
     loopProcess.start() 
     workers.append((loopProcess, jobs, results)) 

    processIndex = 0 
    while True: 
     dataItem = jobQueue.pop() 
     if dataItem is None: 
      break 
     workers[processIndex][1].put_nowait(dataItem) 

     processIndex += 1 
     if numProcesses == processIndex: 
      processIndex = 0 

    for worker in workers: 
     worker[1].put_nowait(None) 

    for worker in workers: 
     worker[0].join() 

    return 
# End: Implement a pure multiprocessing version 

# Start: Implement a threaded+multiprocessing version 
def mpFunc(processName, jobQueue, resultQueue): 
    while True: 
     dataItem = jobQueue.get() 
     if dataItem is None: 
      break 
     result = checkPrime(dataItem) 
     resultQueue.put_nowait(result) 
    return 

def mpThreadFunc(jobQueue): 
    curThread = threading.currentThread() 
    threadName = curThread.getName() 

    jobs = multiprocessing.Queue() 
    results = multiprocessing.Queue() 

    myProcessName = "Process-" + threadName 
    myProcess = multiprocessing.Process(target=mpFunc, args=(myProcessName, jobs, results,)) 
    myProcess.start() 

    while True: 
     dataItem = jobQueue.pop() 
     # put item to allow process to start 
     jobs.put_nowait(dataItem) 
     # terminate loop if work queue is empty 
     if dataItem is None: 
      break 
     # wait to get result from process 
     result = results.get() 
     # do something with result 
    return 

@timeFunc 
def mainMultiprocessAndThreaded(numThreads, numItems): 
    jobQueue = ConcurrentQueue() 
    fillQueueWithWork(jobQueue, numItems) 

    workers = [] 
    for index in xrange(numThreads): 
     loopName = "Thread-" + str(index) 
     loopThread = threading.Thread(target=mpThreadFunc, name=loopName, args=(jobQueue,)) 
     loopThread.start() 
     workers.append(loopThread) 

    for worker in workers: 
     worker.join() 

    return 
# End: Implement a threaded+multiprocessing version 

if __name__ == '__main__': 

    print 'Version  :', platform.python_version() 
    print 'Compiler  :', platform.python_compiler() 
    print 'Platform  :', platform.platform() 
    print 'Processor :', platform.processor() 

    numThreads = 8 
    numItems = 16000 #200000 

    print "Num Threads/Processes:", numThreads, "; Num Items:", numItems 

    mainMultiprocessAndThreaded(numThreads, numItems) 
    mainPureMultiprocessing(numThreads, numItems) 
    mainPureThreaded(numThreads, numItems) 
    mainSerial(numItems) 

    print "Terminating" 

Edit: Один из моих догадок медленности что Queue.put() заняты ожиданием, а не отказом от GIL. Если да, то какие-либо предложения по альтернативной структуре данных, которые я должен использовать?

+0

Насколько я знаю, многопоточность python не для ускорения выполнения. (возможно, я ошибаюсь, поэтому, пожалуйста, поправьте меня). [понимание GIL] (http://www.dabeaz.com/python/UnderstandingGIL.pdf) –

+0

Аналогичное исследование [Python - параллелизация задач, связанных с ЦП с многопроцессорной обработкой] (http://eli.thegreenplace.net/2012/01/ 16/python-parallelization-cpu-bound-tasks-with-multiprocessing /) – reclosedev

+0

@reclosedev Разница, которую я вижу, в примере есть все входные данные для цели, которые предоставляются заранее, вместо повторных вызовов queue.get(). Точно так же есть только одиночные вызовы result.get() для каждого процесса. – shams

ответ

5

Похоже, что вычислительная стоимость каждого элемента не перевешивает накладные расходы, связанные с отправкой работы на другой поток/процесс. Например, вот результаты я вижу при запуске вашего тестового приложения на моей машине (очень похоже на ваши результаты):

Version  : 2.7.1 
Compiler  : MSC v.1500 32 bit (Intel) 
Platform  : Windows-7-6.1.7601-SP1 
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel 
Num Threads/Processes: 8 ; Num Items: 16000 
mainMultiprocessAndThreaded exec time: 1134.00006294 ms 
mainPureMultiprocessing exec time: 917.000055313 ms 
mainPureThreaded exec time: 111.000061035 ms 
mainSerial exec time: 41.0001277924 ms 
Terminating 

Если я изменить работу, которая, выполняемую на то, что это более вычислительно дорого, для пример:

def checkPrime(candidate): 
    i = 0; 
    for k in xrange(1,10000): 
     i += k 
    return i < 5000 

Тогда я вижу результаты, которые более соответствуют тому, что я думаю, что можно было бы ожидать:

Version  : 2.7.1 
Compiler  : MSC v.1500 32 bit (Intel) 
Platform  : Windows-7-6.1.7601-SP1 
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel 
Num Threads/Processes: 8 ; Num Items: 16000 
mainMultiprocessAndThreaded exec time: 2190.99998474 ms 
mainPureMultiprocessing exec time: 2154.99997139 ms 
mainPureThreaded exec time: 16170.0000763 ms 
mainSerial exec time: 9143.00012589 ms 
Terminating 

вы также можете взглянуть на multiprocessing.Pool. Он предоставляет аналогичную модель тому, что вы описываете (несколько рабочих процессов вытягивают задания из общей очереди). Для примера, реализация может выглядеть примерно так:

@timeFunc 
def mainPool(numThreads, numItems): 
    jobQueue = ConcurrentQueue() 
    fillQueueWithWork(jobQueue, numItems) 

    pool = multiprocessing.Pool(processes=numThreads) 
    results = [] 
    while True: 
     dataItem = jobQueue.pop() 
     if dataItem == None: 
      break 
     results.append(pool.apply_async(checkPrime, dataItem)) 

    pool.close() 
    pool.join() 

На моей машине, с альтернативной checkPrime реализации, я вижу результат:

Version  : 2.7.1 
Compiler  : MSC v.1500 32 bit (Intel) 
Platform  : Windows-7-6.1.7601-SP1 
Processor : Intel64 Family 6 Model 30 Stepping 5, GenuineIntel 
Num Threads/Processes: 8 ; Num Items: 1600 
mainPool exec time: 1530.99989891 ms 
Terminating 

Поскольку multiprocessing.Pool уже обеспечивает безопасный доступ для вставки работы вы, вероятно, можете устранить свой ConcurrentQueue и вставить свою динамическую работу непосредственно в Pool.

+0

Большое спасибо за недостаток вычислительной работы, я смог воспроизвести ваши результаты. Я обязательно посмотрю пример Пула. – shams

2

Кажется, что ваша функция недостаточно вычислительна, чтобы перевесить накладные расходы многопроцессорной обработки. (Обратите внимание, что в Python MultiThreading не увеличивает ваши вычислительные ресурсы из-за GIL).

Ваша функция (checkPrime) на самом деле не проверяет правильность, а скорее очень быстро возвращается, заменяя ее простой (и наивной) первичкой, результат такой, как ожидалось.

Однако посмотрите на Use Python pool.map to have multiple processes perform operations on a list, чтобы легко использовать многопроцессорную обработку. Обратите внимание, что есть встроенные типы, чтобы выполнить задачу очереди, такие как очереди, см http://docs.python.org/library/multiprocessing.html#multiprocessing-managers

def checkPrime(candidate): 
    # dummy process to do some work 
    for k in xrange(3, candidate): 
     if not candidate % k: 
      return False 
    return True 

и пример «быстрая» реализация:

@timeFunc 
def speedy(numThreads,numItems): 
    pool = multiprocessing.Pool(numThreads) #note the default will use the optimal number of workers 

    for i in xrange(numItems, 2 * numItems): 
     pool.apply_async(checkPrime,i) 
    pool.close() 
    pool.join() 

что почти в два раза быстрее!

[email protected]:~$ python test.py 
Version  : 2.6.6 
Compiler  : GCC 4.4.5 
Platform  : Linux-2.6.35-32-generic-x86_64-with-Ubuntu-10.10-maverick 
Processor : 
Num Threads/Processes: 8 ; Num Items: 16000 
mainSerial exec time: 5555.76992035 ms 
mainMultiprocessAndThreaded exec time: 4721.43602371 ms 
mainPureMultiprocessing exec time: 4440.83094597 ms 
mainPureThreaded exec time: 10829.3449879 ms 
speedy exec time: 1898.72503281 ms 
+0

Большое спасибо за улов. Ваш ответ аналогичен вашему ответу на @DRH. К несчастью, я могу принять только один ответ :( – shams

Смежные вопросы