源码分析:同步基础框架——AbstractQueuedSynchronizer(AQS) (4)

释放锁储层,唤醒头结点(当前获得锁的节点)的后继节点:unparkSuccessor(h);
找到头结点的后继节点中第一个没有被取消的节点,并唤醒该节点所处线程

基于AQS实现自己的共享锁

设计一个同步器,在同一时刻,只允许最多两个线程能够并行访问,超过限制的其他线程将进入阻塞状态。

这个功能和 Semaphore 的功能很相似,这个学会了,以后看 Semaphore 的源码也就很简单了。

实现思路:

可以利用AQS 的API tryAcquireShared 实现获得共享锁,定义一个状态,允许的范围为【0,1,2】,状态为2代表新的线程进入的时候需要阻塞等待

public class MyAqsSharedLock{ // 定义最大共享值 private final int maxSharedValue = 2; // 同步器 private final Sync sync; MyAqsSharedLock(){ // 构造方法初始化同步器 sync = new Sync(); } // 基于AQS实现的同步器 class Sync extends AbstractQueuedSynchronizer{ @Override protected int tryAcquireShared(int arg){ // 为什么要自旋呢?因为可能满足state的条件,但是CAS修改失败 while(true){ int state = getState(); // 检查同步状态是否达到最大值 if(state >= maxSharedValue){ // 返回-1 表示没有获得锁 return -1; } // CAS 修改同步状态 if(compareAndSetState(state,state + arg)){ // 修改成功,表示获得了锁,大于等于0表示获得了锁 return getState(); } } } @Override protected boolean tryReleaseShared(int arg){ // 为什么要自旋呢?因为可能满足state的条件,但是CAS修改失败 while(true){ int state = getState(); // CAS 修改同步状态,修改成功返回true,失败继续自旋 if(compareAndSetState(state,state - arg)){ return true; } } } } /** 加锁 */ public void lock(){ sync.acquireShared(1); } /** 解锁 */ public void unLock(){ sync.releaseShared(1); } }

测试方法:

5个线程循环打印输出线程名和当前时间

public static void main(String[] args){ MyAqsSharedLock lock = new MyAqsSharedLock(); IntStream.range(0,5).forEach(i -> new Thread(new Runnable(){ @SneakyThrows @Override public void run(){ while(true){ lock.lock(); try{ System.out.println(Thread.currentThread().getName()+":执行。。。时间:"+ LocalDateTime.now()); TimeUnit.SECONDS.sleep(2); }finally{ lock.unLock(); TimeUnit.SECONDS.sleep(1); } } } },"T"+i).start()); }

输出结果示例:

T0:执行。。。时间:2020-10-30T17:45:45.117 T1:执行。。。时间:2020-10-30T17:45:45.117 T3:执行。。。时间:2020-10-30T17:45:47.118 T2:执行。。。时间:2020-10-30T17:45:47.118 T1:执行。。。时间:2020-10-30T17:45:49.119 T4:执行。。。时间:2020-10-30T17:45:49.119

会发现几乎在同一时间最多只有2个线程在打印输出,满足我们的要求。

代码实现分析 获得共享锁:public void lock()

共享锁的 lock() 调用的是sync.acquireShared(1);acquireShared也在AQS里面,同样被final修饰作为基础框架逻辑部分,不允许被继承,源码展示:

public final void acquireShared(int arg) { // tryAcquireShared 是我们自己实现的逻辑,返回-1,表示没有获得锁 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } // 没有获得共享,再次尝试获得锁,和排他模式的acquireQueued方法非常相似 private void doAcquireShared(int arg) { // 新建节点,加入到队列,和排他锁模式一样的入队逻辑 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 当前节点的前驱节点 final Node p = node.predecessor(); if (p == head) { // 前驱节点是头结点,说明轮到咱获得锁了 // 继续调用我们自己的逻辑,CAS 获得锁 int r = tryAcquireShared(arg); // 这里再次印证了,我们的tryAcquireShared返回值定义,负值是没有获得锁,>=0 表示成功获得锁 if (r >= 0) { // 设置新的头结点,如果后面的排队节点是共享模式的节点,直接唤醒它 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) // 中断当前线程 selfInterrupt(); failed = false; return; } } // 到这了,说明要么没有排队到当前线程,要么CAS获取锁失败,那就只有阻塞线程了 // shouldParkAfterFailedAcquire 如果线程应阻塞,则返回true // parkAndCheckInterrupt 阻塞当前线程admol, 具体实现分析可以看上面lock的分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 线程被取消,摘掉节点 if (failed) cancelAcquire(node); } }

获取共享锁总结:

尝试获得锁,方法:tryAcquireShared

获得锁成功,直接返回;获得锁失败,继续执行下面逻辑;

再次尝试获得锁,方法:doAcquireShared

新建排队节点,并加入到同步队列,方法:addWaiter,逻辑和获得排它锁的一致

自旋(尝试获得锁,阻塞线程,等待被唤醒),直到成功获得锁

释放共享锁:public void unLock()

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wssyjz.html