本系列文章主要讲解Java并发相关的内容,包括同步、锁、信号量、阻塞队列、线程池等,整体思维导图如下:
系列文章列表:
Java并发基础-并发模型、基础接口以及Thread
Java并发基础-同步和锁
Java并发基础-并发工具类(一)
本文主要以实例讲解Semaphore、阻塞队列等内容。
Semaphore 基本概念和用途Semaphore常称信号量,其维护了一个许可集,可以用来控制线程并发数。线程调用acquire()方法去或者许可证,然后执行相关任务,任务完成后,调用release()方法释放该许可证,让其他阻塞的线程可以运行。
Semaphore可以用于流量控制,尤其是一些公共资源有限的场景,比如数据库连接。假设我们上面的账户余额管理中的账户修改操作涉及到去更改mysql数据库,为了避免数据库并发太大,我们进行相关限制。
常用方法
Semaphore(int permits):构造方法,初始化许可证数量
void acquire():获取许可证
void release():释放许可证
int availablePermits() :返回此信号量中当前可用的许可证数。
int getQueueLength():返回正在等待获取许可证的线程数。
boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。
虽然在代码中设置了20个线程去运行,但同时设置了许可证的数量为5,因而实际的最大并发数还是5。
package com.aidodoo.java.concurrent; import java.util.concurrent.*; /** * Created by zhangkh on 2018/9/9. */ public class SemaphoreDemo { public static void main(String[] args){ Semaphore semaphore=new Semaphore(5); ExecutorService executorService = Executors.newFixedThreadPool(20); Account account=new Account(); for(int i=0;i<20;i++){ SpenderWithSemaphore spender = new SpenderWithSemaphore(account, semaphore); executorService.submit(spender); } executorService.shutdown(); } } class SpenderWithSemaphore implements Runnable { private final Account account; private final Semaphore semaphore; public SpenderWithSemaphore(Account account, Semaphore semaphore) { this.account = account; this.semaphore = semaphore; } @Override public void run() { try{ semaphore.acquire(); System.out.println(String.format("%s get a premit at time %s,change and save data to mysql",Thread.currentThread().getName(),System.currentTimeMillis()/1000)); Thread.sleep(2000); }catch (InterruptedException e){ e.printStackTrace(); }finally { // System.out.println(String.format("%s release a premit",Thread.currentThread().getName())); semaphore.release(); } } }获取许可证后,模拟操作mysql,我们让线程睡眠2秒,程序输出如下:
pool-1-thread-2 get a premit at time 1536480858,change and save data to mysql pool-1-thread-5 get a premit at time 1536480858,change and save data to mysql pool-1-thread-3 get a premit at time 1536480858,change and save data to mysql pool-1-thread-4 get a premit at time 1536480858,change and save data to mysql pool-1-thread-1 get a premit at time 1536480858,change and save data to mysql pool-1-thread-8 get a premit at time 1536480860,change and save data to mysql pool-1-thread-7 get a premit at time 1536480860,change and save data to mysql pool-1-thread-6 get a premit at time 1536480860,change and save data to mysql pool-1-thread-9 get a premit at time 1536480860,change and save data to mysql pool-1-thread-10 get a premit at time 1536480860,change and save data to mysql pool-1-thread-11 get a premit at time 1536480862,change and save data to mysql pool-1-thread-13 get a premit at time 1536480862,change and save data to mysql pool-1-thread-12 get a premit at time 1536480862,change and save data to mysql pool-1-thread-14 get a premit at time 1536480862,change and save data to mysql pool-1-thread-15 get a premit at time 1536480862,change and save data to mysql pool-1-thread-16 get a premit at time 1536480864,change and save data to mysql pool-1-thread-17 get a premit at time 1536480864,change and save data to mysql pool-1-thread-19 get a premit at time 1536480864,change and save data to mysql pool-1-thread-18 get a premit at time 1536480864,change and save data to mysql pool-1-thread-20 get a premit at time 1536480864,change and save data to mysql可以看到前面5个线程同一时间1536480858获得许可证,然后执行操作,并不是20个线程一起操作,这样能降低对mysql数据库的影响。
如果把上面Semaphore的构造方法中的许可证数量改为20,大家可以看到20个线程的运行时间基本一致。
Semaphore实现直接基于AQS,有公平和非公平两种模式。公平模式即按照调用acquire()的顺序依次获得许可证,遵循FIFO(先进先出),非公平模式是抢占式的,谁先抢到先使用。
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }