Fork me on GitHub
0%

Java 细粒度锁续篇

在上篇文章中大概介绍了 Java 中细粒度锁的几种实现方式,并且针对每种方式都做了优缺点说明,在使用的时候就需要根据业务需求选择更合适的一种。上篇文章中的最后一种弱引用锁的实现方式,我在里面也说了其实还有更优雅的实现,其实也算不上更优雅,只是看起来更优雅,原理还是一样的,今天我打算用一篇文章的篇幅来好好说下。

首先,我们来再次回顾一下,这里为什么可以利用弱引用的特性拿掉分段锁呢?分段锁在这里主要是为了保证每次在创建和移除锁时的线程安全,而采用了弱引用之后,我们不需要每次创建之后都进行移除,因为当弱引用指向的对象引用被释放之后 Java 会在下一次的 GC 将这弱引用指向的对象回收掉,在经过 GC 之后,当弱引用指向的对象被回收时,弱引用将会进入创建时指定的队列,然后我们通过队列中的值来将这些存放在 Map 中的弱引用移除掉,所以我们才能够顺利的拿掉分段锁。

WeakHashMap

你注意看弱引用锁的代码实现,里面在我们获取锁的时候有个手动去清理 Map 中被回收的锁的过程,如果你看过之前的 谈谈 Java 中的各种引用类型 这篇文章的话,你应该知道 Java 提供了一个 WeakHashMap 类,他是使用弱引用作为 key,它在 GC 决定将弱引用所指向的 key 对象回收之后,会将当前保存的 entry 也自动移除,这个是怎么实现的呢?

其实原理也是一样的,利用弱引用指向的对象被回收时,弱引用将会进入创建时指定的队列这一特性,然后通过轮询队列来移除元素。只不过将移除的操作完全包裹在 WeakHashMap 类里面了,你可以看到里面所有的 public 的增删改查方法都直接或间接调用了expuntgeStaleEntries() 方法,而 expuntgeStaleEntries 方法中就是在轮询队列移除被回收的 key 所对应的元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void expungeStaleEntries() {
for (Object x; (x = queue.poll()) != null; ) {
synchronized (queue) {
@SuppressWarnings("unchecked")
Entry<K,V> e = (Entry<K,V>) x;
int i = indexFor(e.hash, table.length);

Entry<K,V> prev = table[i];
Entry<K,V> p = prev;
while (p != null) {
Entry<K,V> next = p.next;
if (p == e) {
if (prev == e)
table[i] = next;
else
prev.next = next;
// Must not null out e.next;
// stale entries may be in use by a HashIterator
e.value = null; // Help GC
size--;
break;
}
prev = p;
p = next;
}
}
}
}

既然 Java 已经给我们提供了相应功能的类,那我们是不是可以在弱引用锁的实现中直接使用 WeakHashMap 呢?这样我们就不用在获取锁的时候做手动移除的操作了,WeakHashMap 内部已经帮我们做了。

但如果你稍微看一下 WeakHashMap 类的描述就能发现他不是线程安全的,在该类里面有这样一段描述:

1
Like most collection classes, this class is not synchronized. A synchronized {@code WeakHashMap} may be constructed using the {@link Collections#synchronizedMap Collections.synchronizedMap} method.

正因为如此,在弱引用的实现中才采用 ConcurrentHashMap 来保存锁,只不过 ConcurrentHashMap 类没有提供弱引用的实现,也就没有提供自动为我们移除元素的功能,所以才会在获取锁的时候做一个移除元素的操作,相信看到这里你应该大概明白了使用弱引用作为 key 的 WeakHashMap 是怎么做到当弱引用被回收的时候自动把对应的元素给移除了。

那如果说按照上面描述里面所说的通过 Collections 工具类的 synchronizedMap 方法来实现线程安全呢?先来看代码实现:

1
2
3
4
5
6
7
8
9
public class WeakHashLock<T> {

public final Map<T, WeakReference<ReentrantLock>> weakHashMap =
Collections.synchronizedMap(new WeakHashMap<>());

public ReentrantLock get(T key){
return this.weakHashMap.computeIfAbsent(key, lock -> new WeakReference<>(new ReentrantLock())).get();
}
}

上面代码中 WeakHashLock 类中只有一个 get 方法根据 key 获取锁对象,不存在的话创建一个新的锁对象返回,看起来是不是很简单,但不幸的是通过 Collections 工具类的 synchronizedMap 方法来实现的线程安全方式性能不是很好,为什么这么说呢,我们可以看下 synchronizedMap 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// synchronizedMap 方法实现
public static <K,V> Map<K,V> synchronizedMap(Map<K,V> m) {
return new SynchronizedMap<>(m);
}

// SynchronizedMap 类构造方法
SynchronizedMap(Map<K,V> m) {
this.m = Objects.requireNonNull(m);
mutex = this;
}

SynchronizedMap(Map<K,V> m, Object mutex) {
this.m = m;
this.mutex = mutex;
}

public int size() {
synchronized (mutex) {return m.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return m.isEmpty();}
}
public V get(Object key) {
synchronized (mutex) {return m.get(key);}
}
public V put(K key, V value) {
synchronized (mutex) {return m.put(key, value);}
}
public V remove(Object key) {
synchronized (mutex) {return m.remove(key);}
}

从代码实现可以看出,synchronizedMap 方法会创建一个SynchronizedMap 实例返回,在该实例的构造方法中将自己赋值给用来同步的对象,然后 SynchronizedMap 类中的方法都使用该同步的对象进行同步,以致于我们做的每一个操作都需要进行同步,其实就相当于给 WeakHashMap 类中实例方法都加上了 synchronized 关键字,这种实现方式性能难免会大打折扣。

ConcurrentReferenceHashMap

这种方式不可取的原因主要是因为 WeakHashMap 不是线程安全的,那有没有线程安全的并且实现了弱引用来保存元素的 Map 呢?当然上篇文章中的实现是一种方式,那如果也想像 WeakHashMap 一样将这些移除的操作完全封装到 Map 类里面呢。我们可以看下 org.springframework.util 包下的 ConcurrentReferenceHashMap 类,该类就很好的实现了我们想要的效果,在该类的描述中就提到了这样一段话:

1
This class can be used as an alternative to {@code Collections.synchronizedMap(new WeakHashMap<K, Reference<V>>())} in order to support better performance when accessed concurrently. This implementation follows the same design constraints as {@link ConcurrentHashMap} with the exception that {@code null} values and {@code null} keys are supported.

从描述中可以看到 ConcurrentReferenceHashMap 类可以用来替代使用 synchronizedMap 方法保证线程安全的 WeakHashMap 类,以便在并发访问时提供更好的性能。那就来看下采用 ConcurrentReferenceHashMap 类的实现方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class WeakHashLock<T> {
private static final int DEFAULT_INITIAL_CAPACITY = 16;
private static final float DEFAULT_LOAD_FACTOR = 0.75f;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final ConcurrentReferenceHashMap.ReferenceType DEFAULT_REFERENCE_TYPE =
ConcurrentReferenceHashMap.ReferenceType.WEAK;

private final ConcurrentReferenceHashMap<T, ReentrantLock> referenceHashMap;

/**
* Create mutex factory with default settings.
*/
public WeakHashLock() {
this.referenceHashMap = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL,
DEFAULT_REFERENCE_TYPE);
}

public WeakHashLock(int concurrencyLevel,
ConcurrentReferenceHashMap.ReferenceType referenceType) {
this.referenceHashMap = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR,
concurrencyLevel,
referenceType);
}

public ReentrantLock get(T key) {
return this.referenceHashMap.computeIfAbsent(key, lock -> new ReentrantLock());
}

}

上面代码实现同样非常简单,相比上面 WeakHashMap 的方式多了两个构造方法而已,但不同于使用 synchronizedMap 方法来保证线程安全的方式,性能会提高很多。如果你感兴趣的话可以去看下这个类的内部实现,原理都是利用了弱引用的特性,只不过实现方式有点不同而已。

这里我想要提醒两点,一个是 ConcurrentReferenceHashMap 中默认的引用类型是软引用。

1
private static final ReferenceType DEFAULT_REFERENCE_TYPE = ReferenceType.SOFT;

另外一个要注意的是 ConcurrentReferenceHashMap 中有的方法返回的结果是 GC 之后但还没有清理被回收元素之前的结果,什么意思呢,我们来看一个示例:

1
2
3
4
5
6
7
8
9
10
11
ConcurrentReferenceHashMap<String, String> referenceHashMap = new ConcurrentReferenceHashMap<>(16, 0.75f, 1, ConcurrentReferenceHashMap.ReferenceType.WEAK);
referenceHashMap.put("key", "value");
// 经过 GC 标记之后,弱引用已经进入创建时指定的队列中,这时可以去轮询队列移除元素了
System.gc();
// isEmpty 和 size 方法返回的结果是还没有移除元素的结果
System.out.println(referenceHashMap.isEmpty()); // false
System.out.println(referenceHashMap.size()); // 1
// get 方法中调用了移除元素的方法
System.out.println(referenceHashMap.get("key")); // null
System.out.println(referenceHashMap.isEmpty()); // true
System.out.println(referenceHashMap.size()); // 0

上面测试结果可以看到,在 GC 标记之后调用 isEmpty 和 size 方法得到的返回结果都表明集合中是还有元素,而调用 get 方法得到的却是个 null,然后再调用 isEmpty 和 size 方法得到的结果表示集合为空,这其实是因为前面两个方法里面没有做移除元素的操作,而 get 方法是先做了一次移除元素然后再去获取值,这里提醒下这个细节问题,避免以为 ConcurrentReferenceHashMap 没有实现移除元素的功能。

好了,上面都是利用弱引用特性再配合 ReentrantLock 实现了细粒度锁,这里就再顺便看下利用弱引用特性配合 synchronized 关键字的实现方式吧。同样,原理是一样,只不过从 ReentrantLock 再回到 synchronized,前面说了这么多的原理,就不再赘述了,直接看代码实现吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 用于同步的对象
public class Mutex<T> {

private final T key;

public Mutex(T key) {
this.key = key;
}

public static <T> Mutex<T> of(T key) {
return new Mutex<>(key);
}

public T getKey() {
return key;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Mutex<?> xMutex = (Mutex<?>) o;
return Objects.equals(key, xMutex.key);
}

@Override
public int hashCode() {
return Objects.hash(key);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class MutexFactory<T> {

private static final int DEFAULT_INITIAL_CAPACITY = 16;
private static final float DEFAULT_LOAD_FACTOR = 0.75f;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
private static final ConcurrentReferenceHashMap.ReferenceType DEFAULT_REFERENCE_TYPE =
ConcurrentReferenceHashMap.ReferenceType.WEAK;

private final ConcurrentReferenceHashMap<T, Mutex<T>> referenceHashMap;

public MutexFactory() {
this.referenceHashMap = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL,
DEFAULT_REFERENCE_TYPE);
}

public MutexFactory(int concurrencyLevel,
ConcurrentReferenceHashMap.ReferenceType referenceType) {
this.referenceHashMap = new ConcurrentReferenceHashMap<>(DEFAULT_INITIAL_CAPACITY,
DEFAULT_LOAD_FACTOR,
concurrencyLevel,
referenceType);
}

public Mutex<T> getMutex(T key) {
return this.referenceHashMap.computeIfAbsent(key, Mutex::new);
}
// 提供强制移除已经被回收的弱引用元素
public void purgeUnreferenced() {
this.referenceHashMap.purgeUnreferencedEntries();
}
}

由于我们一般实现的细粒度基本上是基于用户或者其他的需要同步的对象,上面是通过构建一个互斥对象作为 ConcurrentReferenceHashMap 的 value,然后我们就可以使用 synchronized 关键字来锁定该 value 对象达到同步的效果,使用方式如下:

1
2
3
4
5
6
MutexFactory<String> mutexFactory = new MutexFactory<>();
public void save(String userId) throws InterruptedException {
synchronized (mutexFactory.getMutex(userId)){
// do something
}
}

这种同步方式业务代码看起来简单些,对于一些简单的需求就可以直接使用这种方式,当然如果需要提供 API 级别的加锁方式或者需要构建带条件的加锁方式那还是使用 ReentrantLock。

对于加锁这一块虽然说了这么多,也许你已经打算采用这些方式去实现你想要的效果了,可是呢随着微服务大行其道,一个系统往往启动了好几个实例,每个实例对应一个 JVM 虚拟机,而我们前面说的这些都是在只有一个虚拟机的前提下才有用,这就意味着我们前面说的这些加锁方式基本上已经派不上用场了。

那随之而来的解决方案就是我们经常听到并且感觉很高大上,却很少用到的分布式锁了,这一块我虽然使用过,也去查阅过相关资料,但我自认为没有完全真正掌握底层的原理,还需要进一步的实践,只好再找机会整理整理后再输出了。

 wechat
扫描上面图中二维码关注微信公众号