2016-08-09 3 views
6

Я пытаюсь написать интерактивную оболочку (для использования в ipython) для библиотеки, которая управляет некоторым оборудованием. Некоторые вызовы тяжелы для ввода-вывода, поэтому имеет смысл параллельно выполнять задачи. Использование ThreadPool (почти) работает хорошо:Остановка процессов в ThreadPool в Python

from multiprocessing.pool import ThreadPool 

class hardware(): 
    def __init__(IPaddress): 
     connect_to_hardware(IPaddress) 

    def some_long_task_to_hardware(wtime): 
     wait(wtime) 
     result = 'blah' 
     return result 

pool = ThreadPool(processes=4) 
Threads=[] 
h=[hardware(IP1),hardware(IP2),hardware(IP3),hardware(IP4)] 
for tt in range(4): 
    task=pool.apply_async(h[tt].some_long_task_to_hardware,(1000)) 
    threads.append(task) 
alive = [True]*4 
Try: 
    while any(alive) : 
     for tt in range(4): alive[tt] = not threads[tt].ready() 
     do_other_stuff_for_a_bit() 
except: 
    #some command I cannot find that will stop the threads... 
    raise 
for tt in range(4): print(threads[tt].get()) 

Проблема возникает, если пользователь хочет, чтобы остановить процесс или возникает ошибка ввода-вывода в do_other_stuff_for_a_bit(). Нажатие Ctrl + C останавливает основной процесс, но рабочие потоки продолжают работать, пока их текущая задача не будет завершена.
Есть ли способ остановить эти потоки, не переписывая библиотеку или пользовательский выход из python? pool.terminate() и pool.join(), которые я видел в других примерах, похоже, не выполняют эту работу.

Фактическая процедура (вместо упрощенной версии выше) использует ведение журнала, и хотя все рабочие потоки в какой-то момент закрыты, я вижу, что процессы, с которых они начали работать, продолжаются до завершения (и быть аппаратным я могу видеть их эффект, глядя через комнату).

Это в python 2.7.

UPDATE:

Решение, кажется, чтобы переключиться на использование multiprocessing.Process вместо пула потоков. Тест код, который я пытался это запустить foo_pulse:

class foo(object): 
    def foo_pulse(self,nPulse,name): #just one method of *many* 
     print('starting pulse for '+name) 
     result=[] 
     for ii in range(nPulse): 
      print('on for '+name) 
      time.sleep(2) 
      print('off for '+name) 
      time.sleep(2) 
      result.append(ii) 
     return result,name 

Если вы попробуйте запустить это с помощью ThreadPool затем Ctrl-C не останавливает foo_pulse от работы (даже если он действительно убивает нити сразу, операторы печати продолжают приход:

from multiprocessing.pool import ThreadPool 
import time 
def test(nPulse): 
    a=foo() 
    pool=ThreadPool(processes=4) 
    threads=[] 
    for rn in range(4) : 
     r=pool.apply_async(a.foo_pulse,(nPulse,'loop '+str(rn))) 
     threads.append(r) 
    alive=[True]*4 
    try: 
     while any(alive) : #wait until all threads complete 
      for rn in range(4): 
       alive[rn] = not threads[rn].ready() 
       time.sleep(1) 
    except : #stop threads if user presses ctrl-c 
     print('trying to stop threads') 
     pool.terminate() 
     print('stopped threads') # this line prints but output from foo_pulse carried on. 
     raise 
    else : 
     for t in threads : print(t.get()) 

Однако вариант с использованием multiprocessing.Process работает, как ожидалось:

import multiprocessing as mp 
import time 
def test_pro(nPulse): 
    pros=[] 
    ans=[] 
    a=foo() 
    for rn in range(4) : 
     q=mp.Queue() 
     ans.append(q) 
     r=mp.Process(target=wrapper,args=(a,"foo_pulse",q),kwargs={'args':(nPulse,'loop '+str(rn))}) 
     r.start() 
     pros.append(r) 
    try: 
     for p in pros : p.join() 
     print('all done') 
    except : #stop threads if user stops findRes 
     print('trying to stop threads') 
     for p in pros : p.terminate() 
     print('stopped threads') 
    else : 
     print('output here') 
     for q in ans : 
      print(q.get()) 
    print('exit time') 

Где я определил оболочку для библиотеки обув (так, что он сделал не нужно переписывать). Если возвращаемое значение не необходима ни эта обертка:

def wrapper(a,target,q,args=(),kwargs={}): 
    '''Used when return value is wanted''' 
    q.put(getattr(a,target)(*args,**kwargs)) 

Из документации я не вижу никаких причин, почему пул не будет работать (кроме ошибки).

+1

У вас есть причины использовать недокументированные классы? Вероятно, вам повезло бы с модулем 'concurrent.futures'. – SuperSaiyan

+0

Нет реальной причины использовать недокументированные классы - кроме этого используется то, что было использовано в примере кода, с которым я столкнулся при исследовании того, как это сделать. – SRD

+0

@SuperSaiyan: Это задокументировано под другим именем; 'ThreadPool' экспонируется в документальной форме в разделе' multiprocessing.dummy.Pool', где ['multiprocessing.dummy' является закрытой копией API« многопроцессорности », поддерживаемой потоками вместо процессов] (https: // docs. python.org/3/library/multiprocessing.html#module-multiprocessing.dummy). – ShadowRanger

ответ

1

Это очень интересное использование параллелизма.

Однако, если вы используете multiprocessing, цель состоит в том, чтобы иметь много процессов, работающих параллельно, в отличие от одного процесса, выполняющего множество потоков.

Рассмотрим эти несколько изменений, чтобы реализовать его с помощью multiprocessing:

У вас есть эти функции, которые будут работать параллельно:

import time 
import multiprocessing as mp 


def some_long_task_from_library(wtime): 
    time.sleep(wtime) 


class MyException(Exception): pass 

def do_other_stuff_for_a_bit(): 
    time.sleep(5) 
    raise MyException("Something Happened...") 

Давайте создавать и запускать процессы, скажем, 4:

procs = [] # this is not a Pool, it is just a way to handle the 
      # processes instead of calling them p1, p2, p3, p4... 
for _ in range(4): 
    p = mp.Process(target=some_long_task_from_library, args=(1000,)) 
    p.start() 
    procs.append(p) 
mp.active_children() # this joins all the started processes, and runs them. 

Процессы выполняются параллельно, предположительно в отдельном ядре процессора, но это зависит от ОС.Вы можете проверить свой системный монитор.

В то же время запуска процесса, который сломается, и вы хотите, чтобы остановить запущенные процессы, не оставляя им сироте:

try: 
    do_other_stuff_for_a_bit() 
except MyException as exc: 
    print(exc) 
    print("Now stopping all processes...") 
    for p in procs: 
     p.terminate() 
print("The rest of the process will continue") 

Если это не имеет смысла продолжать с основным процессом, когда один или все подпроцессы завершены, вы должны обработать выход из основной программы.

Надеюсь, что это поможет, и вы можете адаптировать биты этого для своей библиотеки.

+0

В моем случае это не имело значения, если все работает на одном процессоре, причиной параллельной работы является то, что в IO есть массовые ожидания. Однако этот метод работает с одним недостатком, что трудно вернуть значения из вызовов. На данный момент я решил это с помощью функции обертки - см. Мой обновленный пост. – SRD

+0

В зависимости от того, какие значения необходимо вернуть из вызовов, вы можете использовать 'Queue',' Pipe', общую память 'Value' или' Array' или даже файл на диске. В некоторых из этих случаев вам может понадобиться использовать 'Lock'. – chapelo

0

В ответ на вопрос о том, почему бассейн не работает, то это связано (как указано в Documentation), тогда основные потребности быть ввозу на дочерних процессов и в связи с характером этого проекта интерактивного питон быть использованным.

В то же время было непонятно, почему ThreadPool будет - хотя подсказка прямо там в названии. ThreadPool создает пул рабочих процессов с использованием multiprocessing.dummy, который, как отмечено, here - это всего лишь обертка вокруг модуля Threading. Пул использует многопроцессорную обработку. Это можно увидеть с помощью этого теста:

p=ThreadPool(processes=3) 
p._pool[0] 
<DummyProcess(Thread23, started daemon 12345)> #no terminate() method 

p=Pool(processes=3) 
p._pool[0] 
<Process(PoolWorker-1, started daemon)> #has handy terminate() method if needed 

Как нити не имеют способ прекратить рабочие потоки нести на движение, пока они не закончат свою текущую задачу. Убийство потоков беспорядочно (вот почему я пытался использовать многопроцессорный модуль), но решениями являются here.

Одно предупреждение о решении с помощью выше:

def wrapper(a,target,q,args=(),kwargs={}): 
    '''Used when return value is wanted''' 
    q.put(getattr(a,target)(*args,**kwargs)) 

является то, что изменения атрибутов внутри экземпляра объекта не передаются обратно к основной программе. В качестве примера класс foo выше может также иметь такие методы, как: def addIP (newIP): self.hardwareIP = newIP Вызов r=mp.Process(target=a.addIP,args=(127.0.0.1)) не обновляется a.

Единственный способ обойти это для сложного объекта, по-видимому, является разделяемой памятью с использованием настраиваемого manager, который может предоставить доступ как к методам, так и к атрибутам объекта. a Для очень сложного объекта на основе библиотеки это может быть лучше всего сделано используя dir(foo) для заполнения диспетчера. Если я смогу понять, как я буду обновлять этот ответ на примере (для моего будущего я, как и другие).

Смежные вопросы