微服务痛点-基于Dubbo + Seata的分布式事务(TCC模式) (3)

微服务痛点-基于Dubbo + Seata的分布式事务(TCC模式)

什么样的情况会造成悬挂呢?按照前面所讲,在 RPC 调用时,先注册分支事务,再执行 RPC 调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,发起方就会通知 TC 回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者,真正执行,从而造成悬挂。

怎么实现才能做到防悬挂呢?根据悬挂出现的条件先来分析下,悬挂是指二阶段 Cancel 执行完后,一阶段才执行。也就是说,为了避免悬挂,如果二阶段执行完成,那一阶段就不能再继续执行。因此,当一阶段执行时,需要先检查二阶段是否已经执行完成,如果已经执行,则一阶段不再执行;否则可以正常执行。那怎么检查二阶段是否已经执行呢?大家是否想到了刚才解决空回滚和幂等时用到的事务控制表,可以在二阶段执行时插入一条事务控制记录,状态为已回滚,这样当一阶段执行时,先读取该记录,如果记录存在,就认为二阶段已经执行;否则二阶段没执行。

Dubbo + Seata 实战案例

关于环境准备和目录结构,大家详见: 微服务痛点 - 基于Dubbo + Seata的分布式事务(AT)模式。

业务模型两阶段改造 Storage商品库存

数据库添加冻结商品库存数

# 创建商品库存表 create table if not exists storage.tcc_storage ( id bigint auto_increment primary key, commodity_code varchar(50) null comment '商品编码', name varchar(255) null comment '商品名称', count int null comment '商品库存数', frozen_count int default 0 null comment '冻结商品库存数' );

将原来的扣减商品库存一步逻辑修改成两阶段逻辑操作:

package cn.mushuwei.storage.api; import cn.mushuwei.storage.api.dto.CommodityDTO; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * @author jamesmsw * @date 2020/12/1 9:37 上午 */ @LocalTCC public interface StorageApi { /** * 扣减库存准备 * * @param actionContext 业务动作上下文 * @param commodityDTO 库存信息 * @return 是/否 */ @TwoPhaseBusinessAction(name = "decreaseStorageTcc", commitMethod = "decreaseStorageCommit", rollbackMethod = "decreaseStorageCancel") Boolean decreaseStoragePrepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "commdityDTO") CommodityDTO commodityDTO); /** * 扣减库存提交 * * @param actionContext 业务动作上下文 * @return 是/否 */ Boolean decreaseStorageCommit(BusinessActionContext actionContext); /** * 扣减库存回滚 * * @param actionContext 业务动作上下文 * @return 是/否 */ Boolean decreaseStorageCancel(BusinessActionContext actionContext); }

数据持久化操作逻辑修改:

<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > <mapper namespace="cn.mushuwei.storage.dao.StorageDao"> <update> update tcc_storage set count = count - #{count}, frozen_count = frozen_count + #{count} where commodity_code = #{commodityCode} </update> <update> update tcc_storage set frozen_count = frozen_count - #{count} where commodity_code = #{commodityCode} </update> <update> update tcc_storage set count = count + #{count}, frozen_count = frozen_count - #{count} where commodity_code = #{commodityCode} </update> </mapper> Account用户

**数据库添加账号冻结余额: **

# 创建用户账户表 create table if not exists tcc_account ( id bigint auto_increment primary key, user_id varchar(50) null comment '用户编号', amount double(50,2) null comment '账号余额', frozen_amount double(50,2) default 0.00 null comment '账号冻结余额' );

将原来的扣减用户余额一步逻辑修改成两阶段逻辑操作:

package cn.mushuwei.account.api; import cn.mushuwei.account.api.dto.AccountDTO; import io.seata.rm.tcc.api.BusinessActionContext; import io.seata.rm.tcc.api.BusinessActionContextParameter; import io.seata.rm.tcc.api.LocalTCC; import io.seata.rm.tcc.api.TwoPhaseBusinessAction; /** * @author jamesmsw * @date 2020/12/1 5:20 下午 */ @LocalTCC public interface AccountApi { /** * 从账号扣钱准备 * * @param accountDTO * @param actionContext 业务动作上下文 * @return 是/否 */ @TwoPhaseBusinessAction(name = "decreaseAccountTcc", commitMethod = "decreaseAccountCommit", rollbackMethod = "decreaseAccountCancel") Boolean decreaseAccountPrepare(BusinessActionContext actionContext, @BusinessActionContextParameter(paramName = "accountDTO") AccountDTO accountDTO); /** * 从账号扣钱提交 * * @param actionContext * @return 是/否 */ Boolean decreaseAccountCommit(BusinessActionContext actionContext); /** * 从账号扣钱取消 * * @param actionContext * @return 是/否 */ Boolean decreaseAccountCancel(BusinessActionContext actionContext); }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpyzyx.html