2013-11-18 3 views
7

Мне нужно выполнить пул из множества параллельных подключений и запросов к базе данных. Я хотел бы использовать multiprocessing.Pool или concurrent.futures ProcessPoolExecutor. Python 2.7.5python multiprocessing.Pool kill * specific * long running или hung process

В некоторых случаях запросы запросов занимают слишком много времени или никогда не будут завершены (процесс зависания/зомби). Я хотел бы убить конкретный процесс из многопроцессорного процесса.Pool или concurrent.futures ProcessPoolExecutor, который истекло.

Вот пример того, как убить/повторно запустить весь пул процессов, но в идеале я бы свести к минимуму перегрев процессора, так как я хочу только убить определенный длительный процесс, который не возвратил данные после таймаута секунд.

По какой-то причине приведенный ниже код, похоже, не может завершить/объединить пул процессов после того, как все результаты будут возвращены и завершены. Возможно, это связано с убийством рабочих процессов, когда происходит тайм-аут, однако Пул создает новых работников, когда они убиты, а результаты ожидаются.

from multiprocessing import Pool 
import time 
import numpy as np 
from threading import Timer 
import thread, time, sys 

def f(x): 
    time.sleep(x) 
    return x 

if __name__ == '__main__': 
    pool = Pool(processes=4, maxtasksperchild=4) 

    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()] 

    while results: 
     try: 
      x, result = results.pop(0) 
      start = time.time() 
      print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start) 

     except Exception as e: 
      print str(e) 
      print '%d Timeout Exception! in %f' % (x, time.time()-start) 
      for p in pool._pool: 
       if p.exitcode is None: 
        p.terminate() 

    pool.terminate() 
    pool.join() 

ответ

4

Я не полностью понимаю ваш вопрос. Вы говорите, что хотите остановить один конкретный процесс, но затем, на этапе обработки исключений, вы вызываете завершение на всех заданиях. Не знаете, почему вы это делаете. Кроме того, я уверен, что использование внутренних переменных от multiprocessing.Pool не совсем безопасно. Сказав все это, я думаю, ваш вопрос в том, почему эта программа не заканчивается, когда происходит тайм-аут. Если это проблема, то делает следующее:

from multiprocessing import Pool 
import time 
import numpy as np 
from threading import Timer 
import thread, time, sys 

def f(x): 
    time.sleep(x) 
    return x 

if __name__ == '__main__': 
    pool = Pool(processes=4, maxtasksperchild=4) 

    results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()] 

    result = None 
    start = time.time() 
    while results: 
     try: 
      x, result = results.pop(0) 
      print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start) 
     except Exception as e: 
      print str(e) 
      print '%d Timeout Exception! in %f' % (x, time.time()-start) 
      for i in reversed(range(len(pool._pool))): 
       p = pool._pool[i] 
       if p.exitcode is None: 
        p.terminate() 
       del pool._pool[i] 

    pool.terminate() 
    pool.join() 

Дело в том, что вам необходимо удалить предметы из бассейна; просто вызвать завершение на них недостаточно.

0

В вашем решении вы подделываете внутренние переменные самого пула. Пул полагается на 3 разных потока для правильной работы, небезопасно вмешиваться в их внутренние переменные, не имея действительно знает, что вы делаете.

Не существует чистого способа остановить процессы синхронизации в стандартных пулах Python, но есть альтернативные реализации, которые раскрывают такую ​​функцию.

Вы можете посмотреть на следующих библиотек:

pebble

billiard

0

Чтобы избежать доступа к внутренним переменным вы можете сохранить multiprocessing.current_process().pid от исполняющего задачи в общей памяти. Затем перейдите по multiprocessing.active_children() из основного процесса и уничтожьте цель pid, если она существует.
Однако после такого внешнего прекращения рабочих, они воссозданы, но бассейн становится nonjoinable, а также требует явного прекращения перед join()

+0

Это предполагает, что нет других активных детей, которые ** не являются ** частью бассейна. –

0

я наткнулся на эту проблему.

Исходный код и отредактированная версия @stacksia имеют такую ​​же проблему: в обоих случаях он убьет все текущие запущенные процессы, когда тайм-аут достигнут только для одного из процессов (т.е. когда цикл завершен pool._pool) ,

Найти ниже моего решения. Он включает в себя создание файла .pid для каждого рабочего процесса, как это было предложено компанией @luart. Он будет работать, если есть способ пометить каждый рабочий процесс (в приведенном ниже коде x выполняет это задание). Если у кого-то есть более элегантное решение (например, сохранение PID в памяти), пожалуйста, поделитесь им.

#!/usr/bin/env python 

from multiprocessing import Pool 
import time, os 
import subprocess 

def f(x): 
    PID = os.getpid() 
    print 'Started:', x, 'PID=', PID 

    pidfile = "/tmp/PoolWorker_"+str(x)+".pid" 

    if os.path.isfile(pidfile): 
     print "%s already exists, exiting" % pidfile 
     sys.exit() 

    file(pidfile, 'w').write(str(PID)) 

    # Do the work here 
    time.sleep(x*x) 

    # Delete the PID file 
    os.remove(pidfile) 

    return x*x 


if __name__ == '__main__': 
    pool = Pool(processes=3, maxtasksperchild=4) 

    results = [(x, pool.apply_async(f, (x,))) for x in [1,2,3,4,5,6]] 

    pool.close() 

    while results: 
     print results 
     try: 
      x, result = results.pop(0) 
      start = time.time() 
      print result.get(timeout=3), '%d done in %f Seconds!' % (x, time.time()-start) 

     except Exception as e: 
      print str(e) 
      print '%d Timeout Exception! in %f' % (x, time.time()-start) 

      # We know which process gave us an exception: it is "x", so let's kill it! 

      # First, let's get the PID of that process: 
      pidfile = '/tmp/PoolWorker_'+str(x)+'.pid' 
      PID = None 
      if os.path.isfile(pidfile): 
       PID = str(open(pidfile).read()) 
       print x, 'pidfile=',pidfile, 'PID=', PID 

      # Now, let's check if there is indeed such process runing: 
      for p in pool._pool: 
       print p, p.pid 
       if str(p.pid)==PID: 
        print 'Found it still running!', p, p.pid, p.is_alive(), p.exitcode 

        # We can also double-check how long it's been running with system 'ps' command:" 
        tt = str(subprocess.check_output('ps -p "'+str(p.pid)+'" o etimes=', shell=True)).strip() 
        print 'Run time from OS (may be way off the real time..) = ', tt 

        # Now, KILL the m*[email protected]: 
        p.terminate() 
        pool._pool.remove(p) 
        pool._repopulate_pool() 

        # Let's not forget to remove the pidfile 
        os.remove(pidfile) 

        break 

    pool.terminate() 
    pool.join() 

Многие люди предлагают гальки. Он выглядит неплохо, но доступен только для Python 3. Если у кого-то есть способ получить гальку, импортированную для python 2.6, было бы здорово.

+0

Pebble поддерживает python 2, но он был протестирован только на python 2.7. – noxdafox