Я пытаюсь реализовать класс для обеспечения параллелизма в моем приложении Java, блокируя асинхронные попытки изменения данного экземпляра объекта (идентифицированного ключом) с помощью RentrantLock. Цель состоит в том, чтобы блокировать/ставить несколько параллельных попыток изменить данный экземпляр объекта до тех пор, пока не будут завершены предыдущие потоки. Класс реализует это в общем виде, позволяя любому блоку кода получить блокировку и освободить ее после выполнения (идентично семантике RentrantLock) с добавленной утилитой только блокирования потоков, пытающихся модифицировать один и тот же экземпляр объекта (как определено ключом) вместо того, чтобы блокировать все потоки, входящие в блок кода.Реализация блокировки параллелизма с использованием ReentrantLock
Этот класс предоставляет простую конструкцию, позволяющую синхронизировать блок кода только для одного экземпляра объекта. Например, если я хочу, чтобы блок кода был синхронизирован для всех потоков, поступающих от пользователя с идентификатором 33, но я не хочу, чтобы потоки от любого другого пользователя были заблокированы пользователем 33 обслуживания потоков.
Класс реализован как следует
public class EntitySynchronizer {
private static final int DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS = 300; // 5 minutes
private Object mutex = new Object();
private ConcurrentHashMap<Object, ReentrantLock> locks = new ConcurrentHashMap<Object, ReentrantLock>();
private static ThreadLocal<Object> keyThreadLocal = new ThreadLocal<Object>();
private int maximumLockDurationSeconds;
public EntitySynchronizer() {
this(DEFAULT_MAXIMUM_LOCK_DURATION_SECONDS);
}
public EntitySynchronizer(int maximumLockDurationSeconds) {
this.maximumLockDurationSeconds = maximumLockDurationSeconds;
}
/**
* Initiate a lock for all threads with this key value
* @param key the instance identifier for concurrency synchronization
*/
public void lock(Object key) {
if (key == null) {
return;
}
/*
* returns the existing lock for specified key, or null if there was no existing lock for the
* key
*/
ReentrantLock lock;
synchronized (mutex) {
lock = locks.putIfAbsent(key, new ReentrantLock(true));
if (lock == null) {
lock = locks.get(key);
}
}
/*
* Acquires the lock and returns immediately with the value true if it is not held by another
* thread within the given waiting time and the current thread has not been interrupted. If this
* lock has been set to use a fair ordering policy then an available lock will NOT be acquired
* if any other threads are waiting for the lock. If the current thread already holds this lock
* then the hold count is incremented by one and the method returns true. If the lock is held by
* another thread then the current thread becomes disabled for thread scheduling purposes and
* lies dormant until one of three things happens: - The lock is acquired by the current thread;
* or - Some other thread interrupts the current thread; or - The specified waiting time elapses
*/
try {
/*
* using tryLock(timeout) instead of lock() to prevent deadlock situation in case acquired
* lock is not released normalRelease will be false if the lock was released because the
* timeout expired
*/
boolean normalRelease = lock.tryLock(maximumLockDurationSeconds, TimeUnit.SECONDS);
/*
* lock was release because timeout expired. We do not want to proceed, we should throw a
* concurrency exception for waiting thread
*/
if (!normalRelease) {
throw new ConcurrentModificationException(
"Entity synchronization concurrency lock timeout expired for item key: " + key);
}
} catch (InterruptedException e) {
throw new IllegalStateException("Entity synchronization interrupted exception for item key: "
+ key);
}
keyThreadLocal.set(key);
}
/**
* Unlock this thread's lock. This takes care of preserving the lock for any waiting threads with
* the same entity key
*/
public void unlock() {
Object key = keyThreadLocal.get();
keyThreadLocal.remove();
if (key != null) {
ReentrantLock lock = locks.get(key);
if (lock != null) {
try {
synchronized (mutex) {
if (!lock.hasQueuedThreads()) {
locks.remove(key);
}
}
} finally {
lock.unlock();
}
} else {
synchronized (mutex) {
locks.remove(key);
}
}
}
}
}
Этот класс используется следующим образом:
private EntitySynchronizer entitySynchronizer = new EntitySynchronizer();
entitySynchronizer.lock(userId); // 'user' is the entity by which i want to differentiate my synchronization
try {
//execute my code here ...
} finally {
entitySynchronizer.unlock();
}
проблема заключается в том, что она не работает отлично. При очень высоких нагрузках на параллелизм все еще есть случаи, когда несколько потоков с одним и тем же ключом не синхронизируются. Я прошел и испытал достаточно тщательно и не могу понять, почему/где это может произойти.
Специалисты по параллелизму там?
Я полагаю, вы используете один и тот же экземпляр 'entitySynchronizer' для всех обращений? – assylias