移动网站 html5,工信部网站备案方法,天津电子商务网站,做网站还是移动开发高可用电商支付架构设计
在现代电商业务中#xff0c;支付过程是其中至关重要的一环#xff0c;一个高可用、安全稳定的支付架构不仅可以提高整个系统的可靠性和扩展性#xff0c;降低维护成本#xff0c;还可以优化用户体验#xff0c;增加用户黏性。
本文将提出一种高…高可用电商支付架构设计
在现代电商业务中支付过程是其中至关重要的一环一个高可用、安全稳定的支付架构不仅可以提高整个系统的可靠性和扩展性降低维护成本还可以优化用户体验增加用户黏性。
本文将提出一种高可用的电商支付架构设计方案并全程记录落地方法。 高可用电商支付架构设计一、前言1、电商支付一般流程2、本项目业务背景 二、订单服务1、数据库设计2、如何确保订单的幂等性3、未支付订单如何处理 三、支付服务1、支付接口的选择1微信支付2支付宝 2、支付流程1二维码生成2数据库设计 3、支付结果接收失败怎么处理 四、通知服务1、消息队列的选择2、消息队列使用方法3、如何确保消息的可靠性4、如何避免消息的重复消费 五、总结 一、前言
1、电商支付一般流程
传统意义上的支付过程是A向B下单商品C通过渠道D支付成功后B将商品交给A。
在电商背景下依旧大致沿用这一套流程只是会进行责任细化例如上述中的B会根据业务功能拆分成多个微服务。
每个微服务具有单一的功能不同微服务之间相互独立运行呈现一种高内聚、低耦合的状态。
电商支付大致可以划分成如下几个部分。
前端用户通过电脑或手机下订单。订单服务处理订单并进行库存锁定。支付服务处理支付请求和第三方支付平台进行交互。通知服务支付完成后通知用户和相关系统。外部支付提供商支付宝等第三方支付平台。
2、本项目业务背景
本项目依托于在线教育平台平台有在线支付的需求包括购买课程、购买学习资料、购买教师一对一服务等等这些购买的对象很明显涉及多个微服务所以我们需要根据业务功能对电商支付过程进行封装封装成多个微服务以便于代码复用。
封装成三个微服务
订单服务用户点击购买后请求发送到订单服务请求中携带参数包括商品类型、商品id、商品价格、支付渠道等等。支付服务订单服务需要根据用户选择的支付方式将请求发送到支付服务中支付服务与第三方的支付平台进行调用。通知服务在支付完成后需要将支付结果通知给用户支付结果的表现形式可以是文字提醒、跳转页面等等。
本项目基于SpringBoot进行开发默认已经完成了基础服务例如前端网关认证授权购买对象所属服务例如课程、资料、教师微服务。
二、订单服务
用户在前端进行下单选择商品规则、支付方式等等。 点击支付请求从前端转发到网关网关转发到订单服务经过认证授权后执行业务逻辑。
1、数据库设计
首先需要对提交的下单请求进行封装处理保存到数据库。
订单表中的字段包括订单号订单类型订单价格下单用户id订单支付状态创建时间订单描述第三方微服务唯一id等等。
第三方微服务唯一id是指下单业务涉及到的第三方微服务的唯一id例如本次下单的是课程服务的商品那在选课业务中就应当插入一张选课记录表表中包含字段唯一id这个id用于避免用户重复下单某件商品。 其中订单号是后台系统中该订单的唯一标识在并发分布式系统中如何生成唯一标识有多种方法。
数据库自增长序列或字段生成id优点代码简单成本小缺点强依赖DB数据库迁移时需要考虑DB版本支持问题只有一个主库可以生成时有单点故障的风险主从节点切换时可能导致重复发号难于扩展UUID优点简单性能高全球唯一方便数据迁移合并缺点16字节存储成本高信息不安全字符串查询效率低无序主键插入效率低Redis生成IdRedis生成Id效率比较高用5台Redis每个自增步长为5日期自增Id作为Id即可优点不依赖数据库Id有序缺点Redis单点故障影响可用性且代码和配置工作量大zookeeper生成Id用zookeeper的znode生成序列号缺点在高并发分布式性能差雪花算法snowflakeTwitter开源的分布式Id生成算法由41位时间戳10位机器Id12位毫秒内流水号1位符号位0组成优点稳定灵活递增缺点强依赖机器时钟机器时钟回拨会导致重复发号分布式不同机器时钟不可能完全同步导致不是全局递增
这里我们采用雪花算法对雪花算法的生成实现如下所示 public final class IdWorkerUtils {private static final Random RANDOM new Random();private static final long WORKER_ID_BITS 5L;private static final long DATACENTERIDBITS 5L;private static final long MAX_WORKER_ID ~(-1L WORKER_ID_BITS);private static final long MAX_DATACENTER_ID ~(-1L DATACENTERIDBITS);private static final long SEQUENCE_BITS 12L;private static final long WORKER_ID_SHIFT SEQUENCE_BITS;private static final long DATACENTER_ID_SHIFT SEQUENCE_BITS WORKER_ID_BITS;private static final long TIMESTAMP_LEFT_SHIFT SEQUENCE_BITS WORKER_ID_BITS DATACENTERIDBITS;private static final long SEQUENCE_MASK ~(-1L SEQUENCE_BITS);private static final IdWorkerUtils ID_WORKER_UTILS new IdWorkerUtils();private long workerId;private long datacenterId;private long idepoch;private long sequence 0;private long lastTimestamp -1L;/*** description 构造方法* param* return* author * date 2024/6/22 21:03*/private IdWorkerUtils() {this(RANDOM.nextInt((int) MAX_WORKER_ID), RANDOM.nextInt((int) MAX_DATACENTER_ID), 1288834974657L);}private IdWorkerUtils(final long workerId, final long datacenterId, final long idepoch) {if (workerId MAX_WORKER_ID || workerId 0) {throw new IllegalArgumentException(String.format(worker Id cant be greater than %d or less than 0, MAX_WORKER_ID));}if (datacenterId MAX_DATACENTER_ID || datacenterId 0) {throw new IllegalArgumentException(String.format(datacenter Id cant be greater than %d or less than 0, MAX_DATACENTER_ID));}this.workerId workerId;this.datacenterId datacenterId;this.idepoch idepoch;}/*** description 返回雪花算法生成器* param* return* author * date 2024/6/22 21:03*/public static IdWorkerUtils getInstance() {return ID_WORKER_UTILS;}/*** description 生成id* param* return* author * date 2024/6/22 21:03*/public synchronized long nextId() {long timestamp timeGen();if (timestamp lastTimestamp) {throw new RuntimeException(String.format(Clock moved backwards. Refusing to generate id for %d milliseconds, lastTimestamp - timestamp));}if (lastTimestamp timestamp) {sequence (sequence 1) SEQUENCE_MASK;if (sequence 0) {timestamp tilNextMillis(lastTimestamp);}} else {sequence 0L;}lastTimestamp timestamp;return ((timestamp - idepoch) TIMESTAMP_LEFT_SHIFT)| (datacenterId DATACENTER_ID_SHIFT)| (workerId WORKER_ID_SHIFT) | sequence;}/*** description 如果机器时钟回调自旋直到时钟板正* param* return* author * date 2024/6/22 21:13*/private long tilNextMillis(final long lastTimestamp) {long timestamp timeGen();while (timestamp lastTimestamp) {timestamp timeGen();}return timestamp;}/*** description 得到机器时钟* param* return* author * date 2024/6/22 21:13*/private long timeGen() {return System.currentTimeMillis();}
}
在订单表中我们对商品详情的处理是序列化的方式一个订单可以包括多个商品如果后续要对商品进行处理例如查找所有购买某类商品的用户信息那就需要更细化的订单商品表。
创建订单商品表表中包括字段商品id订单id除此以外还需要包括商品类型、商品名称、商品价格等等。 订单业务还涉及到支付问题在高并发分布式场景下应当在用户下订单后及时的将支付信息保存到数据库中否则会面临重复支付、遗失支付信息的问题所以在这一节一同设计支付表。
支付记录表是订单业务与支付业务相关联的表需要包括字段订单id支付id支付交易id第三方支付渠道以及支付id订单总金额下单用户id支付状态支付时间创建时间。
支付id是这条记录的id而支付交易id是与第三方支付平台交互的id。 2、如何确保订单的幂等性
在保存订单信息到数据库之前根据订单表中的用户id和第三方微服务唯一id进行查询如果可以查到对应记录就说明该用户已经下单过直接返回。
需要注意的是支付记录表无需进行幂等性判定因为如果支付失败或者用户退出支付页面则支付交易号失效这里的失效是指无法重新使用该交易号与第三方支付平台进行交互需要重新生成一个交易号但是不能直接修改这条记录应该作为一条支付失败的记录进行保存。
3、未支付订单如何处理
一个订单在下单后迟迟没有支付那我们就需要考虑取消该订单这个时间一般设置为30min或者1h可以采用延时队列实现该功能。
在创建队列或者发送消息时添加超时时间如果超过这个时间就将消息投递到队列绑定的死信交换机中由监听死信队列的消费者执行处理业务。
//创建消息Message message MessageBuiider.withBody(msg).setExpiration(1800000).build();//消息idCorrelationData correlationData new correlationData(id);//发送消息rabbitTemplate,convertAndsend(ttl.direct,ttl,message,correlationData);
三、支付服务
业务背景是pc端从用户的角度考虑在用户点击下单后需要展示一个二维码用户手机扫码跳转到支付页面进行支付支付完成后pc端进行相应的提示。
一般市面常用的第三方支付平台是微信支付和支付宝需要根据用户选择支付类型执行对应支付流程这个很容易实现Spring提供了根据String类型的Name依赖注入 Resource(nameweixin_pay)
private Pay WeixinPay;//Object pay applicationContext.getBean(weixin_pay);
1、支付接口的选择
注意下面代码中涉及的appid等信息均需要替换成自己申请的信息仅做演示使用。
1微信支付
微信支付提供了若干种支付方式 其中
JSAPI支付是指商户通过调用微信支付提供的JSAPI接口在支付场景中调起微信支付模块完成收款。
Native支付是指商户系统按微信支付协议生成支付二维码用户再用微信“扫一扫”完成支付的模式。
这两个方式比较适合pc端支付场景两个支付流程大同小异我们选择JSAPI进行详细解释。
微信支付-JSAPI开发文档
在使用之前需要再微信支付平台注册获得AppID、商户号并设置回调域名需要在公网备案 步骤1-6用户在前端点击提交订单后台发送请求到微信支付生成预付订单得到预付订单标识
步骤7-11后台在微信浏览器内通过JSAPI调起支付API调起微信支付发起支付请求。
步骤12-18用户支付成功后商户可接收到微信支付支付结果通知支付结果通知API。
步骤19-22后台在没有接收到微信支付结果通知的情况下需要主动调用查询订单API查询支付结果。
从上述步骤分析中可以看到在支付服务中需要提供JSAPI下单的接口在传参中需要传入一个异步通知的notify_url这个url对应后台的一个接口如果未收到支付结果通知需要向微信支付查询支付结果。
所以一共需要提供三个接口分别是JSAPI下单的接口支付结果异步通知的接口查询支付结果的接口
JSAPI下单
//请求URLHttpPost httpPost new HttpPost(https://api.mch.weixin.qq.com/v3/pay/transactions/jsapi);// 请求body参数String reqdata { \amount\: { \total\: 100, \currency\: \CNY\ }, \mchid\: \1900006891\, \description\: \Image形象店-深圳腾大-QQ公仔\, \notify_url\: \https://www.weixin.qq.com/wxpay/pay.php\, \payer\: { \openid\: \o4GgauE1lgaPsLabrYvqhVg7O8yA\ }, \out_trade_no\: \1217752501201407033233388881\, \goods_tag\: \WXG\, \appid\: \wxdace645e0bc2c424\ }; StringEntity entity new StringEntity(reqdata,utf-8);entity.setContentType(application/json);httpPost.setEntity(entity);httpPost.setHeader(Accept, application/json);//完成签名并执行请求CloseableHttpResponse response httpClient.execute(httpPost);try {int statusCode response.getStatusLine().getStatusCode();if (statusCode 200) {System.out.println(success,return body EntityUtils.toString(response.getEntity()));} else if (statusCode 204) {System.out.println(success);} else {System.out.println(failed,resp code statusCode ,return body EntityUtils.toString(response.getEntity()));throw new IOException(request failed);}} finally {response.close();httpClient.close();}
支付结果异步通知的接口
通过先对post的请求体进行解密然后反序列化。
微信支付在回调通知中对关键信息进行了AES-256-GCM加密密钥需要再微信支付控制台进行设置。
/** 商户号 */public static String merchantId ;/** 商户API私钥路径 */public static String privateKeyPath ;/** 商户证书序列号 */public static String merchantSerialNumber ;/** 商户APIV3密钥 */public static String apiV3Key ;public static void main(String[] args) {// 使用自动更新平台证书的RSA配置// 一个商户号只能初始化一个配置否则会因为重复的下载任务报错Config config new RSAAutoCertificateConfig.Builder().merchantId(merchantId).privateKeyFromPath(privateKeyPath).merchantSerialNumber(merchantSerialNumber).apiV3Key(apiV3Key).build();JsapiService service new JsapiService.Builder().config(config).build();// request.setXxx(val)设置所需参数具体参数可见Request定义PrepayRequest request new PrepayRequest();Amount amount new Amount();amount.setTotal(100);request.setAmount(amount);request.setAppid(wxa9d9651ae******);request.setMchid(190000****);request.setDescription(测试商品标题);request.setNotifyUrl(https://notify_url);request.setOutTradeNo(out_trade_no_001);Payer payer new Payer();payer.setOpenid(oLTPCuN5a-nBD4rAL_fa********);request.setPayer(payer);PrepayResponse response service.prepay(request);System.out.println(response.getPrepayId());}
查询支付结果
//请求URLURIBuilder uriBuilder new URIBuilder(https://api.mch.weixin.qq.com/v3/pay/transactions/id/4200000745202011093730578574);uriBuilder.setParameter(mchid, mchId);//完成签名并执行请求HttpGet httpGet new HttpGet(uriBuilder.build());httpGet.addHeader(Accept, application/json);CloseableHttpResponse response httpClient.execute(httpGet);try {int statusCode response.getStatusLine().getStatusCode();if (statusCode 200) {System.out.println(success,return body EntityUtils.toString(response.getEntity()));} else if (statusCode 204) {System.out.println(success);} else {System.out.println(failed,resp code statusCode ,return body EntityUtils.toString(response.getEntity()));throw new IOException(request failed);}} finally {response.close();}
2支付宝
支付宝提供了多种支付方式 电脑网站支付提供了两种支付场景分别是手机支付宝app扫二维码支付在pc端登录支付宝支付。根据我们的实际需求可以选择电脑网站支付。
其实手机网站支付和电脑网站支付的后台接口设计方式类似我们任选其一即可。 步骤1-6用户进行下单如果是在pc端登录支付宝需要输入用户名密码先登录如果是在pc端用手机app扫码那直接输入支付密码确认支付后台会得到一个同步返回的参数。
步骤7支付结果通过notify_url异步发送到后台接口。
步骤8支付宝提供了查询api通过这个api可以查询支付结果。
从上述步骤分析我们可以发现后台一共需要实现三个接口向支付宝发送请求的接口支付宝异步发送支付结果的接口向支付宝查询支付结果的接口。
支付宝-电脑网站支付开发文档
支付宝开发提供了在线的api调试支持在线以表格填写参数的方式生成请求以及沙箱app辅助开发。
向支付宝发送请求 public class AlipayTradePagePay {public static void main(String[] args) throws AlipayApiException {// 初始化SDKAlipayClient alipayClient new DefaultAlipayClient(getAlipayConfig());// 构造请求参数以调用接口AlipayTradePagePayRequest request new AlipayTradePagePayRequest();AlipayTradePagePayModel model new AlipayTradePagePayModel();// 设置商户门店编号model.setStoreId(NJ_001);// 设置订单绝对超时时间model.setTimeExpire(2016-12-31 10:05:01);// 设置业务扩展参数ExtendParams extendParams new ExtendParams();extendParams.setSysServiceProviderId(2088511833207846);extendParams.setHbFqSellerPercent(100);extendParams.setHbFqNum(3);extendParams.setIndustryRefluxInfo({\scene_code\:\metro_tradeorder\,\channel\:\xxxx\,\scene_data\:{\asset_name\:\ALIPAY\}});extendParams.setSpecifiedSellerName(XXX的跨境小铺);extendParams.setRoyaltyFreeze(true);extendParams.setCardType(S0JP0000);model.setExtendParams(extendParams);// 设置订单标题model.setSubject(Iphone6 16G);// 设置请求来源地址model.setRequestFromUrl(https://);// 设置产品码model.setProductCode(FAST_INSTANT_TRADE_PAY);// 设置PC扫码支付的方式model.setQrPayMode(1);// 设置商户自定义二维码宽度model.setQrcodeWidth(100L);// 设置请求后页面的集成方式model.setIntegrationType(PCWEB);// 设置订单包含的商品列表信息ListGoodsDetail goodsDetail new ArrayListGoodsDetail();GoodsDetail goodsDetail0 new GoodsDetail();goodsDetail0.setGoodsName(ipad);goodsDetail0.setAlipayGoodsId(20010001);goodsDetail0.setQuantity(1L);goodsDetail0.setPrice(2000);goodsDetail0.setGoodsId(apple-01);goodsDetail0.setGoodsCategory(34543238);goodsDetail0.setCategoriesTree(124868003|126232002|126252004);goodsDetail0.setShowUrl(http://www.alipay.com/xxx.jpg);goodsDetail.add(goodsDetail0);model.setGoodsDetail(goodsDetail);// 设置商户的原始订单号model.setMerchantOrderNo(20161008001);// 设置商户订单号model.setOutTradeNo(20150320010101001);// 设置订单总金额model.setTotalAmount(88.88);// 设置商户传入业务信息model.setBusinessParams({\mc_create_trade_ip\:\127.0.0.1\});// 设置优惠参数 model.setPromoParams({\storeIdType\:\1\});request.setBizModel(model);// 第三方代调用模式下请设置app_auth_token// request.putOtherTextParam(app_auth_token, -- 请填写应用授权令牌 --);AlipayTradePagePayResponse response alipayClient.pageExecute(request, POST);// 如果需要返回GET请求请使用// AlipayTradePagePayResponse response alipayClient.pageExecute(request, GET);String pageRedirectionData response.getBody();System.out.println(pageRedirectionData);if (response.isSuccess()) {System.out.println(调用成功);} else {System.out.println(调用失败);// sdk版本是4.38.0.ALL及以上,可以参考下面的示例获取诊断链接// String diagnosisUrl DiagnosisUtils.getDiagnosisUrl(response);// System.out.println(diagnosisUrl);}}private static AlipayConfig getAlipayConfig() {String privateKey -- 请填写您的应用私钥例如MIIEvQIBADANB ... ... --;String alipayPublicKey -- 请填写您的支付宝公钥例如MIIBIjANBg... --;AlipayConfig alipayConfig new AlipayConfig();alipayConfig.setServerUrl(https://openapi.alipay.com/gateway.do);alipayConfig.setAppId(-- 请填写您的AppId例如2019091767145019 --);alipayConfig.setPrivateKey(privateKey);alipayConfig.setFormat(json);alipayConfig.setAlipayPublicKey(alipayPublicKey);alipayConfig.setCharset(UTF-8);alipayConfig.setSignType(RSA2);return alipayConfig;}
}
支付宝异步发送支付结果以Post的方式发送请求在请求中以Map的方式携带参数需要解码后进行验签处理。
将签名参数sign使用 base64 解码为字节码串。
使用 RSA 的验签方法通过签名字符串、签名参数经过 base64 解码及支付宝公钥验证签名。
验签和解码部分的问题可以查看开发文档
支付宝支付开发常见问题汇总 Map String , String params new HashMap String , String ();Map requestParams request.getParameterMap();for(Iterator iter requestParams.keySet().iterator();iter.hasNext();){String name (String)iter.next();String[] values (String [])requestParams.get(name);String valueStr ;for(int i 0;i values.length;i ){valueStr (ivalues.length-1)?valueStr values [i]:valueStr values[i] ,;}//乱码解决这段代码在出现乱码时使用。 //valueStr new String(valueStr.getBytes(ISO-8859-1), utf-8); params.put (name,valueStr);}//切记alipaypublickey是支付宝的公钥请去open.alipay.com对应应用下查看。 //boolean AlipaySignature.rsaCheckV1(MapString, String params, String publicKey, String charset, String sign_type) boolean AlipaySignature.rsaCheckV1(paramsMap, ALIPAY_PUBLIC_KEY, CHARSET, SIGN_TYPE) //调用SDK验证签名
if(signVerified){// TODO 验签成功后按照支付结果异步通知中的描述对支付结果中的业务内容进行二次校验校验成功后在response中返回success并继续商户自身业务处理校验失败返回failure
}else{// TODO 验签失败则记录异常日志并在response中返回failure.
}
向支付宝查询支付结果
public static void main(String[] args) throws AlipayApiException {// 初始化SDKAlipayClient alipayClient new DefaultAlipayClient(getAlipayConfig());// 构造请求参数以调用接口AlipayTradeQueryRequest request new AlipayTradeQueryRequest();AlipayTradeQueryModel model new AlipayTradeQueryModel();// 设置订单支付时传入的商户订单号model.setOutTradeNo(20150320010101001);// 设置查询选项ListString queryOptions new ArrayListString();queryOptions.add(trade_settle_info);model.setQueryOptions(queryOptions);// 设置支付宝交易号model.setTradeNo(2014112611001004680 073956707);request.setBizModel(model);AlipayTradeQueryResponse response alipayClient.execute(request);System.out.println(response.getBody());if (response.isSuccess()) {System.out.println(调用成功);} else {System.out.println(调用失败);// sdk版本是4.38.0.ALL及以上,可以参考下面的示例获取诊断链接// String diagnosisUrl DiagnosisUtils.getDiagnosisUrl(response);// System.out.println(diagnosisUrl);}}
2、支付流程
按照业务背景用户在前端点击下单后应该在前端展示二维码用户用手机扫码后进行支付展示的二维码应当与用户选择的支付渠道有关。
无论是微信支付还是支付宝支付都有相似的逻辑就是由后台向第三方支付平台发送请求由第三方平台拉起客户端并同步响应给后台。用户输入密码支付后将支付结果异步发送给后台同时后台也可以向第三方支付平台查询支付结果。
支付流程分析到这里还有两个问题一个是用户扫描的二维码如何生成其中应该包含哪些信息一个是支付结果如何保存在上一章的数据库设计中如何交互
1二维码生成
用户选择商品与支付渠道后点击下单前端应当展示一个二维码并提示用户用第三方app扫码支付这个二维码应该包含一个接口扫码之后将请求发送给第三方平台第三方平台接收到支付请求后拉起客户端执行支付业务。
在支付服务中创建一个接口在这个接口中执行向第三方平台发送请求的业务逻辑
ApiOperation(扫码下单接口)GetMapping(/requestpay)public void requestpay(String payNo, HttpServletResponse httpResponse) throws IOException {//根据订单id——payNo查询是否有该订单//第三方支付平台}
而二维码中应该包含这个接口的url生成一张包含对应url信息的二维码这个实现技术有很多例如ZXing。 String qrCode new QRCodeUtil().createQRCode(http://www.xxxx.cn/api/orders/requestpay?payNo payNo, 200, 200);
得到的是一个base64的String的url返回前端展示用户扫码后请求发送到后台后台将支付请求发送到第三方支付平台在前端拉起第三方客户端进行支付业务逻辑。
2数据库设计
在接收到支付结果后需要更新数据库支付服务的数据库设计已经在上一章中进行了分析。 更新支付记录表更新字段支付状态第三方平台支付交易号支付结果通知时间 PayRecord payRecord new PayRecord();
payRecord.setStatus(000002);//支付成功
payRecord.setOutPayNo(payStatus.getTrade_no());//支付交易号
payRecord.setPaySuccessTime(LocalDateTime.now());//通知时间
int update payRecordMapper.update(payRecord, new LambdaQueryWrapperPayRecord().eq(PayRecord::getPayNo, payNo));
更新订单表更新字段支付状态支付完成时间 Orders orders ordersMapper.selectById(orderId);
orders.setStatus(100002);
orders.setPaySuccessTime(LocalDateTime.now());
int update ordersMapper.update(orders, new LambdaQueryWrapperOrders().eq(Orders::getId, orderId));
之后需要还需要进行消息通知这个放在下一章通知服务中讲。
3、支付结果接收失败怎么处理
如果由于网络通信或notify_url设置错误等原因异步通知支付结果失败那我们需要手动查询支付结果第三方支付平台都提供了查询api问题是我们该以哪种方式查询接口呢
从用户的角度出发自然希望支付完成后能及时的得到支付结果的通知所以在后台向第三方支付平台发起请求后向消息队列发送一条定时消息定时结束投递到死信交换机由监听私信交换机的服务查询数据库根据支付状态判断是否支付完成如果是待支付就根据支付id向第三方渠道查询支付结果根据查询结果成功或失败更新到数据库如果查询不到结果或者支付未完成就将定时消息重新投递到消息队列。
消息队列的设置在下一章讲。
四、通知服务
无论支付成功或失败都需要对用户进行通知在支付结果异步通知或者手动查询支付结果之后需要进行消息通知。
1、消息队列的选择
常用的消息队列组件一般有三个RocketMQ、Kafka、RabbitMQ这三个消息队列各有优缺点有自己的使用场景接下来分析三个组件各自使用场景选择适合我们业务场景的组件。
1、RocketMQ是阿里自研的国产消息队列它接受来自生产者的消息将消息分类每一类是一个 topic一个topic存放在不同broker下对应queue和tag一个broker有一个commitlog消费者根据需要订阅 topic获取里面的消息。
2、Kafka由Linkedin公司开发是一个分布式、支持分区的partition、多副本的replica基于 zookeeper 协调的分布式消息系统。
3、RabbitMQ是一个开源的消息代理软件它实现了高级消息队列协议AMQP用于在分布式系统之间进行可靠的异步通信。它可以在不同的应用程序、服务和系统之间传递消息并确保消息的可靠性和顺序性。
RocketMQ和 Kafka 相比RocketMQ 在架构上做了减法在功能上做了加法
RabbitMQ 是一个消息代理中间件支持推送拉取两种模式Kafka 是一个分布式流处理平台只支持拉取。 对于秒杀活动等吞吐量要求高的优先选Kafka和RabbitMQ对于要求强对外提供能力有很多主题介入的可以考虑千级的RocketMQ百万级的RabbitMQ金融业务要求稳定安全的分布式部署Kafka和RocketMQ
我们需要将支付结果通知给用户这要求了高时效、高可用、支持多主题提高扩展性便于其他业务使用所以我们选择RabbitMQ。
2、消息队列使用方法
首先在docker中启动RabbitMQ容器。
在父工程、或需要使用消息队列的微服务中添加RabbitMQ的依赖 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId
/dependency
在配置文件中添加RabbitMQ的初始化配置 spring:rabbitmq:host: xxxxxxxxxport: xxxxusername: xxxxpassword: xxxxvirtual-host: /publisher-confirm-type: correlated #correlated 异步回调定义ConfirmCallbackMQ返回结果时会回调这个ConfirmCallbackpublisher-returns: false #开启publish-return功能同样是基于callback机制需要定义ReturnCallbacktemplate:mandatory: false #定义消息路由失败时的策略。true则调用ReturnCallbackfalse则直接丢弃消息listener:simple:acknowledge-mode: none #出现异常时返回unack消息回滚到mq没有异常返回ack ,manual:手动控制,none:丢弃消息不回滚到mqretry:enabled: true #开启消费者失败重试initial-interval: 1000ms #初识的失败等待时长为1秒multiplier: 1 #失败的等待时长倍数下次等待时长 multiplier * last-intervalmax-attempts: 3 #最大重试次数stateless: true #true无状态false有状态。如果业务中包含事务这里改为false
我们需要在支付服务中发送消息让课程服务接收消息并进行处理交换机Fanout广播模式。
在支付服务中创建交换机 /*** author zkp15* version 1.0* description 消息队列配置* date 2023/10/4 22:25*/
Configuration
public class PayNotifyConfig {//交换机public static final String PAYNOTIFY_EXCHANGE_FANOUT paynotify_exchange_fanout;//支付结果通知消息类型public static final String MESSAGE_TYPE payresult_notify;//声明交换机Bean(PAYNOTIFY_EXCHANGE_FANOUT)public FanoutExchange paynotify_exchange_fanout() {// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);}}
在课程服务中创建交换机和消息队列并声明绑定关系 /*** author zkp15* version 1.0* description 消息队列配置* date 2023/10/4 22:25*/
Configuration
public class PayNotifyConfig {//交换机public static final String PAYNOTIFY_EXCHANGE_FANOUT paynotify_exchange_fanout;//通知队列名称public static final String CHOOSECOURSE_PAYNOTIFY_QUEUE choosecourse_paynotify_queue;//支付结果通知消息类型public static final String MESSAGE_TYPE payresult_notify;//声明交换机Bean(PAYNOTIFY_EXCHANGE_FANOUT)public FanoutExchange paynotify_exchange_direct() {// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);}//声明队列Bean(CHOOSECOURSE_PAYNOTIFY_QUEUE)public Queue course_publish_queue() {return QueueBuilder.durable(CHOOSECOURSE_PAYNOTIFY_QUEUE).build();}//声明交换机和队列绑定Beanpublic Binding binding_course_publish_queue(Qualifier(CHOOSECOURSE_PAYNOTIFY_QUEUE) Queue queue, Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {return BindingBuilder.bind(queue).to(exchange);}}
接下来由支付服务发送消息到交换机课程服务监听消息队列得到消息进行相应处理。
支付服务发送消息
//根据支付结果更新数据库
......//向消息队列插入消息
rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, , msg);
课程服务监听消息
RabbitListener(queues PayNotifyConfig.CHOOSECOURSE_PAYNOTIFY_QUEUE)public void receive(String message) {//获取消息MqMessage mqMessage JSON.parseObject(message, MqMessage.class);//消息类型String messageType mqMessage.getMessageType();//这里只处理支付结果通知if (PayNotifyConfig.MESSAGE_TYPE.equals(messageType)) {//获取选课记录idString choosecourseId mqMessage.getBusinessKey1();//添加选课boolean b myCourseTablesService.saveChooseCourseStauts(choosecourseId);if (b) {//向用户发送付费课程支付成功、选课成功的通知......}}}
3、如何确保消息的可靠性
如果消息没有通知到用户用户可能会担心自己是否支付成功自己的钱去哪里了。
所以要确保消息的可靠性消息丢失的原因有很多例如生产者发送消息到消息队列时消息丢失消息队列宕机了导致消息全部丢失消费者在消费消息时由于网络等原因没有消费成功但是消息队列没有收到通知就会把消息销毁等等。
RabbitMQ提供了生产者确认机制、消息持久化、消费者确认机制。
生产者确认机制生产者发送消息到消息队列后消息队列会响应一个结果给生产者confirm
try {Channel channel rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);channel.confirmSelect();channel.basicPublish(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, , null, msg);channel.addConfirmListener(new ConfirmListener() {//消息失败处理Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {log.info(sendQueue-ack-confirm-fail);try {Thread.sleep(3000l);} catch (InterruptedException e) {throw new RuntimeException(e);}//重发channel.basicPublish(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT, , null, msg);}//消息成功处理Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {log.info(sendQueue-ack-confirm-successs);}});} catch (Exception e) {log.error(发送消息失败——e.printStackTrace());}
消息持久化消息队列将消息存储在硬盘中但是这同样会增大开销导致响应缓慢
我们在声明交换机和消息队列时可以选择持久化
//声明交换机Bean(PAYNOTIFY_EXCHANGE_FANOUT)public FanoutExchange paynotify_exchange_direct() {// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);}//声明队列Bean(CHOOSECOURSE_PAYNOTIFY_QUEUE)public Queue course_publish_queue() {return QueueBuilder.durable(CHOOSECOURSE_PAYNOTIFY_QUEUE).build();}// 消息持久化Message msg MessageBuilder.withBody(message.getBytes(Standardcharsets.UTF_8)) //消息内容.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久代.build();
消费者确认机制消费者消费消息后可以选择向MQ发送ack消息MQ接收到ack后删除消息。
一般情况下MQ会自动发送ack消息为了确保消息不丢失我们可以在执行业务逻辑后手动发送ack消息。
try {// 业务逻辑......// deliveryTag消息的indextrue表示批量处理所有小于deliveryTag的消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception e) {log.error(消费消息确认失败e.printStackTrace());try {//手动确认回滚 拒绝deliveryTag对应的消息第二个参数是否requeuetrue则重新入队列否则丢弃或者进入死信队列。channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);} catch (IOException ex) {throw new RuntimeException(ex);}}
4、如何避免消息的重复消费
从用户的角度来看如果通知了两条一模一样的消息会感觉莫名其妙可能会怀疑自己充了两次钱所以有必要研究分析如何避免消息的重复消费。
在Kafka中同一个partition会维护一个offsetoffset表示消息被消费的标记可以选择手动更新offset来避免消息的重复消费。
而RabbitMQ中我们采用分布式幂等性解决方案使用唯一id或者redis分布式锁。
在业务中我们采用唯一id在消息通知时根据课程id查询是否已经在课程表中如果存在就说明用户购买付费课程成功消息已经通知过了就不需要重复通知了。
如果用redis分布式锁该如何实现消息不重复消费呢我们可以在消息通知之前在redis中插入一个分布式锁锁的内容可以是消息唯一id这个可以在消息中添加一个id字段来实现在接收到消息时从redis中查询是否存在这个锁如果没有就拒绝消费消息如果有就消费消息并最终删除锁。
五、总结
本文提出了一种高可用的电商支付架构设计方案依托实际业务场景进行落地开发架构分为三个模块订单模块支付模块和通知模块。 在订单模块设计了订单数据库包括订单表、订单记录表考虑到了订单的幂等性问题和订单未能及时处理的问题。 在支付模块对常见支付接口进行了选择分析包括微信支付和支付宝分析了适合pc端支付的支付流程并设计了支付记录表使用二维码生成器生成二维码成功请求第三方支付平台实现支付并对支付结果接收失败的问题进行了预备处理。 在通知模块使用消息队列RabbitMQ将订单支付结果及时通知给其他微服务并对消息的可靠性和重复消费问题进行了讨论。
每个模块履行单一职责模块间耦合度较低代码复用率高极大地提升了系统的稳定性从而实现高可用。
从用户角度出发优化了业务逻辑提升了用户体验增强用户黏性符合电商平台视顾客如上帝的理念。