2013-07-10 2 views
12

Требование состоит в том, чтобы запустить пять потоков и подождать только в самом быстром потоке. Все пять потоков пошли искать одни и те же данные в 5 направлениях, и одного достаточно, чтобы продолжить поток управления.Как подождать, пока закончится только первый поток в Python

На самом деле, мне нужно дождаться возвращения первых двух потоков, чтобы проверять друг друга. Но я думаю, если я знаю, как ждать самого быстрого. Я могу понять, как ждать второго - самого быстрого.

Много разговоров о join(timeout), но вы не знаете заранее, какой из них ждать (какой из них следует применить join).

ответ

1

Вы можете использовать мероприятие для этого. См. http://docs.python.org/2/library/threading.html#event-objects. Идея состоит в том, что рабочие потоки поднимают событие, когда они закончены. Основной поток ждет этого события, прежде чем продолжить. Рабочий поток может установить переменную (mutexed), чтобы идентифицировать себя с событием.

1

Или просто отслеживайте все готовые потоки в списке и дайте второму потоку закончить обрабатывать все, что должно быть сделано, списки Python являются потокобезопасными.

finished_threads = [] 
event = threading.Event() 

def func(): 
    do_important_stuff() 

    thisthread = threading.current_thread() 
    finished_threads.append(thisthread) 
    if len(finished_threads) > 1 and finished_threads[1] == thisthread: 
     #yay we are number two! 
     event.set() 

for i in range(5): 
    threading.Thread(target=func).start() 

event.wait() 
+0

Это не ответит на вопрос о основном потоке, ожидающем завершения двух потоков, а затем продолжения: вместо этого вы передали всю оставшуюся активность на второй поток, чтобы завершить, что может быть не так, как требуется. – Duncan

+1

true; handle_two_threads_done() должен, вероятно, установить вместо этого событие. Ред. –

+0

Ummm, списки Python являются потокобезопасными? В самом деле? Я думал, что нужно использовать Queue() для согласованности потоков! –

3

Если у вас есть какие-то петли обработки в ваших потоков, следующий код будет завершать их, когда один заканчивается с помощью threading.Event():

def my_thread(stop_event): 
    while not stop_event.is_set(): 
     # do stuff in a loop 

     # some check if stuff is complete 
     if stuff_complete: 
      stop_event.set() 
      break 

def run_threads(): 
    # create a thread event 
    a_stop_event = threading.Event() 

    # spawn the threads 
    for x in range(5): 
     t = threading.Thread(target=my_thread, args=[a_stop_event]) 
     t.start() 

    while not a_stop_event.is_set(): 
     # wait for an event 
     time.sleep(0.1) 

    print "At least one thread is done" 

Если процесс «дешевый» или один тип запроса-ответа (например, HTTP-запрос async), то Duncan's answer - хороший подход.

13

Используйте очереди: каждый поток после завершения помещает результат в очередь и тогда вам просто необходимо прочитать соответствующее количество результатов и игнорировать остальные:

#!python3.3 
import queue # For Python 2.x use 'import Queue as queue' 
import threading, time, random 

def func(id, result_queue): 
    print("Thread", id) 
    time.sleep(random.random() * 5) 
    result_queue.put((id, 'done')) 

def main(): 
    q = queue.Queue() 
    threads = [ threading.Thread(target=func, args=(i, q)) for i in range(5) ] 
    for th in threads: 
     th.daemon = True 
     th.start() 

    result1 = q.get() 
    result2 = q.get() 

    print("Second result: {}".format(result2)) 

if __name__=='__main__': 
    main() 

Документация для Queue.get() (без аргументов он эквивалентно Queue.get(True, None):

Queue.get ([блок [, тайм-аут]])

Удалить и вернуть элемент из очередь. Если дополнительный аргумент args равен true, а тайм-аут - None ( по умолчанию), блокируйте, если необходимо, до тех пор, пока элемент не будет доступен. Если тайм-аут - положительное число, он блокируется с наибольшим временем таймаута и вызывает Пустое исключение, если за это время не было предметов. В противном случае (блок является ложным), возвратите элемент, если он сразу доступен, иначе поднять пустое исключение (тогда в этом случае игнорируется время ожидания).

+1

Не приведет ли это к возникновению «пустого» исключения, если очередь Queue пуста, когда вы выполняете 'q.get()'? – Michael

+2

@Michael, значение по умолчанию для 'q.get()' состоит в том, чтобы сделать блокировку get, поэтому нет, он не будет генерировать исключение, вместо этого он заблокирует основной поток до тех пор, пока не появится результат. – Duncan

1

Метод Дункана, вероятно, лучший и является тем, что я бы рекомендовал. Меня слегка раздражало отсутствие «дождаться следующего завершенного потока, чтобы закончить», но, поэтому, я просто написал это, чтобы попробовать. Кажется, работает. Просто используйте MWThread вместо threading.thread, и вы получите эту новую функцию wait_for_thread.

Глобальные переменные являются немного klunky; альтернативой было бы сделать их переменными уровня класса. Но если это скрыто в модуле (mwthread.py или что-то еще), это должно быть хорошо в любом случае.

#! /usr/bin/env python 

# Example of how to "wait for"/join whichever threads is/are done, 
# in (more or less) the order they're done. 

import threading 
from collections import deque 

_monitored_threads = [] 
_exited_threads = deque() 
_lock = threading.Lock() 
_cond = threading.Condition(_lock) 

class MWThread(threading.Thread): 
    """ 
    multi-wait-able thread, or monitored-wait-able thread 
    """ 
    def run(self): 
     tid = threading.current_thread() 
     try: 
      with _lock: 
       _monitored_threads.append(tid) 
      super(MWThread, self).run() 
     finally: 
      with _lock: 
       _monitored_threads.remove(tid) 
       _exited_threads.append(tid) 
       _cond.notifyAll() 

def wait_for_thread(timeout=None): 
    """ 
    Wait for some thread(s) to have finished, with optional 
    timeout. Return the first finished thread instance (which 
    is removed from the finished-threads queue). 

    If there are no unfinished threads this returns None 
    without waiting. 
    """ 
    with _cond: 
     if not _exited_threads and _monitored_threads: 
      _cond.wait(timeout) 
     if _exited_threads: 
      result = _exited_threads.popleft() 
     else: 
      result = None 
    return result 

def main(): 
    print 'testing this stuff' 
    def func(i): 
     import time, random 
     sleeptime = (random.random() * 2) + 1 
     print 'thread', i, 'starting - sleep for', sleeptime 
     time.sleep(sleeptime) 
     print 'thread', i, 'finished' 

    threads = [MWThread(target=func, args=(i,)) for i in range(3)] 
    for th in threads: 
     th.start() 
    i = 0 
    while i < 3: 
     print 'main: wait up to .5 sec' 
     th = wait_for_thread(.5) 
     if th: 
      print 'main: got', th 
      th.join() 
      i += 1 
     else: 
      print 'main: timeout' 
    print 'I think I collected them all' 
    print 'result of wait_for_thread():' 
    print wait_for_thread() 

if __name__ == '__main__': 
    main() 
Смежные вопросы