Я пытаюсь написать интерактивную оболочку (для использования в ipython) для библиотеки, которая управляет некоторым оборудованием. Некоторые вызовы тяжелы для ввода-вывода, поэтому имеет смысл параллельно выполнять задачи. Использование ThreadPool (почти) работает хорошо:Остановка процессов в ThreadPool в Python
from multiprocessing.pool import ThreadPool
class hardware():
def __init__(IPaddress):
connect_to_hardware(IPaddress)
def some_long_task_to_hardware(wtime):
wait(wtime)
result = 'blah'
return result
pool = ThreadPool(processes=4)
Threads=[]
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)]
for tt in range(4):
task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000))
threads.append(task)
alive = [True]*4
Try:
while any(alive) :
for tt in range(4): alive[tt] = not threads[tt].ready()
do_other_stuff_for_a_bit()
except:
#some command I cannot find that will stop the threads...
raise
for tt in range(4): print(threads[tt].get())
Проблема возникает, если пользователь хочет, чтобы остановить процесс или возникает ошибка ввода-вывода в do_other_stuff_for_a_bit()
. Нажатие Ctrl + C останавливает основной процесс, но рабочие потоки продолжают работать, пока их текущая задача не будет завершена.
Есть ли способ остановить эти потоки, не переписывая библиотеку или пользовательский выход из python? pool.terminate()
и pool.join()
, которые я видел в других примерах, похоже, не выполняют эту работу.
Фактическая процедура (вместо упрощенной версии выше) использует ведение журнала, и хотя все рабочие потоки в какой-то момент закрыты, я вижу, что процессы, с которых они начали работать, продолжаются до завершения (и быть аппаратным я могу видеть их эффект, глядя через комнату).
Это в python 2.7.
UPDATE:
Решение, кажется, чтобы переключиться на использование multiprocessing.Process вместо пула потоков. Тест код, который я пытался это запустить foo_pulse:
class foo(object):
def foo_pulse(self,nPulse,name): #just one method of *many*
print('starting pulse for '+name)
result=[]
for ii in range(nPulse):
print('on for '+name)
time.sleep(2)
print('off for '+name)
time.sleep(2)
result.append(ii)
return result,name
Если вы попробуйте запустить это с помощью ThreadPool затем Ctrl-C не останавливает foo_pulse от работы (даже если он действительно убивает нити сразу, операторы печати продолжают приход:
from multiprocessing.pool import ThreadPool
import time
def test(nPulse):
a=foo()
pool=ThreadPool(processes=4)
threads=[]
for rn in range(4) :
r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn)))
threads.append(r)
alive=[True]*4
try:
while any(alive) : #wait until all threads complete
for rn in range(4):
alive[rn] = not threads[rn].ready()
time.sleep(1)
except : #stop threads if user presses ctrl-c
print('trying to stop threads')
pool.terminate()
print('stopped threads') # this line prints but output from foo_pulse carried on.
raise
else :
for t in threads : print(t.get())
Однако вариант с использованием multiprocessing.Process работает, как ожидалось:
import multiprocessing as mp
import time
def test_pro(nPulse):
pros=[]
ans=[]
a=foo()
for rn in range(4) :
q=mp.Queue()
ans.append(q)
r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))})
r.start()
pros.append(r)
try:
for p in pros : p.join()
print('all done')
except : #stop threads if user stops findRes
print('trying to stop threads')
for p in pros : p.terminate()
print('stopped threads')
else :
print('output here')
for q in ans :
print(q.get())
print('exit time')
Где я определил оболочку для библиотеки обув (так, что он сделал не нужно переписывать). Если возвращаемое значение не необходима ни эта обертка:
def wrapper(a,target,q,args=(),kwargs={}):
'''Used when return value is wanted'''
q.put(getattr(a,target)(*args,**kwargs))
Из документации я не вижу никаких причин, почему пул не будет работать (кроме ошибки).
У вас есть причины использовать недокументированные классы? Вероятно, вам повезло бы с модулем 'concurrent.futures'. – SuperSaiyan
Нет реальной причины использовать недокументированные классы - кроме этого используется то, что было использовано в примере кода, с которым я столкнулся при исследовании того, как это сделать. – SRD
@SuperSaiyan: Это задокументировано под другим именем; 'ThreadPool' экспонируется в документальной форме в разделе' multiprocessing.dummy.Pool', где ['multiprocessing.dummy' является закрытой копией API« многопроцессорности », поддерживаемой потоками вместо процессов] (https: // docs. python.org/3/library/multiprocessing.html#module-multiprocessing.dummy). – ShadowRanger