2016-06-03 5 views
4

Простите меня, если это было задано раньше. Я много оглядывался, но я чувствую, что у меня нет подходящего словарного запаса, чтобы найти это, обыскав в Интернете.блокировка потока на основе значений

У меня многопоточное приложение в python. Я хочу иметь возможность блокировать определенный блок кода, но только для других потоков с определенным условием. Позвольте привести пример: Есть три потока: thread_a, thread_b и thread_c. Каждый поток может проходить через функцию foo в любое время. Я не хочу, чтобы два потока с bar равны друг другу, чтобы иметь возможность одновременно получать доступ к Code block ALPHA. Однако я не хочу блокировать потоки, значение bar отличается. В этом случае, скажем, thread_a имеет bar == "cat" и попадает в линию (3). До thread_a показывается строка (5), скажем thread_b, с bar == "cat" показов линии (3). Я хотел бы для thread_b ждать. Но если приходит thread_c, с bar == "dog", я бы хотел, чтобы он продолжал идти.

(1) def foo(bar): 
(2)  
(3)  lock(bar) 
(4)  # Code block ALPHA (two threads with equivalent bar should not be in here) 
(5)  unlock(bar) 

В другой записке, возможные значения для bar совершенно непредсказуемы, но с очень высокой вероятностью столкновения.

Благодарим за помощь. Библиотека Я смотрю на это python threading library

+0

Является ли 'бар' конечным? Если это так, вы можете настроить столько мьютексов, сколько хотите, а затем заблокировать мьютекс, который соответствует любому значению «bar», которое имеет ваш поток. – numeral

+0

@ нумерация, хороший вопрос. Бар не является конечным в том смысле, что вы могли бы разумно выделить мьютекс для каждого возможного случая. Я буду обновлять свой вопрос, чтобы отразить это. –

+0

В этом случае единственное, что я могу придумать, это то, что вам нужно будет следить за тем, что «бар» вы встретили и заблокировали динамически. Все, что я думаю, имеет какую-то модель разделяемой памяти. Это означает, что вам понадобится мастер-замок, который заблокирует все ваши динамические блокировки. Другими словами, у вас будет только один мьютекс, который заблокировал бы все потоки. Извините, не смог. :/ – numeral

ответ

3

Обновлено

Хорошие новости: Я был в состоянии воспроизвести release_lock проблему с которой вы столкнулись, используя свой оригинальный ответ через несколько сырой испытательный стенд я мощеной вместе, и решить проблему используя механизм подсчета (как вы предположили) - по крайней мере, так далеко, как я могу сказать с помощью моего испытательного аппарата.

Теперь используются два отдельных общих словаря, один для отслеживания «имен» или значений, связанных с каждой блокировкой, как и раньше, а другой для отслеживания того, сколько потоков использует каждый из них в данный момент времени.

Как и раньше, имена блокировок должны быть хешируемыми значениями, поэтому их можно использовать в качестве ключей в словарях.

import threading 

namespace_lock = threading.Lock() 
namespace = {} 
counters = {} 

def aquire_lock(value): 
    with namespace_lock: 
     if value in namespace: 
      counters[value] += 1 
     else: 
      namespace[value] = threading.Lock() 
      counters[value] = 1 

    namespace[value].acquire() 

def release_lock(value): 
    with namespace_lock: 
     if counters[value] == 1: 
      del counters[value] 
      lock = namespace.pop(value) 
     else: 
      counters[value] -= 1 
      lock = namespace[value] 

    lock.release() 

# sample usage  
def foo(bar): 
    aquire_lock(bar) 
    # Code block ALPHA (two threads with equivalent bar should not be in here) 
    release_lock(bar) 
+0

Благодарим вас за то, что вы написали это. Я собираюсь проверить это и отчитаться. Это выглядит многообещающе. –

+0

Я впечатлен тем, что это до сих пор до любого тестирования. Единственная проблема, которую я обнаружил, заключается в том, что значения могут быть выведены из пространства имен преждевременно. Когда два потока пытаются получить одно и то же значение, когда 'release_lock' вызывается первым потоком, он выдает значение, а второй поток попадает через' namespace [value] .acquire() '. Когда второй поток вызывает 'release_lock', значение больше не находится в' namespace' и не может быть вытолкнуто. Я думаю, все, что потребуется, чтобы исправить это добавление счетного механизма, чтобы позволить выпуску поток знать, если он должен появиться. Удивительный ответ в целом! –

2

Есть один замок, приобретенный всякий раз, когда поток пытается войти или выйти из критической секции, а также использовать отдельные переменные условия для каждого значения bar. Далее, вероятно, может быть оптимизирована для создания меньше переменных условия, но при этом для этой должности чувствовал себя преждевременной оптимизации:

import collections 
import contextlib 
import threading 

lock = threading.Lock() 

wait_tracker = collections.defaultdict(lambda: (False, 0, threading.Condition(lock))) 

@contextlib.contextmanager 
def critical(bar): 
    with lock: 
     busy, waiters, condition = wait_tracker[bar] 
     if busy: 
      # Someone with the same bar value is in the critical section. 

      # Record that we're waiting. 
      waiters += 1 
      wait_tracker[bar] = busy, waiters, condition 

      # Wait for our turn. 
      while wait_tracker[bar][0]: 
       condition.wait() 

      # Record that we're not waiting any more. 
      busy, waiters, condition = wait_tracker[bar] 
      waiters -= 1 

     # Record that we're entering the critical section. 
     busy = True 
     wait_tracker[bar] = busy, waiters, condition 
    try: 
     # Critical section runs here. 
     yield 
    finally: 
     with lock: 
      # Record that we're out of the critical section. 
      busy, waiters, condition = wait_tracker[bar] 
      busy = False 
      if waiters: 
       # Someone was waiting for us. Tell them it's their turn now. 
       wait_tracker[bar] = busy, waiters, condition 
       condition.notify() 
      else: 
       # No one was waiting for us. Clean up a bit so the wait_tracker 
       # doesn't grow forever. 
       del wait_tracker[bar] 

Затем каждый поток, который хочет войти в критическую секцию делает следующее:

with critical(bar): 
    # Critical section. 

Этот код непроверен, а параллелизм сложный, особенно параллелизм между блокировками и памятью. Я не гарантирую, что он сработает.

+0

Когда я получу шанс, я внимательно посмотрю на это. –

Смежные вопросы