2016-09-10 2 views
3

дал следующий код Python:Python многопроцессорной, Pool карта - отменить все запущенные процессы, если один, возвращает желаемый результат

import multiprocessing 

def unique(somelist): 
    return len(set(somelist)) == len(somelist) 


if __name__ == '__main__': 
    somelist = [[1,2,3,4,5,6,7,8,9,10,11,12,13,2], [1,2,3,4,5], [1,2,3,4,5,6,7,8,9,1], [0,1,5,1]] 

    pool = multiprocessing.Pool() 
    reslist = pool.map(unique, somelist) 
    pool.close() 
    pool.join() 
    print "Done!" 

    print reslist 

Теперь представьте, что списки с целыми числами в этой игрушке, например, чрезвычайно долго, и то, что я хотел бы получить здесь, следующее: если один из списков в somelist возвращает True, убейте все запущенные процессы.

Это приводит к двум вопросам (и, вероятно, больше, которые я не придумал):

  • Как я могу прочитать «»/«слушать» из готового процесса результата, в то время как другие процессы бегут? Если, например, процесс имеет дело с [1,2,3,4,5] от somelist и закончен перед всеми другими процессами, как я могу прочитать результат этого процесса в этот самый момент?

  • Учитывая, что можно «прочитать» результат завершенного процесса во время работы другого: как я могу использовать этот результат в качестве условия для прекращения всех других запущенных процессов?

например. если один процесс завершил и вернул True, как я могу использовать это как условие для прекращения всех других (все еще) запущенных процессов?

Заранее спасибо за любые подсказки Dan

ответ

4

Используйте pool.imap_unordered для просмотра результатов в любом порядке, они придумали.

reslist = pool.imap_unordered(unique, somelist) 
pool.close() 
for res in reslist: 
    if res: # or set other condition here 
     pool.terminate() 
     break 
pool.join() 

Вы можете перебирать в imap reslist в главном процессе, а процессы пула еще генерации результатов.

+0

сломаться Также цикл, в противном случае вы могли бы застрять ожидая следующего результата после завершения пула. –

+0

хороший улов, спасибо :) –

1

Без причудливых трюков IPC (inter-process communication), проще всего использовать метод Pool с функцией обратного вызова. Обратный вызов запускается в основной программе (в потоке, созданном multiprocessing), и потребляет каждый результат по мере его появления. Когда обратный вызов видит результат, который вам нравится, он может завершить Pool. Например,

import multiprocessing as mp 

def worker(i): 
    from time import sleep 
    sleep(i) 
    return i, (i == 5) 

def callback(t): 
    i, quit = t 
    result[i] = quit 
    if quit: 
     pool.terminate() 

if __name__ == "__main__": 
    N = 50 
    pool = mp.Pool() 
    result = [None] * N 
    for i in range(N): 
     pool.apply_async(func=worker, args=(i,), callback=callback) 
    pool.close() 
    pool.join() 
    print(result) 

Который будет почти наверняка отображать следующие (ОС планирования капризы может позволить другой вход или два, чтобы потребляться):

[False, False, False, False, False, True, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None, 
None, None, None, None, None, None, None, None, None, None] 
Смежные вопросы