具体做法是在本地事务中插入业务数据时,也插入一条消息数据。然后在做后续操作,如果其他操作成功,则删除该消息;如果失败则不删除,异步监听这个消息,不断重试。
本地消息表是一个很好的思路,可以有多种使用方式:
配合MQ示例伪代码:
messageTx := tc.NewTransaction("order") messageTxSql := tx.TryPlan("content") m,err := db.InsertTx(sql,messageTxSql) if err!=nil { return err } aErr := mq.Publish("B-Service-topic",m) if aErr!=nil { // 推送到 MQ 失败 messageTx.Confirm() // 更新消息的状态为 confirm }else { messageTx.Cancel() // 删除消息 } // 异步处理 confirm 的消息,继续推送 func OnMessage(task *Task){ err := mq.Publish("B-Service-topic", task.Value()) if err==nil { messageTx.Cancel() } }上面代码中其 messageTxSql 是插入本地消息表的一段 SQL :
insert into `tcc_async_task` (`uid`,`name`,`value`,`status`) values (\'?\',\'?\',\'?\',\'?\')它和业务 SQL 在同一个事务中去执行,要么成功,要么失败。
成功则推送到队列,推送成功,则调用 messageTx.Cancel() 删除本地消息;推送失败则标记消息为 confirm。本地消息表中 status 有 2 种状态 try、confirm, 无论哪种状态在 OnMessage 都可以监听到,从而发起重试。
本地事务保障消息和业务一定会写入数据库,此后的执行无论宕机还是网络推送失败,异步监听都可以进行后续处理,从而保障了消息一定会推到 MQ。
而 MQ 则保障一定会到达消费者服务中,利用 MQ 的 QOS 策略,消费者服务一定能处理,或继续投递到下一个业务队列中,从而保障了事务的完整性。
配合服务调用示例伪代码:
messageTx := tc.NewTransaction("order") messageTxSql := tx.TryPlan("content") body,err := db.InsertTx(sql,messageTxSql) if err!=nil { return err } aErr := request.POST("B-Service",body) if aErr!=nil { // 调用 B-Service 失败 messageTx.Confirm() // 更新消息的状态为 confirm }else { messageTx.Cancel() // 删除消息 } // 异步处理 confirm 或 try 的消息,继续调用 B-Service func OnMessage(task *Task){ // request.POST("B-Service",body) }这是本地消息表 + 调用其他服务的例子,没有 MQ 的引入。这种使用异步重试,并用本地消息表保障消息的可靠性,解决了阻塞式重试带来的问题,在日常开发中比较常见。
如果本地没有要写 DB 的操作,可以只写入本地消息表,同样在 OnMessage中处理:
messageTx := tc.NewTransaction("order") messageTx := tx.Try("content") aErr := request.POST("B-Service",body) // .... 消息过期配置本地消息表的 Try 和 Confirm 消息的处理器:
TCC.SetTryHandler(OnTryMessage()) TCC.SetConfirmHandler(OnConfirmMessage())在消息处理函数中要判断当前消息任务是否存在过久,比如一直重试了一小时,还是失败,就考虑发邮件、短信、日志告警等方式,让人工介入。
func OnConfirmMessage(task *tcc.Task) { if time.Now().Sub(task.CreatedAt) > time.Hour { err := task.Cancel() // 删除该消息,停止重试。 // doSomeThing() 告警,人工介入 return } }在 Try 处理函数中,还要单独判断当前消息任务是否存在过短,因为 Try状态的消息,可能才刚刚创建,还没被确认提交或删除。这会和正常业务逻辑的执行重复,意味着成功的调用,也会被重试;为尽量避免这种情况,可以检测消息的创建时间是否很短,短的话可以跳过。
重试机制必然依赖下游 API 在业务逻辑上的幂等性,虽然不处理也可行,但设计上还是要尽量避免干扰正常的请求。
独立消息服务独立消息服务是本地消息表的升级版,把本地消息表抽离成一个独立的服务。所有操作之前先在消息服务添加个消息,后续操作成功则删除消息,失败则提交确认消息。
然后用异步逻辑去监听消息,做对应的处理,和本地消息表的处理逻辑基本一致。但由于向消息服务添加消息,无法和本地操作放到一个事务里,所以会存在添加消息成功,后续失败,则此时的消息就是个无用消息。
如下示例场景:
err := request.POST("Message-Service",body) if err!=nil { return err } aErr := request.POST("B-Service",body) if aErr!=nil { return aErr }