随着技术的快速发展,业务系统规模的不断扩大,分布式系统越来越普及。一个应用往往会部署到多台机器上,在一些业务场景中,为了保证数据的一致性,要求在同一时刻,同一任务只在一个节点上运行,保证同一个方法同一时刻只能被一个线程执行。这时候分布式锁就运用而生了。
分布式锁有很多的解决方案。常见的有:
基于数据库的:悲观锁,乐观锁。
基于zookeeper的分布式锁。
本章中讲的基于redis的分布式锁。
2. 超卖下单减库存是互联网项目中必不可少的环节。然而,如果我么考虑不得当,将会带来很多问题。比如最不能忍受的:超卖
如下代码,一个初始化库存的方法和一个购买图书的方法,我们没有做任何的并发处理,查看下最终结果。
package com.ldx.redisson.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.util.Objects; /** * redis 实现分布式锁 * * @author ludangxin * @date 2021/8/15 */ @Slf4j @RestController @RequestMapping("redis") public class RedisLockTestController { @Resource private StringRedisTemplate stringRedisTemplate; // 商品key private static final String KEY = "book"; // 库存数量 private static final Long STOCK = 50L; /** * 初始化 */ @GetMapping("init") public String init() { stringRedisTemplate.opsForValue().set(KEY, String.valueOf(STOCK)); return "初始化成功~"; } /** * 购买图书 */ @GetMapping("buy") public String buy() { // 获取到当前库存 String buyBefore = stringRedisTemplate.opsForValue().get(KEY); if(Objects.isNull(buyBefore)) { log.error("未找到\"{}\"的库存信息~", KEY); return "暂未上架~"; } long buyBeforeL = Long.parseLong(buyBefore); if(buyBeforeL > 0) { // 对库存进行-1操作 Long buyAfter = stringRedisTemplate.opsForValue().decrement(KEY); log.info("剩余图书==={}", buyAfter); return "购买成功~"; } else { log.info("库存不足~"); return "库存不足~"; } } }启动测试:
这里我们使用jemter来进行并发请求。配置如下:
线程组配置:
请求配置:
请求结果:
只复制了部分日志
通过日志很明显的看到,即使在业务代码中判断了库存 > 0但还是超卖了。
...... 2021-08-15 21:01:22.614 INFO 66913 --- [io-8080-exec-30] c.l.r.c.RedisLockTestController : 库存不足~ 2021-08-15 21:01:22.614 INFO 66913 --- [io-8080-exec-99] c.l.r.c.RedisLockTestController : 剩余图书===-42 2021-08-15 21:01:22.614 INFO 66913 --- [io-8080-exec-29] c.l.r.c.RedisLockTestController : 库存不足~ 2021-08-15 21:01:22.622 INFO 66913 --- [io-8080-exec-89] c.l.r.c.RedisLockTestController : 剩余图书===-40 2021-08-15 21:01:22.622 INFO 66913 --- [io-8080-exec-90] c.l.r.c.RedisLockTestController : 剩余图书===-35 2021-08-15 21:01:22.622 INFO 66913 --- [o-8080-exec-135] c.l.r.c.RedisLockTestController : 库存不足~ 2021-08-15 21:01:22.622 INFO 66913 --- [o-8080-exec-177] c.l.r.c.RedisLockTestController : 库存不足~ 2021-08-15 21:01:22.622 INFO 66913 --- [io-8080-exec-92] c.l.r.c.RedisLockTestController : 剩余图书===-34 2021-08-15 21:01:22.622 INFO 66913 --- [io-8080-exec-86] c.l.r.c.RedisLockTestController : 剩余图书===-37 2021-08-15 21:01:22.642 INFO 66913 --- [io-8080-exec-11] c.l.r.c.RedisLockTestController : 库存不足~ 2021-08-15 21:01:22.642 INFO 66913 --- [o-8080-exec-115] c.l.r.c.RedisLockTestController : 库存不足~ 2021-08-15 21:01:22.642 INFO 66913 --- [io-8080-exec-72] c.l.r.c.RedisLockTestController : 剩余图书===-33 2021-08-15 21:01:22.643 INFO 66913 --- [nio-8080-exec-3] c.l.r.c.RedisLockTestController : 库存不足~ 3. redis setnx主要是用redis的 setnx (set not exists)命令实现分布式锁。
3.1 编写逻辑在超买的场景中,我们了解了分布式锁的必要性。
上面的场景如果是单机的话,直接使用jvm锁就能解决问题,但是在分布式场景下下jvm锁无法处理。
接下来我们将使用redis命令来解决一下超卖问题。
新增了锁标识key。
在进行业务处理之前,给redis中setIfAbsent(LOCK_KEY, clientId, 30, TimeUnit.SECONDS)作为lock。
LOCK_KEY:锁的标识,比如秒杀的商品id_lock:当对该商品进行秒杀下单时,加锁使其线性执行。
clientId:当前请求的唯一值,为了在删除锁时进行锁判断。即只能删除自己加的锁。防止误删锁。