2015-12-15 4 views
0

Мне нужно запустить два потока, которые управляют первым, а затем чередуя их задания.Как синхронизировать две чередующиеся нити

Следующий код работает так, как ожидается, с do_sleep = True, но может завершиться с ошибкой do_sleep = False.

Как я могу достичь такого же результата, не используя эти уродливые (и ненадежные) сна?

Причина, почему она работает с do_sleep = True что:

  • Каждый рабочий поток дает время на другой поток, чтобы начать, прежде чем пытаться получить блокировку и начать следующую работу
  • Существует пауза между начало первой и второй рабочий, который позволяет первому один получить блокировку перед второй готов

с do_sleep = False он может потерпеть неудачу, потому что:

  • В конце каждого задания, каждый поток может попытаться получить блокировку для следующего цикла до другого потока, выполнение двух последовательных заданий вместо переменного
  • Второго поток может получить блокировку перед первым

Вот код:

import threading 
import time 
import random 

do_sleep = True 

def workerA(lock): 
    for i in range(5): 
     lock.acquire() 
     print('Working A - %s' % i) 
     time.sleep(random.uniform(0.2, 1)) 
     lock.release() 
     if do_sleep: time.sleep(0.1) 

def workerB(lock): 
    for i in range(5): 
     if do_sleep: time.sleep(0.1) 
     lock.acquire() 
     print('Working B - %s' % i) 
     time.sleep(random.uniform(0.2, 1)) 
     lock.release() 
     if do_sleep: time.sleep(0.1) 

lock = threading.Lock() 

t1 = threading.Thread(target=workerA, args=(lock,)) 
t2 = threading.Thread(target=workerB, args=(lock,)) 

t1.start() 
if do_sleep: time.sleep(0.1) 
t2.start() 

t1.join() 
t2.join() 

print('done') 

EDIT Использование Queue в sugge Майк не помогает, потому что первый рабочий закончил работу, не дожидаясь второго.

Это неправильный выход версии после замены Lock с Queue:

Working A - 0 
Working A - 1 
Working B - 0 
Working A - 2 
Working B - 1 
Working A - 3 
Working B - 2 
Working A - 4 
Working B - 3 
Working B - 4 
done 

Это неправильный выход, полученный с do_sleep = False:

Working A - 0 
Working A - 1 
Working A - 2 
Working A - 3 
Working A - 4 
Working B - 0 
Working B - 1 
Working B - 2 
Working B - 3 
Working B - 4 
done 

Это правильный выход, полученные с использованием do_sleep = True:

Working A - 0 
Working B - 0 
Working A - 1 
Working B - 1 
Working A - 2 
Working B - 2 
Working A - 3 
Working B - 3 
Working A - 4 
Working B - 4 
done 
+0

в действительности это действительно дубликат: http://stackoverflow.com/questions/16665367/why-doesnt-a-simple-python-producer-consumer-multi-threading-program-speed-up-b –

+0

Нет, я пытался с «Очередью», и он не работает. – stenci

ответ

2

Несколько способов решить эту проблему. Один из относительно простых заключается в том, чтобы использовать блокировку для контроля доступа к отдельной общей переменной: вызвать эту другую переменную owner, ее можно либо установить на A, либо B. Нить A может начать только задание, когда owner установлено в A, и поток B может начать только работу, когда owner установлен в В. Тогда псевдо-код (предположит, что поток а здесь):

while True: 
    while True: 
     # Loop until I'm the owner 
     lock.acquire() 
     if owner == A: 
      break 
     lock.release() 

    # Now I'm the owner. And I still hold the lock. Start job. 
    <Grab next job (or start job or finish job, whatever is required to remove it from contention)> 
    owner = B 
    lock.release() 
    <Finish job if not already done. Go get next one> 

Б нить делает то же самое только реверсивное заявление if owner и owner =. И, очевидно, вы можете параметризовать его так, чтобы оба фактически запускали один и тот же код.

EDIT

Вот рабочая версия, с предложенной логикой внутри объекта:

import threading 
import time 

def workerA(lock): 
    for i in range(5): 
     lock.acquire_for('A') 
     print('Start A - %s' % i) 
     time.sleep(0.5) 
     print('End A - %s' % i) 
     lock.release_to('B') 

def workerB(lock): 
    for i in range(5): 
     lock.acquire_for('B') 
     print('Start B - %s' % i) 
     time.sleep(2) 
     print('End B - %s' % i) 
     lock.release_to('A') 

class LockWithOwner: 

    lock = threading.RLock() 
    owner = 'A' 

    def acquire_for(self, owner): 
     n = 0 
     while True: 
      self.lock.acquire() 
      if self.owner == owner: 
       break 
      n += 1 
      self.lock.release() 
      time.sleep(0.001) 
     print('Waited for {} to be the owner {} times'.format(owner, n)) 

    def release_to(self, new_owner): 
     self.owner = new_owner 
     self.lock.release() 

lock = LockWithOwner() 
lock.owner = 'A' 

t1 = threading.Thread(target=workerA, args=(lock,)) 
t2 = threading.Thread(target=workerB, args=(lock,)) 

t1.start() 
t2.start() 

t1.join() 
t2.join() 

print('done') 
1

Вы можете исключить возможность неправильной нити получения блокировки, исключаем полагаться на time.sleep(...) для правильность и сокращение кода одновременно с использованием Queue (две очереди для двусторонней связи):

import threading 
import time 
import random 
from Queue import Queue 

def work_hard(name, i): 
    print('start %s - %s' % (name, i)) 
    time.sleep(random.uniform(0.2, 1)) 
    print('end %s - %s' % (name, i)) 

def worker(name, q_mine, q_his): 
    for i in range(5): 
    q_mine.get() 
    work_hard(name, i) 
    q_his.put(1) 

qAB = Queue() 
qBA = Queue() 

t1 = threading.Thread(target=worker, args=('A', qAB, qBA)) 
t2 = threading.Thread(target=worker, args=('B', qBA, qAB)) 

t1.start() 
qAB.put(1) # notice how you don't need time.sleep(...) even here 
t2.start() 

t1.join() 
t2.join() 

print('done') 

Он работает так, как вы указали. В качестве альтернативы вы можете использовать threading.Condition (комбинация acquire, release, wait и notify/notifyAll), но это будет более тонким, особенно с точки зрения нити.

+0

Я замечаю, что вы не используете 'task_done()'. Это обычное/правильное? – stenci

+1

'Queue.Queue' имеет широкий API и позволяет использовать различные сценарии использования. Выше я использую его как канал между нитями, который является законным; другим сценарием является заполнение очереди объектами, подлежащими обработке, и просто вызывать 'join()' в клиентском потоке, этот вызов будет блокироваться до тех пор, пока не будет вызван 'task_done()' для всех объектов по потоку (-ам) обработки. Стоит отметить, что API-интерфейсы IPC для этих сценариев были смоделированы после «Queue.Queue», но все еще разделены: первая поддерживается «multiprocessing.Pipe/Connection» и «multiprocessing.queues.SimpleQueue'; второй - «многопроцессорным.JoinableQueue». – starikoff