Java并发(6)- CountDownLatch、Semaphore与AQS (3)

首先是Semaphore的构造方法,同ReentrantLock一样,他有两个构造方法,这样也是为了实现公平共享锁和非公平共享锁,大家可能有疑问,既然是共享锁,为什么还分公平和非公平的呢?这就回到了上面那个例子后面的疑问,前面有等待的线程时,后来的线程是否可以直接获取信号量,还是一定要排队。等待当然是公平的,插队就是非公平的。

还是用旋转寿司的例子来说:现在只有2个空位,已经有一家3口在等待,这时来了一对情侣,公平共享锁的实现就是这对情侣必须等待,只到一家3口上桌之后才轮到他们,而非公平共享锁的实现是可以让这对情况直接去吃,因为刚好有2个空位,让一家3口继续等待(好像是很不公平......),这种情况下非公平共享锁的好处就是可以最大化寿司店的利润(好像同时也得罪了等待的顾客......),也是Semaphore默认的实现方式。

public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

Semaphore的例子中使用了两个核心方法acquire和release,分别调用了AQS中的acquireSharedInterruptibly和releaseShared方法:

//获取permits个信号量 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //释放permits个信号量 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //尝试获取arg个信号量 doAcquireSharedInterruptibly(arg); //获取信号量失败时排队挂起 } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //尝试释放arg个信号量 doReleaseShared(); return true; } return false; }

Semaphore在获取和释放信号量的流程都是通过AQS来实现,具体怎么算获取成功或释放成功则由Semaphore本身实现。

//公平共享锁尝试获取acquires个信号量 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) //前面是否有排队,有则返回获取失败 return -1; int available = getState(); //剩余的信号量(旋转寿司店剩余的座位) int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) // 剩余信号量不够,够的情况下尝试获取(旋转寿司店座位不够,或者同时来两对情况抢座位) return remaining; } } //非公平共享锁尝试获取acquires个信号量 final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //剩余的信号量(旋转寿司店剩余的座位) int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) // 剩余信号量不够,够的情况下尝试获取(旋转寿司店座位不够,或者同时来两对情侣抢座位) return remaining; } }

可以看到公平共享锁和非公平共享锁的区别就在是否需要判断队列中是否有已经等待的线程。公平共享锁需要先判断,非公平共享锁直接插队,尽管前面已经有线程在等待。

为了验证这个结论,稍微修改下上面的示例:

threadA.start(); threadB.start(); Thread.sleep(1); threadD.start(); //threadD已经在排队 Thread.sleep(3500); threadC.start(); //3500毫秒后threadC来插队

结果输出:

threadB get 5 semaphore threadA get 4 semaphore threadB release 2 semaphore threadA release 1 semaphore threadC get 4 semaphore //threadC先与threadD获取到信号量 threadA release 1 semaphore threadB release 3 semaphore threadC release 4 semaphore threadA release 2 semaphore threadD get 10 semaphore threadD release 10 semaphore

这个示例很好的说明了当为非公平锁时会先尝试获取共享锁,然后才排队。

当获取信号量失败之后会去排队,排队这个操作通过AQS中的doAcquireSharedInterruptibly方法来实现:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); //加入等待队列 boolean failed = true; try { for (;;) { final Node p = node.predecessor(); //获取当前节点的前置节点 if (p == head) { int r = tryAcquireShared(arg); //前置节点是头节点时,说明当前节点是第一个挂起的线程节点,再次尝试获取共享锁 if (r >= 0) { setHeadAndPropagate(node, r); //与ReentrantLock不同的地方:获取共享锁成功设置头节点,同时通知下一个节点 p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && //非头节点或者获取锁失败,检查节点状态,查看是否需要挂起线程 parkAndCheckInterrupt()) //挂起线程,当前线程阻塞在这里! throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zzspzz.html