简单的基于Java名称的锁?

MySQL有一个方便的function:

SELECT GET_LOCK("SomeName") 

这可以用来为应用程序创build简单但非常具体的基于名称的锁。 但是,它需要数据库连接。

我有很多情况:

 someMethod() { // do stuff to user A for their data for feature X } 

简单地同步这个方法是没有意义的,因为,例如,如果在此期间为用户B调用此方法,则用户B在开始之前不需要等待用户A完成,只有用户的操作A和functionX组合需要等待。

通过MySql锁,我可以执行如下操作:

 someMethod() { executeQuery("SELECT GET_LOCK('userA-featureX')") // only locked for user A for their data for feature X executeQuery("SELECT RELEASE_LOCK('userA-featureX')") } 

由于Javalocking是基于对象的,所以似乎我需要创build一个新的对象来表示这个锁的情况,然后把它放在某个静态caching中,这样所有的线程都可以看到它。 随后的对这种情况的locking请求将在locking对象中find,并获得locking。 我试图创build这样的东西,但锁高速caching本身需要同步。 此外,很难检测何时不再使用锁对象,以便可以将其从caching中移除。

我已经看了Java的并发包,但没有什么突出的能够处理这样的事情。 有没有一个简单的方法来实现这个,或者我从错误的angular度来看待这个?

编辑:

为了澄清,我不想提前创build一个预定义的锁池,我想按需创build它们。 我所想的一些伪代码是:

 LockManager.acquireLock(String name) { Lock lock; synchronized (map) { lock = map.get(name); // doesn't exist yet - create and store if(lock == null) { lock = new Lock(); map.put(name, lock); } } lock.lock(); } LockManager.releaseLock(String name) { // unlock // if this was the last hold on the lock, remove it from the cache } 

也许这对你有用: jkeylockmanager

编辑:

我最初的反应可能有点短。 我是作者,几次面对这个问题,找不到现有的解决scheme。 这就是为什么我在Google Code上创build这个小型图书馆的原因。

你可以有一个Map<String, java.util.concurrent.Lock> ? 每次你需要一个锁,你基本上都调用map.get(lockName).lock()

以下是一个使用Google Guava的例子:

 Map<String, Lock> lockMap = new MapMaker().makeComputingMap(new Function<String, Lock>() { @Override public Lock apply(String input) { return new ReentrantLock(); } }); 

然后lockMap.get("anyOldString")会导致一个新的锁被创build, 如果需要并返回给你。 然后你可以调用该lock()makeComputingMap返回一个线程安全的地图,所以你可以与你的所有线程共享。

 // pool of names that are being locked HashSet<String> pool = new HashSet<String>(); lock(name) synchronized(pool) while(pool.contains(name)) // already being locked pool.wait(); // wait for release pool.add(name); // I lock it unlock(name) synchronized(pool) pool.remove(name); pool.notifyAll(); 

我看到的所有答案都太复杂了。 为什么不简单地使用:

 public void executeInNamedLock(String lockName, Runnable runnable) { synchronized(lockName.intern()) { runnable.run(); } } 

关键之处在于intern方法:它确保返回的string是全局唯一对象,因此可以用作虚拟机实例的互斥体。 所有internedstring都保存在一个全局池中,所以这是您在原始问题中讨论的静态caching。 不要担心memleaks; 这些string将被gc'ed如果没有其他线程引用它。 但是请注意,直到并包括Java6这个池保存在PermGen空间而不是堆,所以你可能不得不增加它。

有一个问题,如果你的虚拟机中的一些其他代码locking在相同的string由于完全不同的原因,但是a)这是不太可能的,和b)你可以通过引入命名空间,例如executeInNamedLock(this.getClass().getName() + "_" + myLockName);绕过它executeInNamedLock(this.getClass().getName() + "_" + myLockName);

对于像用户名那样的Lock ,映射中的内存Lock可能有点泄漏。 作为替代scheme,您可以使用WeakReference和WeakHashMap来创build互斥对象,在没有任何引用时可以进行垃圾回收。 这可避免您必须执行任何手动引用计数才能释放内存。

你可以在这里find一个实现。 请注意,如果您正在频繁查找地图,则可能会遇到获取互斥锁的争用问题。

使用java.util.concurrent的通用解决scheme

 import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; public class LockByName<L> { ConcurrentHashMap<String, L> mapStringLock; public LockByName(){ mapStringLock = new ConcurrentHashMap<String, L>(); } public LockByName(ConcurrentHashMap<String, L> mapStringLock){ this.mapStringLock = mapStringLock; } @SuppressWarnings("unchecked") public L getLock(String key) { L initValue = (L) createIntanceLock(); L lock = mapStringLock.putIfAbsent(key, initValue); if (lock == null) { lock = initValue; } return lock; } protected Object createIntanceLock() { return new ReentrantLock(); } public static void main(String[] args) { LockByName<ReentrantLock> reentrantLocker = new LockByName<ReentrantLock>(); ReentrantLock reentrantLock1 = reentrantLocker.getLock("pepe"); try { reentrantLock1.lock(); //DO WORK }finally{ reentrantLock1.unlock(); } } } 

也许稍后,但你可以使用Google Guava Striped

从概念上讲,锁条是将锁分成许多条的技术,增加了单个锁的粒度,并允许独立的操作locking不同的条并同时进行,而不是为单个锁创build争用。

 //init stripes=Striped.lazyWeakLock(size); //or stripes=Striped.lock(size); //... Lock lock=stripes.get(object); 

基于McDowell和他的类IdMutexProvider的答案 ,我写了使用WeakHashMap来存储locking对象的generics类LockMapLockMap.get()可以用来检索一个键的锁对象,然后可以使用Java synchronized (...)语句来应用一个锁。 垃圾收集期间未使用的锁对象会自动释放。

 import java.lang.ref.WeakReference; import java.util.WeakHashMap; // A map that creates and stores lock objects for arbitrary keys values. // Lock objects which are no longer referenced are automatically released during garbage collection. // Author: Christian d'Heureuse, www.source-code.biz // Based on IdMutexProvider by McDowell, http://illegalargumentexception.blogspot.ch/2008/04/java-synchronizing-on-transient-id.html // See also https://stackoverflow.com/questions/5639870/simple-java-name-based-locks public class LockMap<KEY> { private WeakHashMap<KeyWrapper<KEY>,WeakReference<KeyWrapper<KEY>>> map; public LockMap() { map = new WeakHashMap<KeyWrapper<KEY>,WeakReference<KeyWrapper<KEY>>>(); } // Returns a lock object for the specified key. public synchronized Object get (KEY key) { if (key == null) { throw new NullPointerException(); } KeyWrapper<KEY> newKeyWrapper = new KeyWrapper<KEY>(key); WeakReference<KeyWrapper<KEY>> ref = map.get(newKeyWrapper); KeyWrapper<KEY> oldKeyWrapper = (ref == null) ? null : ref.get(); if (oldKeyWrapper != null) { return oldKeyWrapper; } map.put(newKeyWrapper, new WeakReference<KeyWrapper<KEY>>(newKeyWrapper)); return newKeyWrapper; } // Returns the number of used entries in the map. public synchronized int size() { return map.size(); } // KeyWrapper wraps a key value and is used in three ways: // - as the key for the internal WeakHashMap // - as the value for the internal WeakHashMap, additionally wrapped in a WeakReference // - as the lock object associated to the key private static class KeyWrapper<KEY> { private KEY key; private int hashCode; public KeyWrapper (KEY key) { this.key = key; hashCode = key.hashCode(); } public boolean equals (Object obj) { if (obj == this) { return true; } if (obj instanceof KeyWrapper) { return ((KeyWrapper)obj).key.equals(key); } return false; } public int hashCode() { return hashCode; }} } // end class LockMap 

如何使用LockMap类的例子:

 private static LockMap<String> lockMap = new LockMap<String>(); synchronized (lockMap.get(name)) { ... } 

一个简单的LockMap类的testing程序:

 public static Object lock1; public static Object lock2; public static void main (String[] args) throws Exception { System.out.println("TestLockMap Started"); LockMap<Integer> map = new LockMap<Integer>(); lock1 = map.get(1); lock2 = map.get(2); if (lock2 == lock1) { throw new Error(); } Object lock1b = map.get(1); if (lock1b != lock1) { throw new Error(); } if (map.size() != 2) { throw new Error(); } for (int i=0; i<10000000; i++) { map.get(i); } System.out.println("Size before gc: " + map.size()); // result varies, eg 4425760 System.gc(); Thread.sleep(1000); if (map.size() != 2) { System.out.println("Size after gc should be 2 but is " + map.size()); } System.out.println("TestLockMap completed"); } 

如果有人知道更好的方法来自动testingLockMap类,请写评论。

也许这样的事情:

 public class ReentrantNamedLock { private class RefCounterLock { public int counter; public ReentrantLock sem; public RefCounterLock() { counter = 0; sem = new ReentrantLock(); } } private final ReentrantLock _lock = new ReentrantLock(); private final HashMap<String, RefCounterLock> _cache = new HashMap<String, RefCounterLock>(); public void lock(String key) { _lock.lock(); RefCounterLock cur = null; try { if (!_cache.containsKey(key)) { cur = new RefCounterLock(); _cache.put(key, cur); } else { cur = _cache.get(key); } cur.counter++; } finally { _lock.unlock(); } cur.sem.lock(); } public void unlock(String key) { _lock.lock(); try { if (_cache.containsKey(key)) { RefCounterLock cur = _cache.get(key); cur.counter--; cur.sem.unlock(); if (cur.counter == 0) { //last reference _cache.remove(key); } cur = null; } } finally { _lock.unlock(); } }} 

我没有testing它。

2年后,但我正在寻找一个简单的命名储物柜解决scheme,碰到这个,是有用的,但我需要一个更简单的答案,所以下面我想出了。

在某个名字下简单的locking,并在同一个名字下再次释放。

 private void doTask(){ locker.acquireLock(name); try{ //do stuff locked under the name }finally{ locker.releaseLock(name); } } 

这里是代码:

 public class NamedLocker { private ConcurrentMap<String, Semaphore> synchSemaphores = new ConcurrentHashMap<String, Semaphore>(); private int permits = 1; public NamedLocker(){ this(1); } public NamedLocker(int permits){ this.permits = permits; } public void acquireLock(String... key){ Semaphore tempS = new Semaphore(permits, true); Semaphore s = synchSemaphores.putIfAbsent(Arrays.toString(key), tempS); if(s == null){ s = tempS; } s.acquireUninterruptibly(); } public void releaseLock(String... key){ Semaphore s = synchSemaphores.get(Arrays.toString(key)); if(s != null){ s.release(); } } } 

有些失望之后,没有语言水平的支持或简单的Guava / Commons类的命名锁,

这就是我所解决的:

 ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>(); Object getLock(String name) { Object lock = locks.get(name); if (lock == null) { Object newLock = new Object(); lock = locks.putIfAbsent(name, newLock); if (lock == null) { lock = newLock; } } return lock; } void somethingThatNeedsNamedLocks(String name) { synchronized(getLock(name)) { // some operations mutually exclusive per each name } } 

在这里,我实现了:没有库依赖性的小样板代码,primefaces地获取锁对象,不污染全局实例化的string对象,没有低级别的通知/等待混乱和没有try-catch-finally混乱。

我想要注意的是, ConcurrentHashMap具有内置的locking设施,足以实现简单的专用multithreadinglocking。 不需要额外的Lock对象。

下面是一个这样的locking映射的例子,用于为单个客户端强制执行最多一个活动的jms处理。

 private static final ConcurrentMap<String, Object> lockMap = new ConcurrentHashMap<String, Object>(); private static final Object DUMMY = new Object(); private boolean tryLock(String key) { if (lockMap.putIfAbsent(key, DUMMY) != null) { return false; } try { if (/* attempt cluster-wide db lock via select for update nowait */) { return true; } else { unlock(key); log.debug("DB is already locked"); return false; } } catch (Throwable e) { unlock(key); log.debug("DB lock failed", e); return false; } } private void unlock(String key) { lockMap.remove(key); } @TransactionAttribute(TransactionAttributeType.REQUIRED) public void onMessage(Message message) { String key = getClientKey(message); if (tryLock(key)) { try { // handle jms } finally { unlock(key); } } else { // key is locked, forcing redelivery messageDrivenContext.setRollbackOnly(); } } 

类似Lyomi的答案,但使用更灵活的ReentrantLock而不是同步块。

 public class NamedLock { private static final ConcurrentMap<String, Lock> lockByName = new ConcurrentHashMap<String, Lock>(); public static void lock(String key) { Lock lock = new ReentrantLock(); Lock existingLock = lockByName.putIfAbsent(key, lock); if(existingLock != null) { lock = existingLock; } lock.lock(); } public static void unlock(String key) { Lock namedLock = lockByName.get(key); namedLock.unlock(); } } 

是的,这会随着时间的推移而增长 – 但是使用ReentrantLock为从地图上移除locking提供了更大的可能性。 虽然,从地图中移除项目似乎并不是很有用,因为从地图中移除值不会缩小其大小。 一些手动地图大小的逻辑将不得不实施。

记忆考虑

通常,特定密钥所需的同步是短暂的。 保持释放的密钥可能导致过度的内存浪费,使其不可行。

这里有一个实现不在内部保留释放的密钥。

 import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; public class KeyedMutexes<K> { private final ConcurrentMap<K, CountDownLatch> key2Mutex = new ConcurrentHashMap<>(); public void lock(K key) throws InterruptedException { final CountDownLatch ourLock = new CountDownLatch(1); for (;;) { CountDownLatch theirLock = key2Mutex.putIfAbsent(key, ourLock); if (theirLock == null) { return; } theirLock.await(); } } public void unlock(K key) { key2Mutex.remove(key).countDown(); } } 

重新进入和其他花里胡哨

如果一个人想要重入locking语义,他们可以通过将互斥对象包装在一个跟踪locking线程和locking计数的类中来扩展上述范围。

例如:

 private static class Lock { final CountDownLatch mutex = new CountDownLatch(1); final long threadId = Thread.currentThread().getId(); int lockedCount = 1; } 

如果希望lock()返回一个对象以使发布更简单,更安全,那也是一种可能。

综合起来,下面是class级的样子:

 public class KeyedReentrantLocks<K> { private final ConcurrentMap<K, KeyedLock> key2Lock = new ConcurrentHashMap<>(); public KeyedLock acquire(K key) throws InterruptedException { final KeyedLock ourLock = new KeyedLock() { @Override public void close() { if (Thread.currentThread().getId() != threadId) { throw new IllegalStateException("wrong thread"); } if (--lockedCount == 0) { key2Lock.remove(key); mutex.countDown(); } } }; for (;;) { KeyedLock theirLock = key2Lock.putIfAbsent(key, ourLock); if (theirLock == null) { return ourLock; } if (theirLock.threadId == Thread.currentThread().getId()) { theirLock.lockedCount++; return theirLock; } theirLock.mutex.await(); } } public static abstract class KeyedLock implements AutoCloseable { protected final CountDownLatch mutex = new CountDownLatch(1); protected final long threadId = Thread.currentThread().getId(); protected int lockedCount = 1; @Override public abstract void close(); } } 

以下是如何使用它:

 try (KeyedLock lock = locks.acquire("SomeName")) { // do something critical here } 

为了回应使用新的MapMaker()的build议。makeComputingMap()…

MapMaker()。makeComputingMap()由于安全原因已被弃用。 后继者是CacheBuilder。 由于caching键/值应用于CacheBuilder,我们非常接近解决scheme。

问题是CacheBuilder.weakKeys()中的一个注释:

 when this method is used, the resulting cache will use identity (==) comparison to determine equality of keys. 

这使得不可能通过string值select现有的锁。 尔格。

(4年后…)我的答案类似于user2878608的,但我认为在逻辑中有一些缺less边缘情况。 我还以为信号量是一次locking多个资源(虽然我想用它来计算这样的储物柜也不错),所以我使用了一个普通的POJOlocking对象。 我对其进行了一次testing,certificate每个边缘案例都存在IMO,并将在我的项目中使用它。 希望它可以帮助别人。 🙂

 class Lock { int c; // count threads that require this lock so you don't release and acquire needlessly } ConcurrentHashMap<SomeKey, Lock> map = new ConcurrentHashMap<SomeKey, Lock>(); LockManager.acquireLock(String name) { Lock lock = new Lock(); // creating a new one pre-emptively or checking for null first depends on which scenario is more common in your use case lock.c = 0; while( true ) { Lock prevLock = map.putIfAbsent(name, lock); if( prevLock != null ) lock = prevLock; synchronized (lock) { Lock newLock = map.get(name); if( newLock == null ) continue; // handles the edge case where the lock got removed while someone was still waiting on it if( lock != newLock ) { lock = newLock; // re-use the latest lock continue; // handles the edge case where a new lock was acquired and the critical section was entered immediately after releasing the lock but before the current locker entered the sync block } // if we already have a lock if( lock.c > 0 ) { // increase the count of threads that need an offline director lock ++lock.c; return true; // success } else { // safely acquire lock for user try { perNameLockCollection.add(name); // could be a ConcurrentHashMap or other synchronized set, or even an external global cluster lock // success lock.c = 1; return true; } catch( Exception e ) { // failed to acquire lock.c = 0; // this must be set in case any concurrent threads are waiting map.remove(name); // NOTE: this must be the last critical thing that happens in the sync block! } } } } } LockManager.releaseLock(String name) { // unlock // if this was the last hold on the lock, remove it from the cache Lock lock = null; // creating a new one pre-emptively or checking for null first depends on which scenario is more common in your use case while( true ) { lock = map.get(name); if( lock == null ) { // SHOULD never happen log.Error("found missing lock! perhaps a releaseLock call without corresponding acquireLock call?! name:"+name); lock = new Lock(); lock.c = 1; Lock prevLock = map.putIfAbsent(name, lock); if( prevLock != null ) lock = prevLock; } synchronized (lock) { Lock newLock = map.get(name); if( newLock == null ) continue; // handles the edge case where the lock got removed while someone was still waiting on it if( lock != newLock ) { lock = newLock; // re-use the latest lock continue; // handles the edge case where a new lock was acquired and the critical section was entered immediately after releasing the lock but before the current locker entered the sync block } // if we are not the last locker if( lock.c > 1 ) { // decrease the count of threads that need an offline director lock --lock.c; return true; // success } else { // safely release lock for user try { perNameLockCollection.remove(name); // could be a ConcurrentHashMap or other synchronized set, or even an external global cluster lock // success lock.c = 0; // this must be set in case any concurrent threads are waiting map.remove(name); // NOTE: this must be the last critical thing that happens in the sync block! return true; } catch( Exception e ) { // failed to release log.Error("unable to release lock! name:"+name); lock.c = 1; return false; } } } } } 

我创build了一个基于McDowell的IdMutexProvider的tokenProvider 。 经理使用WeakHashMap ,负责清理未使用的锁。

TokenManager:

 /** * Token provider used to get a {@link Mutex} object which is used to get exclusive access to a given TOKEN. * Because WeakHashMap is internally used, Mutex administration is automatically cleaned up when * the Mutex is no longer is use by any thread. * * <pre> * Usage: * private final TokenMutexProvider&lt;String&gt; myTokenProvider = new TokenMutexProvider&lt;String&gt;(); * * Mutex mutex = myTokenProvider.getMutex("123456"); * synchronized (mutex) { * // your code here * } * </pre> * * Class inspired by McDowell. * url: http://illegalargumentexception.blogspot.nl/2008/04/java-synchronizing-on-transient-id.html * * @param <TOKEN> type of token. It is important that the equals method of that Object return true * for objects of different instances but with the same 'identity'. (see {@link WeakHashMap}).<br> * Eg * <pre> * String key1 = "1"; * String key1b = new String("1"); * key1.equals(key1b) == true; * * or * Integer key1 = 1; * Integer key1b = new Integer(1); * key1.equals(key1b) == true; * </pre> */ public class TokenMutexProvider<TOKEN> { private final Map<Mutex, WeakReference<Mutex>> mutexMap = new WeakHashMap<Mutex, WeakReference<Mutex>>(); /** * Get a {@link Mutex} for the given (non-null) token. */ public Mutex getMutex(TOKEN token) { if (token==null) { throw new NullPointerException(); } Mutex key = new MutexImpl(token); synchronized (mutexMap) { WeakReference<Mutex> ref = mutexMap.get(key); if (ref==null) { mutexMap.put(key, new WeakReference<Mutex>(key)); return key; } Mutex mutex = ref.get(); if (mutex==null) { mutexMap.put(key, new WeakReference<Mutex>(key)); return key; } return mutex; } } public int size() { synchronized (mutexMap) { return mutexMap.size(); } } /** * Mutex for acquiring exclusive access to a token. */ public static interface Mutex {} private class MutexImpl implements Mutex { private final TOKEN token; protected MutexImpl(TOKEN token) { this.token = token; } @Override public boolean equals(Object other) { if (other==null) { return false; } if (getClass()==other.getClass()) { TOKEN otherToken = ((MutexImpl)other).token; return token.equals(otherToken); } return false; } @Override public int hashCode() { return token.hashCode(); } } } 

用法:

 private final TokenMutexManager<String> myTokenManager = new TokenMutexManager<String>(); Mutex mutex = myTokenManager.getMutex("UUID_123456"); synchronized(mutex) { // your code here } 

或者宁可使用整数?

 private final TokenMutexManager<Integer> myTokenManager = new TokenMutexManager<Integer>(); Mutex mutex = myTokenManager.getMutex(123456); synchronized(mutex) { // your code here } 

你对每种情况的共享锁对象的静态库的想法是正确的。
你不需要caching本身来同步…它可以像一个哈希映射一样简单。

线程可以同时从地图上获取locking对象。 实际的同步逻辑应该分别封装在每个这样的对象中(请参阅java.util.concurrent包 – http://download.oracle.com/javase/6/docs/api/java/util/concurrent/locks/ package-summary.html )

TreeMap因为在内部数组的HashMap大小只能增加

 public class Locker<T> { private final Object lock = new Object(); private final Map<T, Value> map = new TreeMap<T, Value>(); public Value<T> lock(T id) { Value r; synchronized (lock) { if (!map.containsKey(id)) { Value value = new Value(); value.id = id; value.count = 0; value.lock = new ReentrantLock(); map.put(id, value); } r = map.get(id); r.count++; } r.lock.lock(); return r; } public void unlock(Value<T> r) { r.lock.unlock(); synchronized (lock) { r.count--; if (r.count == 0) map.remove(r.id); } } public static class Value<T> { private Lock lock; private long count; private T id; } }