Java并发(6)- CountDownLatch、Semaphore与AQS (4)

这一段代码和ReentrantLock中的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法基本一样,说下两个不同的地方。一是加入等待队列时这里加入的是Node.SHARED类型的节点。二是获取锁成功后会通知下一个节点,也就是唤醒下一个线程。以旋转寿司店的例子为例,前面同时走了5个客人,空余5个座位,一家3口坐进去之后会告诉后面的一对情侣,让他们也坐进去,这样就达到了共享的目的。shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法在上一篇文章中都有详细说明,这里就做解释了。

再来看看releaseShared方法时怎么释放信号量的,首先调用tryReleaseShared来尝试释放信号量,释放成功后调用doReleaseShared来判断是否需要唤醒后继线程:

protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow //释放信号量过多 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) //cas操作设置新的信号量 return true; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //SIGNAL状态下唤醒后继节点 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); //唤醒后继节点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }

释放的逻辑很好理解,相比ReentrantLock只是在state的数量上有点差别。

3.3 CountDownLatch源码分析

CountDownLatch相比Semaphore在实现逻辑上要简单的多,同时他也没有公平和非公平的区别,因为当计数器达到0的时候,所有等待的线程都会释放,不为0的时候,所有等待的线程都会阻塞。直接来看看CountDownLatch的两个核心方法await和countDown。

public void await() throws InterruptedException { //和Semaphore的不同在于参数为1,其实这个参数对CountDownLatch来说没什么意义,因为后面CountDownLatch的tryAcquireShared实现是通过getState() == 0来判断的 sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { //这里加入了一个等待超时控制,超过时间后直接返回false执行后面的代码,不会长时间阻塞 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); //每次释放1个计数 } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量 doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起 } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; //奠定了同时获取锁的基础,无论State初始为多少,只能计数等于0时触发 }

和Semaphore区别有两个,一是State每次只减少1,同时只有为0时才释放所有等待线程。二是提供了一个超时等待方法。acquireSharedInterruptibly方法跟Semaphore一样,就不细说了,这里重点说下tryAcquireSharedNanos方法。

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } //最小自旋时间 static final long spinForTimeoutThreshold = 1000L; private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; //计算了一个deadline final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) //超时后直接返回false,继续执行 return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) //大于最小cas操作时间则挂起线程 LockSupport.parkNanos(this, nanosTimeout); //挂起线程也有超时限制 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

重点看标了注释的几行代码,首先计算了一个超时时间,当超时后直接退出等待,继续执行。如果未超时并且大于最小的cas操作时间,这里定义的是1000ns,则挂起,同时挂起操作也有超时限制。这样就实现了超时等待。

4.总结

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzspzz.html