AQS是并发编程中非常重要的概念,它是juc包下的许多并发工具类,如CountdownLatch,CyclicBarrier,Semaphore 和锁, 如ReentrantLock, ReaderWriterLock的实现基础,提供了一个基于int状态码和队列来实现的并发框架。本文将对AQS框架的几个重要组成进行简要介绍,读完本文你将get到以下几个点:
AQS进行并发控制的机制是什么
共享模式和独占模式获取和释放同步状态的详细过程
基于AQS框架实现一个简易的互斥锁
一,AQS基本概念
AQS(AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示状态,通过内置的FIFO(first in,first out)队列来完成资源获取线程的排队工作。
1.1 同步状态
AQS中维持一个全局的int状态码(state),线程通过修改(加/减指定的数量)码是否成功来决定当前线程是否成功获取到同步状态。
1.1 独占or共享模式
AQS支持两种获取同步状态的模式既独占式和共享式。顾名思义,独占式模式同一时刻只允许一个线程获取同步状态,而共享模式则允许多个线程同时获取。

1.2 同步队列
同步队列(一个FIFO双向队列)是AQS的核心,用来完成同步状态的管理,当线程获取同步状态失败时,AQS会将当前线程以及等待状态等信息构造成一个节点并加入到同步队列,同时会阻塞当前线程。

二,独占模式获取与释放状态
独占模式既同一时间只能由一个线程持有同步状态。当多个线程竞争时(acquire),获取到同步状态的线程会将当前线程赋值给Thread exclusiveOwnerThread属性(AQS父类中)来标记当前状态被线程独占。其他线程将被构造成Node加入到同步队列中。当线程l
2.1 获取同步状态
/**
* 获取同步状态
*/
public final void acquire(int arg) {
/**
* 1. tryAcquire 尝试获取同步状态;
* 2.1 addWaiter
如果尝试获取到同步状态失败,则加入到同步队列中;
* 2.2 acquireQueued 在队列中尝试获取同步状态.
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
多线程并发获取(修改)同步状态, 修改同步状态成功的线程标记为拥有同步状态

/**
* 尝试获取同步状态【子类中实现】,因为aqs基于模板模式,仅提供基于状态和同步队列的实
* 现框架,具体的实现逻辑由子类决定
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// a. 尝试修改状态值操作执行成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// b. 修改状态值成功,记录当前持有同步状态的线程信息
setExclusiveOwnerThread(current);
return true;
}
// 如果当前线程已经持有同步状态,继续修改同步状态【重入锁实现原理,不理解可以先忽略】
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
获取失败的线程,加入到同步队列的队尾;加入到队列中后,如果当前节点的前驱节点为头节点再次尝试获取同步状态(下文代码:p == head && tryAcquire(arg))。

/**
* 没有获取到同步状态的线程加入到队尾部
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试用最快的方式入队,如果入队失败,再走完整的入队方法
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 将当前线程设置到队尾
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 正常的入队方法
enq(node);
return node;
}
/**
* 同步队列中节点,尝试获取同步状态
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋(死循环)
for (;;) {
// 只有当前节点的前驱节点是头节点时才会尝试执行获取同步状态操作
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);// 注意: 此处重点, 当前节点设置为头节点,相当于头节点出队
p.next = null; // help GC
failed = false;
return interrupted;
}
// 获取失败后是否进入wait
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果头节点的下一个节点尝试获取同步状态失败后,会进入等待状态;其他节点则继续自旋。