原《第10章 可靠消息最终一致性》
理论
项目背景
前几年,社会上逐渐兴起了一种叫做C2C
同城快递的业务模式,也就是俗称的跑腿闪送
。比如,当某人需要送个东西给朋友石昊,而路人甲刚好也要去石昊所在的地方,那么他就可以接单跑腿。这样既满足了需求,又可以顺便挣点跑腿费。
其中有一个场景,就是用户可以通过钱包余额支付跑腿费。既然涉及到支付和结算,那么就必须保证结果是正确的,不过经过业务论证,在应用中允许出现短暂的数据不一致现象。也就是当负责跑腿的用户完成订单后,且在提现前的这一段时间之内,用户余额可能会出现不一致现象。例如,当用户完成一笔10元的跑腿订单的那一刻,可能余额并未实时增加10元,但只要当他提现时余额是正确的就行。
当时综合考虑过分布式事务
的几大解决方案,比较来比较去,得出如下结论。
虽然2PC/3PC功能上可以满足需求,但性能上比较拉胯,其实业务上根本不在乎是不是强一致性。2PC可以很好地应对传统的单体应用,但不适合高并发、高性能和高吞吐量要求的场景。
TCC对代码的侵入性太高,要改造的话需要花费大量时间。而且如果不合适,改回来同样要花费巨大代价,不适合作为通用解决方案。
基于消息队列的方式既能满足这种最终一致性的要求,而且性能上也有保障。因为是做自有产品的余额支付。
因此,最终的技术选型就是可靠消息最终一致性
的分布式事务解决方案。
另外,原框架使用的是Dubbo,但改造后使用了Spring Cloud Alibaba(V2021.0.1.0
)。
不过在正式开始动手写代码前,先明确整个运行流程及测试数据。
预先向MySQL中插入10位用户,每位用户有10000块钱的余额。这10位用户同时下单,用余额完成支付,所有订单的金额均为1元。
程序运行过程中,可以随时停止进程和停止
MQ
,然后再恢复运行。可以比对“最终”产生的数据结果,看看是否符合下面的公式。
未付款任务数
= 0已付款任务数
=支付数
=账单数
=交易数
=会计分录数
=所有用户已花费的余额
即使上述公式暂时不成立也没关系,因为数据的暂时不一致是可以容忍的,只要将
MQ
中所有的消息及MongoDB中所有积压的事务消息全部消费完后,数据最终是一致的就行。
可以部署多个消费者节点,然后重试上述步骤。
因为尚未改造为分布式任务调度,所以任务调度节点目前只有一个。
在分布式事务中讲过可靠消息最终一致性方案
,它的整个流程如下图所示。

它的整个执行过程如下。
业务逻辑的处理服务在事务提交前,先封装消息。
待处理完若干业务后,先存储消息,然后再通过消息队列发送。
消费者收到消息后,就启动相应的任务,并放到定时任务线程池中执行。
如果业务操作成功完成,就删除之前保存过的消息。
如果业务处理状态不明确(失败或超时),那么独立的消息系统会每隔一段指定的时间就把所有存储的消息都重发一遍,直到与消息相关的业务被成功执行为止。
因此,对于可靠消息最终一致性
方案来说,其真正的精髓和最主要的就是两个点。
保存并发送事务消息。
事务消息轮询。
环境准备
可以通过Docker搭建整个项目环境。
实验环境的MySQL数据库初始化脚本如下。
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for sys_accounting_voucher
-- ----------------------------
DROP TABLE IF EXISTS `sys_accounting_voucher`;
CREATE TABLE `sys_accounting_voucher` (
`guid` bigint(20) unsigned NOT NULL COMMENT '凭证编码',
`voucherno` varchar(64) NOT NULL COMMENT '原始凭证号,用商家支付订单号表示',
`requestno` bigint(20) unsigned NOT NULL COMMENT '请求号',
`entrytype` varchar(16) NOT NULL DEFAULT '' COMMENT '会计分录类型',
`origin` varchar(16) NOT NULL DEFAULT 'APP' COMMENT '来源系统 APP;WEB;SCAN;FACE;PACKAGE;POINT',
`payamount` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '付款方支付金额',
`changeamount` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '收款方变动金额',
`payeraccountno` varchar(32) NOT NULL COMMENT '付款方账号',
`receiveraccountno` varchar(32) NOT NULL COMMENT '收款方账号',
`income` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '收入',
`cost` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '成本',
`profit` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '利润',
`createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`remark` varchar(128) NOT NULL DEFAULT '' COMMENT '备注',
PRIMARY KEY (`guid`),
UNIQUE KEY `sys_accounting_voucher_voucherno` (`voucherno`) USING HASH,
UNIQUE KEY `sys_accounting_voucher_requestno` (`requestno`) USING HASH
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 COMMENT='会计原始凭证表';
-- ----------------------------
-- Records of sys_accounting_voucher
-- ----------------------------
BEGIN;
COMMIT;
-- ----------------------------
-- Table structure for sys_payment_order
-- ----------------------------
DROP TABLE IF EXISTS `sys_payment_order`;
CREATE TABLE `sys_payment_order` (
`guid` bigint(20) unsigned NOT NULL COMMENT '支付订单唯一编码',
`userid` bigint(20) unsigned NOT NULL COMMENT '用户编码',
`taskid` bigint(20) unsigned NOT NULL COMMENT '订单(订单)编码',
`transno` varchar(64) NOT NULL DEFAULT '' COMMENT '支付交易流水号',
`outerno` varchar(64) NOT NULL DEFAULT '' COMMENT '支付交易订单号',
`title` varchar(128) NOT NULL DEFAULT '' COMMENT '支付标题',
`paytype` varchar(16) NOT NULL DEFAULT 'BALANCE' COMMENT '支付类别 0:余额;1:支付宝;2:微信;3:银行卡',
`payclass` varchar(16) NOT NULL DEFAULT '' COMMENT '支付类型 0:消费;1:充值;2:提现',
`paychannel` varchar(64) NOT NULL DEFAULT 'APP' COMMENT '支付渠道 0:APP;1:WEB;2:SCAN;3:FACE;4:PACKAGE;5:POINT',
`amount` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '订单金额',
`period` int(4) NOT NULL DEFAULT '0' COMMENT '订单有效期(单位秒),默认0',
`reason` varchar(64) NOT NULL DEFAULT '' COMMENT '订单失败原因',
`status` varchar(16) NOT NULL DEFAULT 'SUCCESS' COMMENT '支付状态 CREATED;WAITING_PAYMENT;SUCCESS;TRADE_SUCCESS;FAILED;CANCELED;TIMEOUTED',
`createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下单时间',
`updatetime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`remark` text COMMENT '成功时保存用于调起移动支付的JSON字符串',
PRIMARY KEY (`guid`),
UNIQUE KEY `sys_user_payment_order_outerno` (`outerno`) USING HASH
) ENGINE=MyISAM DEFAULT CHARSET=utf8mb4 COMMENT='支付订单表';
-- ----------------------------
-- Records of sys_payment_order
-- ----------------------------
BEGIN;
COMMIT;
-- ----------------------------
-- Table structure for sys_user_amount
-- ----------------------------
DROP TABLE IF EXISTS `sys_user_amount`;
CREATE TABLE `sys_user_amount` (
`guid` bigint(20) unsigned NOT NULL COMMENT '用户编码',
`balance` int(11) NOT NULL DEFAULT '0' COMMENT '账户余额',
`createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`updatetime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`guid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户余额账户表';
-- ----------------------------
-- Records of sys_user_amount
-- ----------------------------
BEGIN;
INSERT INTO `sys_user_amount` VALUES (1, 10000, '2022-11-05 17:02:15', '2023-02-12 21:48:55');
INSERT INTO `sys_user_amount` VALUES (2, 10000, '2022-11-05 17:02:56', '2023-02-12 21:48:57');
INSERT INTO `sys_user_amount` VALUES (3, 10000, '2022-11-05 17:03:06', '2023-02-12 21:48:59');
INSERT INTO `sys_user_amount` VALUES (4, 10000, '2022-11-05 17:03:10', '2023-02-12 21:49:01');
INSERT INTO `sys_user_amount` VALUES (5, 10000, '2022-11-05 17:03:15', '2023-02-12 21:49:03');
INSERT INTO `sys_user_amount` VALUES (6, 10000, '2022-11-05 17:03:19', '2023-02-12 21:49:05');
INSERT INTO `sys_user_amount` VALUES (7, 10000, '2022-11-05 17:03:23', '2023-02-12 21:49:08');
INSERT INTO `sys_user_amount` VALUES (8, 10000, '2022-11-05 17:03:27', '2023-02-12 21:49:09');
INSERT INTO `sys_user_amount` VALUES (9, 10000, '2022-11-05 17:03:31', '2023-02-12 21:49:11');
INSERT INTO `sys_user_amount` VALUES (10, 10000, '2022-11-05 17:03:36', '2023-02-12 21:49:13');
COMMIT;
-- ----------------------------
-- Table structure for sys_user_bill
-- ----------------------------
DROP TABLE IF EXISTS `sys_user_bill`;
CREATE TABLE `sys_user_bill` (
`guid` bigint(20) unsigned NOT NULL COMMENT '账单编码',
`taskid` bigint(20) unsigned NOT NULL COMMENT '订单编码',
`userid` bigint(20) unsigned NOT NULL COMMENT '用户编码',
`outerno` varchar(64) NOT NULL DEFAULT '' COMMENT '提现的商户订单号',
`billobject` tinyint(1) NOT NULL DEFAULT '0' COMMENT '账单对象 0:用户;1:商家;2:平台;3:第三方',
`objectid` bigint(20) unsigned NOT NULL COMMENT '账单对象编码',
`billtype` tinyint(1) NOT NULL DEFAULT '0' COMMENT '账单类别 0:发单;1:接单;2:充值;3:提现;4:退款',
`paytype` tinyint(1) NOT NULL DEFAULT '0' COMMENT '支付方式 0:余额;1:支付宝;2:微信支付;3:银行卡',
`money` int(11) NOT NULL DEFAULT '0' COMMENT '账单金额',
`subject` varchar(128) NOT NULL DEFAULT '' COMMENT '备注',
`benefitclass` tinyint(1) NOT NULL DEFAULT '0' COMMENT '优惠类型,0:无;1:积分;2:会员折扣;3:优惠券;4:红包;5:其他',
`benefitid` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '优惠实物编码',
`benefitmoney` int(11) NOT NULL DEFAULT '0' COMMENT '优惠额度',
`realitymoney` int(11) NOT NULL DEFAULT '0' COMMENT '实际支付金额',
`billtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '账单时间',
PRIMARY KEY (`guid`),
KEY `sys_user_bill_union_key01` (`taskid`,`userid`,`billtype`) USING HASH,
KEY `sys_user_bill_billtime` (`userid`,`billtime`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户账单表';
-- ----------------------------
-- Records of sys_user_bill
-- ----------------------------
BEGIN;
COMMIT;
-- ----------------------------
-- Table structure for sys_user_task
-- ----------------------------
DROP TABLE IF EXISTS `sys_user_task`;
CREATE TABLE `sys_user_task` (
`guid` bigint(20) unsigned NOT NULL COMMENT '订单编码',
`userobject` tinyint(1) NOT NULL DEFAULT '0' COMMENT '发单用户类型 0:用户;1:商家;2:平台;3:第三方',
`userid` bigint(20) unsigned NOT NULL COMMENT '发单用户编码',
`acceptobject` tinyint(1) NOT NULL DEFAULT '0' COMMENT '接单用户类型 0:用户;1:商家;2:平台;3:第三方',
`accepterid` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '接单用户编码',
`province` varchar(64) NOT NULL DEFAULT '' COMMENT '省份',
`city` varchar(64) NOT NULL DEFAULT '' COMMENT '城市',
`citycode` varchar(16) NOT NULL DEFAULT '' COMMENT '城市编码',
`district` varchar(32) NOT NULL DEFAULT '' COMMENT '地区名称',
`zipcode` varchar(16) NOT NULL DEFAULT '' COMMENT '邮编',
`images` varchar(1024) NOT NULL DEFAULT '' COMMENT '订单图片',
`money` int(11) NOT NULL DEFAULT '0' COMMENT '酬金',
`descript` varchar(128) NOT NULL DEFAULT '' COMMENT '订单描述',
`closetime` int(11) NOT NULL DEFAULT '0' COMMENT '秒数,希望从发布订单开始到后多少秒完成',
`publishtime` timestamp NULL DEFAULT NULL COMMENT '发布时间',
`accepttime` timestamp NULL DEFAULT NULL COMMENT '接单时间',
`finishtime` timestamp NULL DEFAULT NULL COMMENT '接单者完成时间',
`modifiedtime` timestamp NULL DEFAULT NULL COMMENT '发单人修改时间',
`canceltime` timestamp NULL DEFAULT NULL COMMENT '取消时间',
`confirmtime` timestamp NULL DEFAULT NULL COMMENT '发单人确认完成时间',
`type` tinyint(1) NOT NULL DEFAULT '0' COMMENT '订单类别 0:普通订单;1:活动订单',
`status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '订单状态 0:未发布;1:等待接单;2:执行中;3:已完成待付款;4:已完成待评论;5:用户已取消;6:接单人已取消;7:订单超时;8:已删除',
`paystatus` tinyint(1) NOT NULL DEFAULT '0' COMMENT '订单支付状态 0:未支付;1:已支付',
`tradetype` tinyint(1) NOT NULL DEFAULT '0' COMMENT '交易类型 0:发单;1:接单;2:充值;3:提现;4:退款',
`paytype` tinyint(1) NOT NULL DEFAULT '0' COMMENT '支付方式 0:余额;1:支付宝;2:微信支付;3:银行卡',
PRIMARY KEY (`guid`) USING BTREE,
KEY `sys_user_task_confirmtime` (`confirmtime`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户订单表';
-- ----------------------------
-- Records of sys_user_task
-- ----------------------------
BEGIN;
COMMIT;
-- ----------------------------
-- Table structure for sys_user_trade
-- ----------------------------
DROP TABLE IF EXISTS `sys_user_trade`;
CREATE TABLE `sys_user_trade` (
`guid` bigint(20) unsigned NOT NULL COMMENT '交易编码',
`taskid` bigint(20) unsigned NOT NULL COMMENT '订单编码',
`userid` bigint(20) unsigned NOT NULL COMMENT '用户编码',
`tradeobject` tinyint(1) NOT NULL DEFAULT '0' COMMENT '交易对象,0:用户;1:商家;2:平台;3:第三方',
`objectid` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '交易对象编码',
`tradetype` tinyint(1) NOT NULL DEFAULT '0' COMMENT '交易类型 0:发单;1:接单;2:充值;3:提现;4:退款',
`paytype` tinyint(1) NOT NULL DEFAULT '0' COMMENT '支付方式 0:余额;1:支付宝;2:微信支付;3:银行卡',
`money` int(11) NOT NULL DEFAULT '0' COMMENT '交易金额',
`status` tinyint(1) NOT NULL DEFAULT '0' COMMENT '交易状态,0:未完成;1:已完成',
`remark` varchar(64) NOT NULL DEFAULT '' COMMENT '备注,保存商户提现订单号',
`publishtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '发布时间',
`accepttime` timestamp NULL DEFAULT NULL COMMENT '接单时间',
`canceltime` timestamp NULL DEFAULT NULL COMMENT '取消时间',
`tradetime` timestamp NULL DEFAULT NULL COMMENT '交易完成时间',
PRIMARY KEY (`guid`),
KEY `sys_user_trade_union_key01` (`taskid`,`userid`,`objectid`,`tradetype`) USING HASH
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='交易记录表';
-- ----------------------------
-- Records of sys_user_trade
-- ----------------------------
BEGIN;
COMMIT;
SET FOREIGN_KEY_CHECKS = 1;
-- 查看测试结果
-- 未付款任务
SELECT COUNT(guid) AS 未付款任务数 FROM sys_user_task WHERE paystatus = 0;
-- 已付款任务
SELECT COUNT(guid) AS 已付款任务数 FROM sys_user_task WHERE paystatus = 1;
-- 已支付数
SELECT COUNT(guid) AS 已支付数 FROM sys_payment_order;
-- 账单数
SELECT COUNT(guid) AS 账单数 FROM sys_user_bill;
-- 交易数
SELECT COUNT(guid) AS 交易数 FROM sys_user_trade;
-- 会计分录数
SELECT COUNT(guid) AS 会计分录数 FROM sys_accounting_voucher;
-- 恢复数据脚本
DELETE FROM sys_payment_order;
DELETE FROM sys_user_task;
DELETE FROM sys_user_bill;
DELETE FROM sys_user_trade;
DELETE FROM sys_accounting_voucher;
UPDATE sys_user_amount SET balance = 10000;
保存并发送事务消息
对于跑腿应用,用户在发布任务时也就等同于创建了订单。所以,用户发布任务也就是整个应用的入口。
@Test
public void balancePayOne() {
try {
// 下单
long taskid = userTaskService.createTask(1L, "XXXX省", "YYYY市", "100", "ZZZZ区", "123456", "", 1, "余额支付测试", 600, 1, 0, 0, 0);
// 封装事务消息
TransactionMessage message = paymentOrderService.balancePay(taskid, 1L, 0, 1, "余额支付测试");
// TODO:省略中间若干步骤
// 发送消息
transactionMessageService.saveAndSendMessage(message);
} catch (Exception e) {
e.printStackTrace();
}
}
首先,调用
UserTaskService.createTask()
方法,得到分布式系统中的全局唯一ID。然后,调用
PaymentOrderService.balancePay()
方法,获得封装的事务消息TransactionMessage
。最后,调用
transactionMessageService.saveAndSendMessage()
方法,发送封装好的事务消息。
可以给执行这三步的方法增加一个@Transactional
注解。
当transactionMessageService.saveAndSendMessage()
方法将事务消息保存到MongoDB之后,就立即发送它。
@Transactional(rollbackFor = Exception.class)
public synchronized void saveAndSendMessage(TransactionMessage message) throws Exception {
boolean flag = false;
if (null == message) {
throw ......;
}
if (StringUtils.isBlank(message.getQueue())) {
throw ......;
}
// 发送中
message.setStatus(MessageStatusEnum.SENDING.name());
message.setIsdead(MessageDeadEnum.ACTIVE.name());
message.setCreatetime(DateUtil.parse(new Date()));
message.setSendtime(new Date());
// 重发次数
message.setRepeats(0);
try {
// 保存事务消息
flag = saveMessage(message);
if (!flag) {
return;
}
// 发送事务消息到MQ
sendMessage(message);
......
}
}
所有的消息都由TransactionMessageListener
负责监听并消费。
@JmsListener(destination = GlobalConstant.MESSAGE_QUEUE_CONSUMER_TRANSACTION_QUEUE, containerFactory="queueListener")
public void onPaymentMessage(ObjectMessage objectMessage) throws Exception {
TransactionMessage message = null;
PaymentOrder paymentOrder = null;
JSONObject params = null;
try {
// 强转为事务消息对象
message = (TransactionMessage) objectMessage.getObject();
// 事务类型的消息
if (message.getType().equalsIgnoreCase(MessageTypeEnum.TRANSACTION.name())) {
paymentOrder = JSON.parseObject(message.getContent(), new TypeReference<PaymentOrder>() {});
// 获得回调附加参数
params = JSONObject.parseObject(message.getParams());
// 丢到线程池中执行
TransactionTask transactionTask = new TransactionTask(message, paymentOrder, params);
transactionTask.setTransactionExecutor(transactionExecutor);
TransactionFixedThreadPool.addTask(transactionTask);
// 确认消息接收
objectMessage.acknowledge();
logger.info("收到事务消息,消息编码:{}", message.getGuid());
}
} catch (JMSException e) {
......
}
}
所有具体的工作都由TransactionExecutor
完成,而对TransactionExecutor.completeTransaction()
方法的调用,则由TransactionTask.run()
方法完成。
package cn.javabook.chapter10.executor;
import cn.javabook.chapter10.exception.AccountingProviderException;
import cn.javabook.chapter10.exception.AppListenerQueueException;
import cn.javabook.chapter10.service.TransactionMessageService;
import cn.javabook.chapter10.service.AccountingVoucherService;
import cn.javabook.cloud.core.constant.GlobalConstant;
import cn.javabook.cloud.core.enums.EntryTypeEnum;
import cn.javabook.cloud.core.enums.PayTypeEnum;
import cn.javabook.cloud.core.enums.TradeStatusEnum;
import cn.javabook.cloud.core.parent.BaseListener;
import cn.javabook.chapter10.entity.AccountingVoucher;
import cn.javabook.chapter10.entity.PaymentOrder;
import cn.javabook.chapter10.entity.TransactionMessage;
import cn.javabook.chapter10.service.PaymentOrderService;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import javax.annotation.Resource;
/**
* 完成支付订单的处理
*
*/
@Component
public class TransactionExecutor extends BaseListener {
private static final long serialVersionUID = 4998242490907751862L;
@Resource
private TransactionMessageService transactionMessageService;
@Resource
private PaymentOrderService paymentOrderService;
@Resource
private AccountingVoucherService accountingVoucherService;
/**
* 完成支付订单的处理
*
* @param message
* @param paymentOrder
* @param params
*/
public synchronized void completeTransaction(TransactionMessage message, PaymentOrder paymentOrder, JSONObject params) {
Integer payment_result = null;
Integer accounting_voucher_result = null;
PaymentOrder existPaymentOrder = null;
AccountingVoucher accountingVoucher = null;
boolean result_payment = false;
boolean result_accounting = false;
long messageid = message.getGuid();
try {
// 在缓存中查询支付订单保存状态
payment_result = (Integer) paymentOrderService.getSavedObject(GlobalConstant.RIDES_PAYMENT_ORDER_SAVED_PREFIX + paymentOrder.getOuterno());
// 缓存中未创建支付订单
if (null == payment_result || 0 == payment_result) {
// 在数据库中查询
existPaymentOrder = paymentOrderService.findPaymentOrderByOuterNo(paymentOrder.getOuterno());
// 幂等判断,已有则不创建
// 数据库中不存在
if (null == existPaymentOrder) {
// 实际完成支付订单的方法
result_payment = completeTransaction(paymentOrder, params);
} else {// 数据库中已存在
// 创建支付订单成功
result_payment = true;
// 保存支付订单的已存在状态
paymentOrderService.setSavedObject(GlobalConstant.RIDES_PAYMENT_ORDER_SAVED_PREFIX + paymentOrder.getOuterno(), "1", GlobalConstant.EXPIRE_MAX_TIMEOUT);
}
} else {
// 缓存中已创建支付订单
// 在数据库中查询,防止出现缓存中有而数据库中无的情况
existPaymentOrder = paymentOrderService.findPaymentOrderByOuterNo(paymentOrder.getOuterno());
// 幂等判断,已有则不创建
// 数据库中还不存在
if (null == existPaymentOrder) {
result_payment = completeTransaction(paymentOrder, params);
} else {// 缓存中和数据库中都已存在
// 创建支付订单成功
result_payment = true;
}
}
// 在缓存中查询
accounting_voucher_result = (Integer) accountingVoucherService.getSavedObject(GlobalConstant.RIDES_ACCOUNTINT_VOUCHER_SAVED_PREFIX + paymentOrder.getOuterno());
// 缓存中不存在会计分录
if (null == accounting_voucher_result || 0 == accounting_voucher_result) {
// 在数据库中查询
accountingVoucher = accountingVoucherService.queryAccountingVoucherByVoucherno(paymentOrder.getOuterno());
// 幂等判断,已有则不创建
if (null == accountingVoucher) {
// 创建会计分录
result_accounting = createAccountingVoucher(paymentOrder);
} else {
// 创建会计分录成功
result_accounting = true;
// 保存会计分录的已存在状态
accountingVoucherService.setSavedObject(GlobalConstant.RIDES_ACCOUNTINT_VOUCHER_SAVED_PREFIX + paymentOrder.getOuterno(), "1", GlobalConstant.EXPIRE_MAX_TIMEOUT);
}
} else {// 缓存中已存在会计分录
// 在数据库中查询
accountingVoucher = accountingVoucherService.queryAccountingVoucherByVoucherno(paymentOrder.getOuterno());
// 幂等判断,已有则不创建
if (null == accountingVoucher) {
// 创建会计分录
result_accounting = createAccountingVoucher(paymentOrder);
} else {
// 创建会计分录成功
result_accounting = true;
}
}
// 只有支付订单和会计分录同时成功完成,才能删除redis缓存和事务消息
if (result_payment && result_accounting) {
// 删除redis中保存的PaymentOrder支付订单缓存信息
paymentOrderService.removePaymentOrder(GlobalConstant.RIDES_PAYMENT_ORDER_SAVED_PREFIX + paymentOrder.getOuterno());
// 删除事务消息
transactionMessageService.removeMessageById(messageid, GlobalConstant.MONGO_TABLE_TRANSACTIONMESSAGE);
}
} catch (Exception e) {
logger.error("完成支付订单异常,消息编码:{}", messageid);
e.printStackTrace();
throw AppListenerQueueException.EXCEPTION_LISTENER_QUEUE_COMPLETE_PAYMENT_ORDER;
}
}
/**
* 完成支付订单的处理
*
* @param paymentOrder
* @param params
* @return
*/
public synchronized boolean completeTransaction(PaymentOrder paymentOrder, JSONObject params) {
boolean flag = false;
boolean result_payment = false;
try {
int result_code = paymentOrderService.completePaymentOrder(paymentOrder, params);
if (0 == result_code) {
// 创建支付订单记录,状态为交易完成
paymentOrder.setStatus(TradeStatusEnum.TRADE_SUCCESS.name());
flag = paymentOrderService.insertUserPaymentOrder(paymentOrder);
if (flag) {
// 创建支付订单成功
result_payment = true;
// 保存支付订单的已存在状态
paymentOrderService.setSavedObject(GlobalConstant.RIDES_PAYMENT_ORDER_SAVED_PREFIX + paymentOrder.getOuterno(), "1", GlobalConstant.EXPIRE_MAX_TIMEOUT);
// 余额
if (paymentOrder.getPaytype().equals(PayTypeEnum.BALANCE.name())) {
logger.info("余额支付订单成功,交易订单号:{},商户订单号:{}", paymentOrder.getTransno(), paymentOrder.getOuterno());
}
} else {
// 创建支付订单异常
result_payment = false;
logger.error("创建支付订单记录异常,交易订单号:{},商户订单号:{}", paymentOrder.getTransno(), paymentOrder.getOuterno());
//throw AppListenerQueueException.EXCEPTION_LISTENER_QUEUE_CREATE_PAYMENT_ORDER;
}
}
} catch (Exception e) {
// 创建支付订单异常
result_payment = false;
logger.error("创建支付订单记录异常,交易订单号:{},商户订单号:{}", paymentOrder.getTransno(), paymentOrder.getOuterno());
}
return result_payment;
}
/**
* 创建会计分录
*
* @param paymentOrder
* @return
*/
public synchronized boolean createAccountingVoucher(PaymentOrder paymentOrder) {
boolean flag = false;
boolean result_accounting = false;
AccountingVoucher accountingVoucher = sealAccountingVoucher(paymentOrder);
try {
flag = accountingVoucherService.createAccountingVoucher(accountingVoucher.getVoucherno(), accountingVoucher.getRequestno(), accountingVoucher.getEntrytype(),
accountingVoucher.getOrigin(), accountingVoucher.getPayamount(), accountingVoucher.getChangeamount(), accountingVoucher.getPayeraccountno(),
accountingVoucher.getReceiveraccountno(), accountingVoucher.getIncome(), accountingVoucher.getCost(), accountingVoucher.getProfit(), accountingVoucher.getRemark());
if (flag) {
// 创建会计分录成功
result_accounting = true;
// 保存会计分录的已存在状态
accountingVoucherService.setSavedObject(GlobalConstant.RIDES_ACCOUNTINT_VOUCHER_SAVED_PREFIX + paymentOrder.getOuterno(), "1", GlobalConstant.EXPIRE_MAX_TIMEOUT);
logger.info("创建会计分录成功,商户订单号:{}", paymentOrder.getOuterno());
} else {
// 创建会计分录异常
logger.error("创建会计分录异常,商户订单号:{}", paymentOrder.getOuterno());
}
} catch (Exception e) {
// 创建会计分录异常
result_accounting = false;
logger.error("创建会计分录异常,商户订单号:{}", paymentOrder.getOuterno());
throw AccountingProviderException.EXCEPTION_ACCOUNTING_CREATE;
}
return result_accounting;
}
/**
* 封装会计凭据
*
* @param paymentOrder
* @return
*/
private AccountingVoucher sealAccountingVoucher(PaymentOrder paymentOrder) {
AccountingVoucher accountingVoucher = new AccountingVoucher();
accountingVoucher.setVoucherno(paymentOrder.getOuterno());
accountingVoucher.setRequestno(System.currentTimeMillis());
accountingVoucher.setEntrytype(EntryTypeEnum.BALANCE_PAY.name());
// 来源系统
accountingVoucher.setOrigin(paymentOrder.getPaychannel());
accountingVoucher.setPayamount(paymentOrder.getAmount());
// 平台帐户变动金额
accountingVoucher.setChangeamount(0.00);
// 付款方账号
accountingVoucher.setPayeraccountno(String.valueOf(paymentOrder.getUserid()));
// 收款方账号
accountingVoucher.setReceiveraccountno(String.valueOf(0));
accountingVoucher.setIncome(0.00);
accountingVoucher.setCost(0.00);
accountingVoucher.setProfit(0.00);
accountingVoucher.setRemark(paymentOrder.getRemark());
return accountingVoucher;
}
}
TransactionExecutor
代码较多,但核心的就是completeTransaction(TransactionMessage message, PaymentOrder paymentOrder, JSONObject params)
方法。
这段代码主要就是做的幂等性判断与操作。实际上,在大多数高并发应用中,这种防御性
代码随处可见。
整个过程可以用一张简化过时序图来表示。

做到这一步,整个按顺序正常执行的业务逻辑流程就全部运行完了。不过这里面有两点还需要再强调一下。
由于可能会被调用多次,因此
TransactionExecutor.completeTransaction()
方法必须具有幂等性。即使同一条消息已经被消息队列确认消费了,但仍会至少再发一次。也就是说,在
可靠消息最终一致性
方案中,每个消息至少会被发送两次。
事务消息轮询
按照正常逻辑执行完毕之后,事务消息就会被保存到MongoDB,现在再来看看这一块是怎么处理的。
上面之所以说对于同一个事务消息,在正常情况下它要被发送至少两次,这是因为在发送消息之前,TransactionMessageService
就已经把消息保存到了MongoDB,之后将被轮询服务AppListenerScheduleExecutor
再次发送,而AppListenerScheduleExecutor
是被周期性计划任务线程池AppListenerScheduleConfiguration
调用的。
轮询服务AppListenerScheduleExecutor
只有一个方法,那就是处理活跃的消息和已经死亡的消息。
package cn.javabook.chapter10.executor;
import cn.javabook.chapter10.service.AppListenerScheduleService;
import cn.javabook.cloud.core.BaseObject;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 计划任务执行实现类
*
*/
@Component
public class AppListenerScheduleExecutor extends BaseObject {
private static final long serialVersionUID = 1070629478791251442L;
@Resource
private AppListenerScheduleService appListenerScheduleService;
private static int sending_active_counter = 0;
private static int sending_dead_counter = 0;
/**
* 启动延迟1秒,每60秒执行1次
*
* @throws Exception
*/
@Scheduled(initialDelay=1000, fixedDelay=60000)
public void executeOneMinute() throws Exception {
boolean done = false;
// 处理状态为“发送中”但超时没有被成功消费确认的消息
done = appListenerScheduleService.handleSendingAndActiveMessage();
if (done) {
logger.info("处理[ACTIVE]消息结束,已处理 {} 次", ++sending_active_counter);
}
// 处理状态为“发送中”但已标记为死亡的消息
done = appListenerScheduleService.handleSendingAndDeadMessage();
if (done) {
logger.info("处理[DEAD]消息结束,已处理 {} 次", ++sending_dead_counter);
}
}
}
AppListenerScheduleExecutor
只是负责完成定时执行的动作,具体执行的方法是通过调用AppListenerScheduleService
实现的。
handleSendingAndActiveMessage()
方法专门处理状态为发送中
但超时没有被确认的消息。handleSendingAndDeadMessage()
则是专门处理处理状态为发送中
但已标记为死亡的消息,也就是反复尝试发送超过一定次数后仍没有被确认的消息(即通常所说的死信队列
)。
handleSendingAndActiveMessage()
方法其实有两个:一个无参,一个有参,并且由前者来调用后者,实现多条事务消息的批量处理。
而且时间间隔的设定,也是是依据重发次数在AppListenerScheduleService
的静态代码块中实现的。
package cn.javabook.chapter10.service;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.Map;
import cn.javabook.chapter10.exception.AppListenerScheduleException;
import cn.javabook.cloud.core.constant.GlobalConstant;
import cn.javabook.cloud.core.enums.MessageDeadEnum;
import cn.javabook.cloud.core.enums.MessageStatusEnum;
import cn.javabook.cloud.core.parent.BaseException;
import cn.javabook.cloud.core.parent.BaseService;
import cn.javabook.cloud.core.utils.DateUtil;
import cn.javabook.chapter10.entity.TransactionMessage;
import org.springframework.stereotype.Service;
import com.google.common.collect.Maps;
import javax.annotation.Resource;
/**
* 消息定时器接口实现
*
*/
@Service
public class AppListenerScheduleService extends BaseService {
private static final long serialVersionUID = -8782414015511522102L;
private static final Map<Integer, Integer> notifyParam = Maps.newConcurrentMap();
@Resource
private TransactionMessageService transactionMessageService;
/**
* 初始化静态变量
*/
static {
// 第一次间隔1分钟后发送
notifyParam.put(1, GlobalConstant.MESSAGE_QUEUE_MESSAGE_SEND_1_TIME);
// 第二次间隔1分钟后发送
notifyParam.put(2, GlobalConstant.MESSAGE_QUEUE_MESSAGE_SEND_2_TIME);
// 第三次间隔2分钟后发送
notifyParam.put(3, GlobalConstant.MESSAGE_QUEUE_MESSAGE_SEND_3_TIME);
// 第四次间隔5分钟后发送
notifyParam.put(4, GlobalConstant.MESSAGE_QUEUE_MESSAGE_SEND_4_TIME);
// 第五次间隔10分钟后发送
notifyParam.put(5, GlobalConstant.MESSAGE_QUEUE_MESSAGE_SEND_5_TIME);
}
/**
* 从MongoDB中过滤出状态为「发送中」但超时没有被成功消费确认的消息
*
*/
public boolean handleSendingAndActiveMessage() throws Exception {
Map<String, Object> paramMap = null;
Map<String, TransactionMessage> messageMap = null;
try {
// 每页条数
int numPerPage = GlobalConstant.PAGE_DEFAULT_SIZE_PER_PAGE;
// 一次最多处理页数
int maxHandlePageCount = GlobalConstant.PAGE_DEFAULT_SIZE_PER_PAGE;
// 查询条件
paramMap = Maps.newConcurrentMap();
// 发送中的消息
paramMap.put("status", MessageStatusEnum.SENDING.name());
// 存活的消息
paramMap.put("isdead", MessageDeadEnum.ACTIVE.name());
// 8分钟前的SENDING消息
paramMap.put("sendTimeBefore", DateUtil.before(GlobalConstant.MESSAGE_QUEUE_MESSAGE_HANDLE_DURATION));
// 分页查询的排序方式,正向排序
paramMap.put("sortby", "ASC");
messageMap = getMessageMap(numPerPage, maxHandlePageCount, paramMap, true);
if (null == messageMap) {
return false;
} else {
if (0 >= messageMap.size()) {
return false;
}
}
return handleSendingAndActiveMessage(messageMap);
} catch (Exception e) {
logger.error("处理[ACTIVE]状态的消息异常:{}", e.getMessage());
throw AppListenerScheduleException.EXCEPTION_SCHEDULE_SENDING_ACTIVE;
}
}
/**
* 从MongoDB中过滤出状态为「发送中」但已标记为死亡的消息
*
*/
public boolean handleSendingAndDeadMessage() throws Exception {
Map<String, Object> paramMap = null;
Map<String, TransactionMessage> messageMap = null;
try {
// 每页条数
int numPerPage = GlobalConstant.PAGE_DEFAULT_SIZE_PER_PAGE;
// 一次最多处理页数
int maxHandlePageCount = GlobalConstant.PAGE_DEFAULT_SIZE_PER_PAGE;
// 查询条件
paramMap = Maps.newConcurrentMap();
// 发送中的消息
paramMap.put("status", MessageStatusEnum.SENDING.name());
// 死亡的消息
paramMap.put("isdead", MessageDeadEnum.DEAD.name());
// 8分钟前的SENDING消息
paramMap.put("sendTimeBefore", DateUtil.before(GlobalConstant.MESSAGE_QUEUE_MESSAGE_HANDLE_DURATION));
// 分页查询的排序方式,正向排序
paramMap.put("sortby", "ASC");
messageMap = getMessageMap(numPerPage, maxHandlePageCount, paramMap, true);
if (null == messageMap) {
return false;
} else {
if (0 >= messageMap.size()) {
return false;
}
}
return handleSendingAndDeadMessage(messageMap);
} catch (Exception e) {
logger.error("处理[DEAD]状态的消息异常:{}", e.getMessage());
throw AppListenerScheduleException.EXCEPTION_SCHEDULE_SENDING_DEAD;
}
}
/**
* 处理[ACTIVE]状态的消息
*
*/
private boolean handleSendingAndActiveMessage(Map<String, TransactionMessage> messageMap) {
logger.info("开始处理状态为[ACTIVE]的消息Map,共[" + messageMap.size() + "]条");
TransactionMessage message = null;
try {
// 单条消息处理
for (Map.Entry<String, TransactionMessage> entry : messageMap.entrySet()) {
message = entry.getValue();
try {
// 判断发送次数
logger.info("消息编码为[{}]的消息,已经重新发送了 {} 次", message.getGuid(), message.getRepeats() + 1);
// 如果超过最大发送次数直接退出
if (GlobalConstant.MESSAGE_QUEUE_MESSAGE_MAX_SEND_TIMES < message.getRepeats()) {
// 标记为死亡
transactionMessageService.setMessageToAreadlyDead(message.getGuid());
continue;
}
// 判断是否达到发送消息的时间间隔条件
int repeats = message.getRepeats();
int times = notifyParam.get(repeats == 0 ? 1 : repeats);
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
long needTime = currentTimeInMillis - times * 60 * 1000;
long hasTime = message.getSendtime().getTime();
// 判断是否达到了可以再次发送的时间条件
if (hasTime > needTime) {
logger.info("当前时间:{},{}消息上次发送时间:{},{}分钟后才可以再发送", DateUtil.parse(new Date()), message.getGuid(), DateUtil.parse(message.getSendtime()), times);
continue;
}
// 重新发送消息
transactionMessageService.reSendMessage(message);
} catch (Exception e) {
logger.error("处理单条[ACTIVE]消息异常,消息编码:{}", message.getGuid());
throw AppListenerScheduleException.EXCEPTION_SCHEDULE_SENDING_ACTIVE;
}
}
return true;
} catch (Exception e) {
logger.error("处理状态为[ACTIVE]的消息Map异常:{}", e.getMessage());
throw AppListenerScheduleException.EXCEPTION_SCHEDULE_SENDING_ACTIVE;
}
}
/**
* 处理[DEAD]状态的消息
*
*/
private boolean handleSendingAndDeadMessage(Map<String, TransactionMessage> messageMap) {
logger.info("开始处理状态为[DEAD]的消息Map,共[" + messageMap.size() + "]条");
TransactionMessage message = null;
try {
// 单条消息处理
for (Map.Entry<String, TransactionMessage> entry : messageMap.entrySet()) {
message = entry.getValue();
try {
transactionMessageService.removeMessageById(message.getGuid(), GlobalConstant.MONGO_TABLE_TRANSACTIONMESSAGE);
logger.info("删除已死亡消息成功,消息编码:{}", message.getGuid());
} catch (Exception e) {
logger.error("处理单挑[DEAD]消息异常,消息编码:{}", message.getGuid());
throw AppListenerScheduleException.EXCEPTION_SCHEDULE_SENDING_DEAD;
}
}
return true;
} catch (Exception e) {
logger.error("处理状态为[DEAD]的消息Map异常:{}", e.getMessage());
throw AppListenerScheduleException.EXCEPTION_SCHEDULE_SENDING_DEAD;
}
}
/**
* 根据分页参数及查询条件批量获取消息数据
*
* @param numPerPage 每页记录数
* @param maxHandlePageCount 最多获取页数
* @param paramMap 查询参数
*/
private Map<String, TransactionMessage> getMessageMap(int numPerPage, int maxHandlePageCount, Map<String, Object> paramMap, boolean hastime) {
// 每次拿到的结果集
List<TransactionMessage> list = null;
// 转换成map
Map<String, TransactionMessage> messageMap = Maps.newConcurrentMap();
try {
// 获取分页数据集
String status = (String) paramMap.get("status");
String isdead = (String) paramMap.get("isdead");
if (hastime) {
String timeBefore = (String) paramMap.get("sendTimeBefore");
list = (List<TransactionMessage>) transactionMessageService.getMessageList(status, isdead, timeBefore, TransactionMessage.class);
} else {
list = (List<TransactionMessage>) transactionMessageService.getMessageList(status, isdead, TransactionMessage.class);
}
if (null == list) {
return null;
}
for (TransactionMessage message : list) {
messageMap.put(String.valueOf(message.getGuid()), message);
}
return messageMap;
} catch (Exception e) {
logger.error("批量查询消息数据Map异常:{}", e.getMessage());
throw BaseException.EXCEPTION_PUBLIC_DB_QUERY_FAILED;
}
}
}
handleSendingAndDeadMessage()
的逻辑和handleSendingAndActiveMessage()
类似,它也有两个方法:一个无数,一个有参。
这样,在重载的方法handleSendingAndActiveMessage(MessageMap)
中,消息会被重发。
......
transactionMessageService.reSendMessage(message);
......
而在重载的方法handleSendingAndDeadMessage(MessageMap)
中,消息会被删除。
......
transactionMessageService.removeMessageById(message.getGuid(), ......);
......
被重新发送的消息,会被TransactionMessageListener
监听并消费,继续执行正常的逻辑。
事务消息轮询的时序图如下。

发送并保存事务消息
和事务消息轮询
组成可靠消息最终一致性
解决方案的两个核心部分。
虽然看起来代码非常多,但真正核心的思想一句话就说完了:保存消息
-> 发送消息
-> 重发消息
-> 删除死亡的消息
。
感谢支持
更多内容,请移步《超级个体》。