2012-05-03 2 views
2

Я пытаюсь реализовать класс для обеспечения параллелизма в моем приложении Java, блокируя асинхронные попытки изменения данного экземпляра объекта (идентифицированного ключом) с помощью RentrantLock. Цель состоит в том, чтобы блокировать/ставить несколько параллельных попыток изменить данный экземпляр объекта до тех пор, пока не будут завершены предыдущие потоки. Класс реализует это в общем виде, позволяя любому блоку кода получить блокировку и освободить ее после выполнения (идентично семантике RentrantLock) с добавленной утилитой только блокирования потоков, пытающихся модифицировать один и тот же экземпляр объекта (как определено ключом) вместо того, чтобы блокировать все потоки, входящие в блок кода.Реализация блокировки параллелизма с использованием ReentrantLock

Этот класс предоставляет простую конструкцию, позволяющую синхронизировать блок кода только для одного экземпляра объекта. Например, если я хочу, чтобы блок кода был синхронизирован для всех потоков, поступающих от пользователя с идентификатором 33, но я не хочу, чтобы потоки от любого другого пользователя были заблокированы пользователем 33 обслуживания потоков.

Класс реализован как следует

public class EntitySynchronizer { 
    private static final int DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS = 300; // 5 minutes 
    private Object mutex = new Object(); 
    private ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<Object, ReentrantLock>(); 
    private static ThreadLocal<Object> keyThreadLocal = new ThreadLocal<Object>(); 
    private int maximumLockDurationSeconds; 
    public EntitySynchronizer() { 
    this(DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS); 
    } 
    public EntitySynchronizer(int maximumLockDurationSeconds) { 
    this.maximumLockDurationSeconds = maximumLockDurationSeconds; 
    } 
    /** 
    * Initiate a lock for all threads with this key value 
    * @param key the instance identifier for concurrency synchronization 
    */ 
    public void lock(Object key) { 
    if (key == null) { 
     return; 
    } 
    /* 
    * returns the existing lock for specified key, or null if there was no existing lock for the 
    * key 
    */ 
    ReentrantLock lock; 
    synchronized (mutex) { 
     lock = locks.putIfAbsent(key, new ReentrantLock(true)); 
     if (lock == null) { 
     lock = locks.get(key); 
     } 
    } 
    /* 
    * Acquires the lock and returns immediately with the value true if it is not held by another 
    * thread within the given waiting time and the current thread has not been interrupted. If this 
    * lock has been set to use a fair ordering policy then an available lock will NOT be acquired 
    * if any other threads are waiting for the lock. If the current thread already holds this lock 
    * then the hold count is incremented by one and the method returns true. If the lock is held by 
    * another thread then the current thread becomes disabled for thread scheduling purposes and 
    * lies dormant until one of three things happens: - The lock is acquired by the current thread; 
    * or - Some other thread interrupts the current thread; or - The specified waiting time elapses 
    */ 
    try { 
     /* 
     * using tryLock(timeout) instead of lock() to prevent deadlock situation in case acquired 
     * lock is not released normalRelease will be false if the lock was released because the 
     * timeout expired 
     */ 
     boolean normalRelease = lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS); 
     /* 
     * lock was release because timeout expired. We do not want to proceed, we should throw a 
     * concurrency exception for waiting thread 
     */ 
     if (!normalRelease) { 
     throw new ConcurrentModificationException(
      "Entity synchronization concurrency lock timeout expired for item key: " + key); 
     } 
    } catch (InterruptedException e) { 
     throw new IllegalStateException("Entity synchronization interrupted exception for item key: " 
      + key); 
    } 
    keyThreadLocal.set(key); 
    } 
    /** 
    * Unlock this thread's lock. This takes care of preserving the lock for any waiting threads with 
    * the same entity key 
    */ 
    public void unlock() { 
    Object key = keyThreadLocal.get(); 
    keyThreadLocal.remove(); 
    if (key != null) { 
     ReentrantLock lock = locks.get(key); 
     if (lock != null) { 
     try { 
      synchronized (mutex) { 
      if (!lock.hasQueuedThreads()) { 
       locks.remove(key); 
      } 
      } 
     } finally { 
      lock.unlock(); 
     } 
     } else { 
     synchronized (mutex) { 
      locks.remove(key); 
     } 
     } 
    } 
    } 
} 

Этот класс используется следующим образом:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer(); 
entitySynchronizer.lock(userId); // 'user' is the entity by which i want to differentiate my synchronization 
try { 
    //execute my code here ... 
} finally { 
    entitySynchronizer.unlock(); 
} 

проблема заключается в том, что она не работает отлично. При очень высоких нагрузках на параллелизм все еще есть случаи, когда несколько потоков с одним и тем же ключом не синхронизируются. Я прошел и испытал достаточно тщательно и не могу понять, почему/где это может произойти.

Специалисты по параллелизму там?

+0

Я полагаю, вы используете один и тот же экземпляр 'entitySynchronizer' для всех обращений? – assylias

ответ

1

Ваше решение страдает от недостатка атомарности. Рассмотрим следующий сценарий:

  • Резьба A вводит lock() и получает существующий замок с карты.
  • Резьба B вводит unlock() за тот же ключ, разблокирует и удаляет замок с карты (так как нить A еще не вызвала tryLock()).
  • Тема A успешно звонит tryLock().

Одним из возможных вариантов является следить за количеством замков «проверил» с карты:

public class EntitySynchronizer { 
    private Map<Object, Token> tokens = new HashMap<Object, Token>(); 
    private ThreadLocal<Token> currentToken = new ThreadLocal<Token>(); 
    private Object mutex = new Object(); 

    ... 

    public void lock(Object key) throws InterruptedException { 
     Token token = checkOut(key); 
     boolean locked = false; 
     try { 
      locked = token.lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS)); 
      if (locked) currentToken.set(token); 
     } finally { 
      if (!locked) checkIn(token); 
     } 
    } 

    public void unlock() { 
     Token token = currentToken.get(); 
     if (token != null) { 
      token.lock.unlock(); 
      checkIn(token); 
      currentToken.remove(); 
     } 
    } 

    private Token checkOut(Object key) { 
     synchronized (mutex) { 
      Token token = tokens.get(key); 
      if (token != null) { 
       token.checkedOutCount++; 
      } else { 
       token = new Token(key); 
       tokens.put(key, token); 
      } 
      return token; 
     } 
    } 

    private void checkIn(Token token) { 
     synchronized (mutex) { 
      token.checkedOutCount--; 
      if (token.checkedOutCount == 0) 
       tokens.remove(token.key); 
     } 
    } 

    private class Token { 
     public final Object key; 
     public final ReentrantLock lock = new ReentrantLock(); 
     public int checkedOutCount = 1; 

     public Token(Object key) { 
      this.key = key; 
     } 
    } 
} 

Обратите внимание, что tokens не должен быть ConcurentHashMap, поскольку его методы вызываются в синхронизированных блоках так или иначе.

+1

Он мог просто использовать 'synchronized (key)', но, вероятно, он хочет, чтобы все ключи, которые являются «равными», были взаимоисключающими. –

+0

@axtavt - В методе lock(), если tryLock() возвращает false (из-за тайм-аута блокировки), тогда блок кода с этим будет выполняться правильно (checkIn() вызывается в finally {} block) , но метод unlock() (и, следовательно, checkIn()) будет снова вызван в конце блока кода. Кроме того, могу ли я достичь того, что вы здесь делаете, включив мой tryLock() в моем синхронизированном блоке? – Strykker

+0

@Strykker: Если 'tryLock' возвращает' false', 'token' не будет помещен в' ThreadLocal', поэтому последующая 'unlock()' ничего не сделает. Если вы хотите, вы можете реализовать различную обработку для этих случаев. Вы не можете поместить 'tryLock()' в синхронизированный блок, потому что он блокирует другие потоки. – axtavt

4

Одна из вещей, которые вы должны исправить это:

ReentrantLock lock; 
synchronized (mutex) { 
    lock = locks.putIfAbsent(key, new ReentrantLock(true)); 
    if (lock == null) { 
    lock = locks.get(key); 
    } 
} 

Это пропускает всю точку одновременной карты. Почему ты не писал так:

ReentrantLock lock = new ReentrantLock(true); 
final ReentrantLock oldLock = locks.putIfAbsent(key, lock); 
lock = oldLock != null? oldLock : lock; 
+1

Это не совсем так, поскольку вы всегда получите новый экземпляр блокировки, даже если на карте уже есть блокировка. То, что плакат действительно хочет, это просто 'lock = locks.putIfAbsent (key, new ReentrantLock (true));' см. (Http://docs.oracle.com/javase/6/docs/api/java/util /concurrent/ConcurrentHashMap.html#putIfAbsent%28K,%20V%29) –

+0

Малкольм прав, потому что я сделал это, потому что мне нужно повторно использовать существующую блокировку, если она существует. Marko, вам интересно, почему я беспокоил синхронизацию операций вокруг Карты? Я сделал это, потому что я подозревал, что может быть состояние гонки между созданием/получением существующей блокировки и кодом управления блокировкой в ​​разблокировке (методе) @Malcom. Таким образом, в вашем фрагменте кода операция new() вызывается только если нет существующего замка? – Strykker

+0

@MalcolmSmith Нет, OP тоже не хочет вашего предложения. См. Мой отредактированный ответ о том, что я имею в виду. OP, я думаю, вы можете переписать код так, чтобы все операции на параллельной карте были атомарными. Кстати, и код Малькольма, и мой экземпляр блокируют замок безоговорочно, но это не проблема, если вы избавитесь от него сразу. Вы должны сделать это, если хотите атомный 'putIfAbsent'. –

1

Я предполагаю, что вы на самом деле не используя ваш класс следующим образом:

private EntitySynchronizer entitySynchronizer = new EntitySynchronizer(); 
entitySynchronizer.lock(userId); // 'user' is the entity by which i want to differentiate my synchronization 
try { 
    //execute my code here ... 
} finally { 
    entitySynchronizer.unlock(); 
} 

Но есть одноэлементный экземпляр EntitySynchronizer? Потому что в противном случае это ваша проблема.

+0

+1 Хороший улов. Я даже не смотрел на его использование. –

+0

Правильно, у меня есть экземпляр Singleton EntitySynchronizer. Мой пример использования был немного упрощен – Strykker

Смежные вопросы