并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore) (4)

如果学过操作系统的话,对信号量Semaphore应该不陌生。操作系统中的信号量是这么一个机构:它维护了一定数目的资源,进程向其请求资源将导致Semaphore中资源数量减少,当资源数量小于0时将会导致当前线程阻塞;而进程释放资源将导致Semaphore中资源数量增加,当资源数量大于0时会唤醒阻塞的进程。操作系统中使用信号量可以轻松实现进程间的互斥和同步。java在语言层面也支持信号量机制,其工作原理和操作系统中的信号量类似,可以通过调用

public void acquire(int permits)

或者public boolean tryAcquire(int permits)
请求信号量中的许可(资源)。不过后者在信号量中许可数量不够时不会阻塞而是立即返回一个失败结果。当然,也可以通过public void release()
向信号量归还资源。
信号量在创建时必须为其指定可以用的许可总数,如下所示

public Semaphore(int permits) { sync = new NonfairSync(permits); }

当创建信号量时指定许可总数为1,则可以起到独占锁的作用,不过它是不允许线程重入的。同时,它还有公平和非公平模式之分,通过在创建对象时传入参数进行指定

public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

和ReentrentLock一样默认是非公平模式。

4.2 使用Semaphore进行最大并发数的控制

假设服务器上有一种资源可以同时供多个用户进行访问,出于系统稳定性考虑需要限制同时访问的用户的数量,整个过程可以模拟如下

/** * @author: takumiCX * @create: 2018-09-24 **/ public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { //信号量控制并发数最多为3 Semaphore semaphore = new Semaphore(3); //同时开启10个线程 for(int i=1;i<=10;i++){ new Thread(new ReaderThread(semaphore,i)).start(); } } static class ReaderThread implements Runnable{ Semaphore semaphore; //用户序号 int userIndex; public ReaderThread(Semaphore semaphore, int userIndex) { this.semaphore = semaphore; this.userIndex = userIndex; } @Override public void run() { try { //获取许可 semaphore.acquire(1); //模拟访问资源所用的时间 TimeUnit.SECONDS.sleep(1); System.out.println("用户 "+userIndex+" 访问资源,时间:"+System.currentTimeMillis()); //释放许可 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }

使用信号量限制同时并发访问的线程数为3,然后开启10个线程模拟用户访问。得到的结果如下

并发包下常见的同步工具类详解(CountDownLatch,CyclicBarrier,Semaphore)

从结果上可以清晰的看到,每次最多允许3个用户同时访问资源,信号量很好的起到了限流作用。

4.3 Semaphore原理浅析

和CountDownLatch类似,Semaphore底层也是通过AQS的共享模式实现的。它和CountDownLatch的区别只是对于AQS共享模式的钩子方法tryAcquireShared()
和tryReleaseShared()
的实现不同。

以Semaphore的非公平模式为例,其尝试释放同步状态的逻辑如下

final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //获取可用许可数 int remaining = available - acquires; //计算被消耗后剩余的许可数 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }

首先会获取当前可用的许可值(state),根据请求数量计算出剩余的许可值,若剩余许可数小于0则直接返回剩余值表示该操作失败;否则以CAS方式将state值更新为计算后的剩余值,并返回一个大于等于0的数表示成功。通过该方法的返回值可以知道尝试获取同步状态的操作是否成功,返回值小于0表示没有足够的许可,线程将会加入同步队列并等待;返回值大于等于0则表示许可足够,则整个获取许可的流程就结束了。

tryReleaseShared()的实现也很简单,

protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); //获取当前许可数 int next = current + releases; //计算释放后的许可总数 if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) //cas更新许可值 return true; } }

计算释放后的许可总数并以CAS方式对state值进行更新。之后将返回上层唤醒执行

doReleaseShared()

唤醒头结点后结点中的线程,被唤醒的线程将执行tryAcquireShared()重新尝试获取同步状态,获取失败则继续阻塞,获取成功将设置当前结点为队列头结点并继续唤醒后续结点中的线程。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zyzypg.html