那么就来进行改造吧。这里直接采用James大佬的最终版CommonMask-JameAQS了。 这里采用自己的AQS,因为自己的AQS有一些关键注解。
四,简易JUC(版本四): 1.JarryAQS: package tech.jarry.learning.netease.locks6; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.LockSupport; /** * @Description: * @Author: jarry */ public class JarryAQS { volatile AtomicInteger readCount = new AtomicInteger(0); AtomicInteger writeCount = new AtomicInteger(0); AtomicReference<Thread> owner = new AtomicReference<>(); public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<>(); class WaitNode{ Thread thread = null; // 表示希望争取的锁的类型。0表示写锁(独占锁),1表示读锁(共享锁) int type = 0; int arg = 0; public WaitNode(Thread thread, int type, int arg) { this.type = type; this.thread = thread; this.arg = arg; } } /** * 获取独占锁(针对独占锁) */ public void lock(){ int arg = 1; if (!tryLock(arg)){ WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg); waiters.offer(waitNode); while (true){ WaitNode headNote = waiters.peek(); if (headNote !=null && headNote.thread == Thread.currentThread()){ if (!tryLock(headNote.arg)){ LockSupport.park(); } else { waiters.poll(); return; } }else { LockSupport.park(); } } } } /** * 解锁(针对独占锁) */ public void unlock(){ int arg = 1; if (tryUnlock(arg)){ WaitNode head = waiters.peek(); if (head == null){ return; } LockSupport.unpark(head.thread); } } /** * 获取共享锁(针对共享锁) */ public void lockShared(){ int arg = 1; if (!tryLockShared(arg)){ WaitNode waitNode = new WaitNode(Thread.currentThread(),1,arg); waiters.offer(waitNode); while (true){ WaitNode head = waiters.peek(); if (head != null && head.thread == Thread.currentThread()){ if (tryLockShared(head.arg)){ waiters.poll(); WaitNode newHead = waiters.peek(); if (newHead != null && newHead.type == 1){ LockSupport.unpark(newHead.thread); } return; } else { LockSupport.park(); } } else { LockSupport.park(); } } } } /** * 解锁(针对共享锁) */ public boolean unLockShared(){ int arg = 1; if (tryUnLockShared(arg)){ WaitNode head = waiters.peek(); if (head != null){ LockSupport.unpark(head.thread); } return true; } return false; } /** * 尝试获取独占锁(针对独占锁) * @param acquires * @return */ public boolean tryLock(int acquires){ throw new UnsupportedOperationException(); } /** * 尝试解锁(针对独占锁) * @param releases 用于设定解锁次数。一般传入waitNode.arg * @return */ public boolean tryUnlock(int releases){ throw new UnsupportedOperationException(); } /** * 尝试获取共享锁(针对共享锁) * @param acquires * @return */ public boolean tryLockShared(int acquires){ throw new UnsupportedOperationException(); } /** * 尝试解锁(针对共享锁) * @param releases * @return */ public boolean tryUnLockShared(int releases){ throw new UnsupportedOperationException(); } } 2.JarryReentrantLock: package tech.jarry.learning.netease.locks6; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * @Description: 仿ReentrantLock,实现其基本功能及特性 * @Author: jarry */ public class JarryReentrantLock implements Lock { private boolean isFair; // 默认采用非公平锁,保证效率(就是参照源码) public JarryReentrantLock() { this.isFair = false; } public JarryReentrantLock(boolean isFair) { this.isFair = isFair; } private JarryAQS jarryAQS = new JarryAQS(){ @Override // 源码中,则是将FairSync与NonFairSync作为两个单独内布类(extend Sync),来实现的。那样会更加优雅,耦合度更低,扩展性更好(而且实际源码,需要重写的部分也会更多,而不像这个自定义demo,只有一个tryLock方法需要重写) public boolean tryLock(int acquires){ if (isFair){ return tryFairLock(acquires); } else { return tryNonFairLock(acquires); } } private boolean tryFairLock(int acquires){ // 这里简单注释一下,如何实现公平锁,其关键在于新的线程到来时,不再直接尝试获取锁,而是直接塞入队列(队列为空,也是殊途同归的) // 1.判断读锁(共享锁)是否被占用 if (readCount.get() == 0){ // 2.判断写锁(独占锁)是否被占用 int writeCountValue = writeCount.get(); if (writeCountValue == 0){ // 2.1 (核心区别)如果写锁未被占用,需要先对等待队列waiters进行判断 WaitNode head = waiters.peek(); if (head !=null && head.thread == Thread.currentThread()){ if (writeCount.compareAndSet(writeCountValue,writeCountValue+acquires)){ owner.set(Thread.currentThread()); return true; } // 竞争失败就直接返回false了 } } else { // 2.2 如果写锁已经被占用了,就判断是否为当前线程持有,是否进行重入操作 if (owner.get() == Thread.currentThread()){ // 如果持有独占锁的线程就是当前线程,那么不需要改变owner,也不需要CAS,只需要修改writeCount的值即可 writeCount.set(writeCountValue + acquires); return true; } } } // 以上操作失败,就返回false,表示竞争锁失败 return false; } private boolean tryNonFairLock(int acquires){ if (readCount.get() == 0){ int writeCountValue = writeCount.get(); if (writeCountValue == 0){ if (writeCount.compareAndSet(writeCountValue,writeCountValue+acquires)){ owner.set(Thread.currentThread()); return true; } } else { if (Thread.currentThread() == owner.get()){ writeCount.set(writeCountValue+acquires); return true; } } } return false; } @Override /** * 先通过临时变量c,判断是否接下来的操作会完全解锁。 如果完全解锁,先释放owner,再通过setState将count(源码中为state)修改为0。 这样调换了一下顺序,但是避免了owner的原子性问题(毕竟别的线程是通过state来判断是否可以竞争锁,修改owner的)。 */ public boolean tryUnlock(int releases) { if (owner.get() != Thread.currentThread()){ throw new IllegalMonitorStateException(); } int writeCountNextValue = writeCount.get() - releases; boolean result = false; if (writeCountNextValue == 0){ result = true; owner.set(null); } writeCount.set(writeCountNextValue); return result; } // 其它诸如共享锁的相关操作,就不进行了。如果强行调用,只会发生UnsupportedOperationException }; @Override public void lock() { jarryAQS.lock(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return jarryAQS.tryLock(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { jarryAQS.unlock(); } @Override public Condition newCondition() { return null; } } 3.JarryReadWriteLock: package tech.jarry.learning.netease.locks6; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; /** * @Description: * @Author: jarry */ public class JarryReadWriteLock implements ReadWriteLock { private JarryAQS jarryAQS = new JarryAQS(){ @Override // 实际源码,是通过Sync类,继承AQS,再进行Override的。 public boolean tryLock(int acquires){ if (readCount.get() == 0){ int writeCountValue = writeCount.get(); if (writeCountValue == 0){ if (writeCount.compareAndSet(writeCountValue,writeCountValue+acquires)){ owner.set(Thread.currentThread()); return true; } } else { if (Thread.currentThread() == owner.get()){ writeCount.set(writeCountValue+acquires); return true; } } } return false; } @Override public boolean tryUnlock(int releases) { if (owner.get() != Thread.currentThread()){ throw new IllegalMonitorStateException(); } int writeCountNextValue = writeCount.get() - releases; boolean result = false; if (writeCountNextValue == 0){ result = true; owner.set(null); } writeCount.set(writeCountNextValue); return result; } @Override public boolean tryLockShared(int acquires) { while (true){ if (writeCount.get() == 0 || owner.get() == Thread.currentThread()){ int readCountValue = readCount.get(); if (readCount.compareAndSet(readCountValue, readCountValue+acquires)){ return true; } } return false; } } @Override public boolean tryUnLockShared(int releases) { while (true){ int readCountValue = readCount.get(); int readCountNext = readCountValue - releases; if (readCount.compareAndSet(readCountValue,readCountNext)){ return readCountNext == 0; } } } }; /** * 获取独占锁(针对独占锁) */ public void lock(){ jarryAQS.lock(); } /** * 解锁(针对独占锁) */ public void unlock(){ jarryAQS.unlock(); } /** * 尝试获取独占锁(针对独占锁) * @param acquires 用于加锁次数。一般传入waitNode.arg(本代码中就是1。为什么不用一个常量1,就不知道了?) * @return */ public boolean tryLock(int acquires){ return jarryAQS.tryLock(acquires); } /** * 尝试解锁(针对独占锁) * @param releases 用于设定解锁次数。一般传入waitNode.arg * @return */ public boolean tryUnlock(int releases){ return jarryAQS.tryUnlock(releases); } /** * 获取共享锁(针对共享锁) */ public void lockShared(){ jarryAQS.lockShared(); } /** * 解锁(针对共享锁) */ public boolean unLockShared(){ return jarryAQS.unLockShared(); } /** * 尝试获取共享锁(针对共享锁) * @param acquires * @return */ public boolean tryLockShared(int acquires){ return tryLockShared(acquires); } /** * 尝试解锁(针对共享锁) * @param releases * @return */ public boolean tryUnLockShared(int releases){ return jarryAQS.tryUnLockShared(releases); } @Override public Lock readLock() { return new Lock() { @Override public void lock() { jarryAQS.lockShared(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return jarryAQS.tryLockShared(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { jarryAQS.unLockShared(); } @Override public Condition newCondition() { return null; } }; } @Override public Lock writeLock() { return new Lock() { @Override public void lock() { jarryAQS.lock(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return jarryAQS.tryLock(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { jarryAQS.unlock(); } @Override public Condition newCondition() { return null; } }; } }J.U.C剖析与解读2(AQS的由来) (6)
内容版权声明:除非注明,否则皆为本站原创文章。