例如,我了解到AQS是通过一个state变量,同时记录独占锁与共享锁的持有数量的。于是我就在JarryAQS中尝试实现这一点,从而进一步理解它。
五,简易JUC(版本X-扩展state): 1.JarryAQS:
package tech.jarry.learning.netease.locks7;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

// NOTE(review): this section is titled "JarryAQS", but the listing below is
// JarryReadWriteLock (the same class as in section 3). The JarryAQS source
// itself is not part of this listing.

/**
 * Simplified read/write lock built on {@code JarryAQS}.
 *
 * <p>Both hold counts are read from the single {@code state} field through
 * {@code exclusiveCount(state)} and {@code sharedCount(state)}; one shared
 * hold is worth {@code SHARED_UNIT} in {@code state}.
 *
 * @author jarry
 */
public class JarryReadWriteLock implements ReadWriteLock {

    // The real JDK source uses a Sync class that extends AQS and overrides its hooks.
    private JarryAQS jarryAQS = new JarryAQS() {

        /**
         * Tries to take the exclusive (write) lock without blocking.
         *
         * @param acquires amount added to the exclusive hold count
         * @return {@code true} if the lock was acquired or re-entered
         */
        @Override
        public boolean tryLock(int acquires) {
            int stateTemp = state;
            // Any shared hold blocks the exclusive lock.
            if (sharedCount(stateTemp) == 0) {
                int writeCountValue = exclusiveCount(stateTemp);
                if (writeCountValue == 0) {
                    // Lock is free: whoever wins the CAS becomes the owner.
                    if (compareAndSetState(stateTemp, stateTemp + acquires)) {
                        owner.set(Thread.currentThread());
                        return true;
                    }
                } else {
                    // Already held exclusively: only the owner may re-enter.
                    if (Thread.currentThread() == owner.get()) {
                        // Report the real CAS outcome instead of assuming success.
                        return compareAndSetState(stateTemp, stateTemp + acquires);
                    }
                }
            }
            return false;
        }

        /**
         * Releases exclusive holds.
         *
         * @param releases amount subtracted from the exclusive hold count
         * @return {@code true} if the exclusive hold count dropped to zero
         * @throws IllegalMonitorStateException if the caller is not the owner
         */
        @Override
        public boolean tryUnlock(int releases) {
            int stateTemp = state;
            if (owner.get() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            int writeCountNextValue = exclusiveCount(stateTemp) - releases;
            boolean result = false;
            if (writeCountNextValue == 0) {
                result = true;
                // Clear the owner before publishing the new state.
                owner.set(null);
            }
            // NOTE(review): the CAS result is ignored; presumably only the owner
            // changes state while the write lock is held - confirm.
            compareAndSetState(stateTemp, stateTemp - releases);
            return result;
        }

        /**
         * Tries to take the shared (read) lock without blocking.
         *
         * @param acquires number of shared holds to add
         * @return {@code true} once the holds are added; {@code false} if
         *         another thread holds the exclusive lock
         */
        @Override
        public boolean tryLockShared(int acquires) {
            while (true) {
                int stateTemp = state;
                // Fail only when a different thread holds the write lock.
                if (exclusiveCount(stateTemp) != 0 && owner.get() != Thread.currentThread()) {
                    return false;
                }
                if (compareAndSetState(stateTemp, stateTemp + SHARED_UNIT * acquires)) {
                    return true;
                }
                // CAS lost a race with another state update: re-read and retry.
            }
        }

        /**
         * Releases shared holds.
         *
         * @param releases number of shared holds to remove
         * @return {@code true} if no shared holds remain afterwards
         */
        @Override
        public boolean tryUnLockShared(int releases) {
            while (true) {
                int stateTemp = state;
                int readCountNext = sharedCount(stateTemp) - releases;
                // Subtract exactly the released holds (not the remaining count).
                if (compareAndSetState(stateTemp, stateTemp - SHARED_UNIT * releases)) {
                    return readCountNext == 0;
                }
            }
        }
    };

    /**
     * Acquires the exclusive lock by delegating to {@code jarryAQS.lock()}.
     */
    public void lock() {
        jarryAQS.lock();
    }

    /**
     * Releases the exclusive lock by delegating to {@code jarryAQS.unlock()}.
     */
    public void unlock() {
        jarryAQS.unlock();
    }

    /**
     * Tries to acquire the exclusive lock.
     *
     * @param acquires number of holds to add. Per the author's note this is
     *                 normally waitNode.arg, which is always 1 in this code
     *                 (the author is unsure why a constant 1 is not used).
     * @return {@code true} if the exclusive lock was acquired
     */
    public boolean tryLock(int acquires) {
        return jarryAQS.tryLock(acquires);
    }

    /**
     * Tries to release the exclusive lock.
     *
     * @param releases number of holds to release (normally waitNode.arg)
     * @return {@code true} if the exclusive lock is fully released
     */
    public boolean tryUnlock(int releases) {
        return jarryAQS.tryUnlock(releases);
    }

    /**
     * Acquires the shared lock by delegating to {@code jarryAQS.lockShared()}.
     */
    public void lockShared() {
        jarryAQS.lockShared();
    }

    /**
     * Releases the shared lock by delegating to {@code jarryAQS.unLockShared()}.
     *
     * @return the result of {@code jarryAQS.unLockShared()}
     */
    public boolean unLockShared() {
        return jarryAQS.unLockShared();
    }

    /**
     * Tries to acquire the shared lock.
     *
     * @param acquires number of shared holds to add
     * @return {@code true} if the shared lock was acquired
     */
    public boolean tryLockShared(int acquires) {
        // Delegate to the synchronizer; calling this method itself would recurse forever.
        return jarryAQS.tryLockShared(acquires);
    }

    /**
     * Tries to release the shared lock.
     *
     * @param releases number of shared holds to release
     * @return {@code true} if no shared holds remain
     */
    public boolean tryUnLockShared(int releases) {
        return jarryAQS.tryUnLockShared(releases);
    }

    /**
     * Returns a {@link Lock} view over the shared (read) side.
     *
     * <p>NOTE(review): {@code lockInterruptibly}, the timed {@code tryLock}
     * and {@code newCondition} are unimplemented stubs.
     */
    @Override
    public Lock readLock() {
        return new Lock() {
            @Override
            public void lock() {
                jarryAQS.lockShared();
            }

            @Override
            public void lockInterruptibly() throws InterruptedException {
                // Not implemented: returns without acquiring anything.
            }

            @Override
            public boolean tryLock() {
                return jarryAQS.tryLockShared(1);
            }

            @Override
            public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
                // Not implemented: always reports failure.
                return false;
            }

            @Override
            public void unlock() {
                jarryAQS.unLockShared();
            }

            @Override
            public Condition newCondition() {
                // Not implemented.
                return null;
            }
        };
    }

    /**
     * Returns a {@link Lock} view over the exclusive (write) side.
     *
     * <p>NOTE(review): {@code lockInterruptibly}, the timed {@code tryLock}
     * and {@code newCondition} are unimplemented stubs.
     */
    @Override
    public Lock writeLock() {
        return new Lock() {
            @Override
            public void lock() {
                jarryAQS.lock();
            }

            @Override
            public void lockInterruptibly() throws InterruptedException {
                // Not implemented: returns without acquiring anything.
            }

            @Override
            public boolean tryLock() {
                return jarryAQS.tryLock(1);
            }

            @Override
            public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
                // Not implemented: always reports failure.
                return false;
            }

            @Override
            public void unlock() {
                jarryAQS.unlock();
            }

            @Override
            public Condition newCondition() {
                // Not implemented.
                return null;
            }
        };
    }
}

2.JarryReentrantLock:
package tech.jarry.learning.netease.locks7;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Imitation of ReentrantLock that reproduces its basic behaviour: exclusive,
 * reentrant, with a fair and a non-fair acquisition mode.
 *
 * @author jarry
 */
public class JarryReentrantLock implements Lock {

    private boolean isFair;

    /**
     * Creates a non-fair lock (the default, for throughput, following the JDK source).
     */
    public JarryReentrantLock() {
        this.isFair = false;
    }

    /**
     * Creates a lock with the given fairness mode.
     *
     * @param isFair {@code true} for fair acquisition, {@code false} for non-fair
     */
    public JarryReentrantLock(boolean isFair) {
        this.isFair = isFair;
    }

    // The real JDK source uses a Sync class that extends AQS and overrides its hooks.
    private JarryAQS jarryAQS = new JarryAQS() {

        /**
         * Tries to take the exclusive lock, dispatching on the fairness flag.
         *
         * <p>The JDK implements FairSync and NonfairSync as two separate nested
         * classes extending Sync, which is cleaner, less coupled and easier to
         * extend (and has more to override); in this demo only this method
         * differs between the two modes.
         *
         * @param acquires amount added to the hold count
         * @return {@code true} if the lock was acquired or re-entered
         */
        @Override
        public boolean tryLock(int acquires) {
            if (isFair) {
                return tryFairLock(acquires);
            } else {
                return tryNonFairLock(acquires);
            }
        }

        /**
         * Fair acquisition: a free lock may only be taken by the thread at the
         * head of the wait queue.
         *
         * <p>Author's note: a newly arriving thread does not grab the lock
         * directly but goes into the queue (an empty queue ends up the same way).
         * NOTE(review): with an empty queue this method always returns false;
         * presumably JarryAQS.lock() enqueues the caller and retries - confirm.
         */
        private boolean tryFairLock(int acquires) {
            // 1. The shared (read) lock must not be held.
            if (readCount.get() == 0) {
                // 2. Check whether the exclusive (write) lock is held.
                int writeCountValue = writeCount.get();
                if (writeCountValue == 0) {
                    // 2.1 (key difference) lock is free: consult the wait queue first.
                    WaitNode head = waiters.peek();
                    if (head != null && head.thread == Thread.currentThread()) {
                        if (writeCount.compareAndSet(writeCountValue, writeCountValue + acquires)) {
                            owner.set(Thread.currentThread());
                            return true;
                        }
                        // Losing the CAS falls through to return false.
                    }
                } else {
                    // 2.2 Lock is held: re-enter only if the current thread owns it.
                    if (owner.get() == Thread.currentThread()) {
                        // The owner is unchanged and no CAS is needed; just bump the count.
                        writeCount.set(writeCountValue + acquires);
                        return true;
                    }
                }
            }
            // Everything above failed: the lock was not acquired.
            return false;
        }

        /**
         * Non-fair acquisition: any thread may take a free lock immediately,
         * without consulting the wait queue.
         */
        private boolean tryNonFairLock(int acquires) {
            if (readCount.get() == 0) {
                int writeCountValue = writeCount.get();
                if (writeCountValue == 0) {
                    if (writeCount.compareAndSet(writeCountValue, writeCountValue + acquires)) {
                        owner.set(Thread.currentThread());
                        return true;
                    }
                } else {
                    // Re-entry by the owner: plain set, no CAS.
                    if (Thread.currentThread() == owner.get()) {
                        writeCount.set(writeCountValue + acquires);
                        return true;
                    }
                }
            }
            return false;
        }

        /**
         * Releases holds on the exclusive lock.
         *
         * <p>The next count is computed first to see whether this call fully
         * unlocks. If so, the owner is cleared before the count is written back
         * as 0. Author's note: this ordering avoids an atomicity problem with
         * owner, because other threads decide from the count (state in the JDK
         * source) whether they may compete for the lock and change owner.
         *
         * @param releases amount subtracted from the hold count
         * @return {@code true} if the lock is fully released
         * @throws IllegalMonitorStateException if the caller is not the owner
         */
        @Override
        public boolean tryUnlock(int releases) {
            if (owner.get() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            int writeCountNextValue = writeCount.get() - releases;
            boolean result = false;
            if (writeCountNextValue == 0) {
                result = true;
                owner.set(null);
            }
            writeCount.set(writeCountNextValue);
            return result;
        }

        // Shared-lock operations are deliberately not overridden. Per the author's
        // note, calling them anyway only results in UnsupportedOperationException.
    };

    @Override
    public void lock() {
        jarryAQS.lock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        // Not implemented: returns without acquiring anything.
    }

    @Override
    public boolean tryLock() {
        return jarryAQS.tryLock(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        // Not implemented: always reports failure.
        return false;
    }

    @Override
    public void unlock() {
        jarryAQS.unlock();
    }

    @Override
    public Condition newCondition() {
        // Not implemented.
        return null;
    }
}

3.JarryReadWriteLock:
package tech.jarry.learning.netease.locks7;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

/**
 * Simplified read/write lock built on {@code JarryAQS}.
 *
 * <p>Both hold counts are read from the single {@code state} field through
 * {@code exclusiveCount(state)} and {@code sharedCount(state)}; one shared
 * hold is worth {@code SHARED_UNIT} in {@code state}.
 *
 * @author jarry
 */
public class JarryReadWriteLock implements ReadWriteLock {

    // The real JDK source uses a Sync class that extends AQS and overrides its hooks.
    private JarryAQS jarryAQS = new JarryAQS() {

        /**
         * Tries to take the exclusive (write) lock without blocking.
         *
         * @param acquires amount added to the exclusive hold count
         * @return {@code true} if the lock was acquired or re-entered
         */
        @Override
        public boolean tryLock(int acquires) {
            int stateTemp = state;
            // Any shared hold blocks the exclusive lock.
            if (sharedCount(stateTemp) == 0) {
                int writeCountValue = exclusiveCount(stateTemp);
                if (writeCountValue == 0) {
                    // Lock is free: whoever wins the CAS becomes the owner.
                    if (compareAndSetState(stateTemp, stateTemp + acquires)) {
                        owner.set(Thread.currentThread());
                        return true;
                    }
                } else {
                    // Already held exclusively: only the owner may re-enter.
                    if (Thread.currentThread() == owner.get()) {
                        // Report the real CAS outcome instead of assuming success.
                        return compareAndSetState(stateTemp, stateTemp + acquires);
                    }
                }
            }
            return false;
        }

        /**
         * Releases exclusive holds.
         *
         * @param releases amount subtracted from the exclusive hold count
         * @return {@code true} if the exclusive hold count dropped to zero
         * @throws IllegalMonitorStateException if the caller is not the owner
         */
        @Override
        public boolean tryUnlock(int releases) {
            int stateTemp = state;
            if (owner.get() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            int writeCountNextValue = exclusiveCount(stateTemp) - releases;
            boolean result = false;
            if (writeCountNextValue == 0) {
                result = true;
                // Clear the owner before publishing the new state.
                owner.set(null);
            }
            // NOTE(review): the CAS result is ignored; presumably only the owner
            // changes state while the write lock is held - confirm.
            compareAndSetState(stateTemp, stateTemp - releases);
            return result;
        }

        /**
         * Tries to take the shared (read) lock without blocking.
         *
         * @param acquires number of shared holds to add
         * @return {@code true} once the holds are added; {@code false} if
         *         another thread holds the exclusive lock
         */
        @Override
        public boolean tryLockShared(int acquires) {
            while (true) {
                int stateTemp = state;
                // Fail only when a different thread holds the write lock.
                if (exclusiveCount(stateTemp) != 0 && owner.get() != Thread.currentThread()) {
                    return false;
                }
                if (compareAndSetState(stateTemp, stateTemp + SHARED_UNIT * acquires)) {
                    return true;
                }
                // CAS lost a race with another state update: re-read and retry.
            }
        }

        /**
         * Releases shared holds.
         *
         * @param releases number of shared holds to remove
         * @return {@code true} if no shared holds remain afterwards
         */
        @Override
        public boolean tryUnLockShared(int releases) {
            while (true) {
                int stateTemp = state;
                int readCountNext = sharedCount(stateTemp) - releases;
                // Subtract exactly the released holds (not the remaining count).
                if (compareAndSetState(stateTemp, stateTemp - SHARED_UNIT * releases)) {
                    return readCountNext == 0;
                }
            }
        }
    };

    /**
     * Acquires the exclusive lock by delegating to {@code jarryAQS.lock()}.
     */
    public void lock() {
        jarryAQS.lock();
    }

    /**
     * Releases the exclusive lock by delegating to {@code jarryAQS.unlock()}.
     */
    public void unlock() {
        jarryAQS.unlock();
    }

    /**
     * Tries to acquire the exclusive lock.
     *
     * @param acquires number of holds to add. Per the author's note this is
     *                 normally waitNode.arg, which is always 1 in this code
     *                 (the author is unsure why a constant 1 is not used).
     * @return {@code true} if the exclusive lock was acquired
     */
    public boolean tryLock(int acquires) {
        return jarryAQS.tryLock(acquires);
    }

    /**
     * Tries to release the exclusive lock.
     *
     * @param releases number of holds to release (normally waitNode.arg)
     * @return {@code true} if the exclusive lock is fully released
     */
    public boolean tryUnlock(int releases) {
        return jarryAQS.tryUnlock(releases);
    }

    /**
     * Acquires the shared lock by delegating to {@code jarryAQS.lockShared()}.
     */
    public void lockShared() {
        jarryAQS.lockShared();
    }

    /**
     * Releases the shared lock by delegating to {@code jarryAQS.unLockShared()}.
     *
     * @return the result of {@code jarryAQS.unLockShared()}
     */
    public boolean unLockShared() {
        return jarryAQS.unLockShared();
    }

    /**
     * Tries to acquire the shared lock.
     *
     * @param acquires number of shared holds to add
     * @return {@code true} if the shared lock was acquired
     */
    public boolean tryLockShared(int acquires) {
        // Delegate to the synchronizer; calling this method itself would recurse forever.
        return jarryAQS.tryLockShared(acquires);
    }

    /**
     * Tries to release the shared lock.
     *
     * @param releases number of shared holds to release
     * @return {@code true} if no shared holds remain
     */
    public boolean tryUnLockShared(int releases) {
        return jarryAQS.tryUnLockShared(releases);
    }

    /**
     * Returns a {@link Lock} view over the shared (read) side.
     *
     * <p>NOTE(review): {@code lockInterruptibly}, the timed {@code tryLock}
     * and {@code newCondition} are unimplemented stubs.
     */
    @Override
    public Lock readLock() {
        return new Lock() {
            @Override
            public void lock() {
                jarryAQS.lockShared();
            }

            @Override
            public void lockInterruptibly() throws InterruptedException {
                // Not implemented: returns without acquiring anything.
            }

            @Override
            public boolean tryLock() {
                return jarryAQS.tryLockShared(1);
            }

            @Override
            public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
                // Not implemented: always reports failure.
                return false;
            }

            @Override
            public void unlock() {
                jarryAQS.unLockShared();
            }

            @Override
            public Condition newCondition() {
                // Not implemented.
                return null;
            }
        };
    }

    /**
     * Returns a {@link Lock} view over the exclusive (write) side.
     *
     * <p>NOTE(review): {@code lockInterruptibly}, the timed {@code tryLock}
     * and {@code newCondition} are unimplemented stubs.
     */
    @Override
    public Lock writeLock() {
        return new Lock() {
            @Override
            public void lock() {
                jarryAQS.lock();
            }

            @Override
            public void lockInterruptibly() throws InterruptedException {
                // Not implemented: returns without acquiring anything.
            }

            @Override
            public boolean tryLock() {
                return jarryAQS.tryLock(1);
            }

            @Override
            public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
                // Not implemented: always reports failure.
                return false;
            }

            @Override
            public void unlock() {
                jarryAQS.unlock();
            }

            @Override
            public Condition newCondition() {
                // Not implemented.
                return null;
            }
        };
    }
}

六,总结:J.U.C剖析与解读2(AQS的由来) (8)
内容版权声明:除非注明,否则皆为本站原创文章。