深圳哪家网站建设的公司好,软件公司简介内容怎么写,铁岭网站制作,哪些网站做的最好文章目录前言一、重试场景分析一、如何实现重试1. 扫表2. 基于中间件自身特性3. 基于框架4. 根据公司业务特性自己实现的重试二、重试组件封装1. 需求分析2. 模块设计2.1 持久化模块1. 表定义2. 持久化接口定义3. 持久化配置类2.2 重试模块1.启动2.重试3. 业务端使用1. 引入依赖…
文章目录前言一、重试场景分析一、如何实现重试1. 扫表2. 基于中间件自身特性3. 基于框架4. 根据公司业务特性自己实现的重试二、重试组件封装1. 需求分析2. 模块设计2.1 持久化模块1. 表定义2. 持久化接口定义3. 持久化配置类2.2 重试模块1.启动2.重试3. 业务端使用1. 引入依赖2. 新增配置3. 使用总结前言
如何封装一套服务自身业务开箱即用的重试组件是个值得思考的问题 在开发支付系统过程中我们经常会遇到这样的业务场景调用下游系统、回调上游系统由于网络原因或者当时对方系统不可用导致调用失败那么调用失败就失败了么当然肯定不是一般都要有重试机制。这种重试机制实现有很多方式但是万万不可依赖其他系统的重试机制去重试你要重试调用的系统这个原因下面分析。本篇文章就重试场景给出一个个人觉得还不错的解决方案也是作者所在用的解决方案如有更好的解决方案欢迎交流。 一、重试场景分析
在支付系统中我们经常会将一些非核心业务流程做成异步的在核心主流程中往MQ写入一条相对应的待处理消息写入成功即认为业务处理成功了所以我们要证在消费端最大程度的保证处理成功。 在结果通知中也有失败重试策略我们对接支付渠道如支付宝如果不返回指定成功的报文信息其将在25小时以内完成8次通知通知的间隔频率一般是4m,10m,10m,1h,2h,6h,15。 这里我们分析个场景流程很简单如下 支付渠道通知我们的支付系统支付系统通知商户系统之间为同步调用渠道调用过来支付系统变更订单状态变更后调用商户系统如果调用商户系统失败了那么支付系统给渠道返回失败然后过一段时间后渠道发起重试再次调用支付系统支付系统再调用商户系统。借助渠道的通知重试策略来完成自身的重试通知。谁要是这么设计原地刨个坑活埋了他吧不要觉得没有人用这种方式事实就是真的有公司这么用。结果可想而知不出问题只能说明做的系统没交易量一旦有交易量支付系统会被商户系统给拖垮掉原因自行分析。
本篇文章呢我们以支付结果通知为例作为场景展开分析做一个面对这种场景的统一解决方案同时是没有使用充值VIP的RabbitMQ作为消息中间件。
既然没钱充值VIP购买其强大的重试功能只能自己开发了。
一、如何实现重试
1. 扫表
实现重试的方式有很多种有基于扫描表的如下 前置通知失败后即落入重试表待定时任务触发扫描表重新发起调用这种处理方案是很多公司在用的。这种方案虽然不会像上面有拖垮系统的风险但是问题还是很多的如定时任务多久触发一次有些交易对实时性要求比较高如果第一次因为网络原因导致的失败紧接着重试一般就能成功了那么就把定时任务设定1s一次的频率这种方式不再详细分析了…有点设计能力的人都不会采用这种方式吧。
2. 基于中间件自身特性
RocketMQ中间件本身已经支持重试下文直接截图了
3. 基于框架
针对RabbitMQ中间件spring提供的retry
server:port:8080
spring:rabbitmq:host: xxx.xxx.xxx.xxxport: 5672username: xxxxpassword: xxxpublisher-confirm-type: correlatedlistener:simple:acknowledge-mode: manualretry:enabled: truemax-attempts: 5initial-interval: 5000max-interval: 100004. 根据公司业务特性自己实现的重试 如上是自己基于“指数退避策略进行延迟重试”封装的一套重试组件也是本篇要介绍的方案。
二、重试组件封装
1. 需求分析
如何封装一套服务自身业务开箱即用的重试组件是个值得思考的问题但是Spring-boot已经给出了答案。我们在使用Springboot开发项目时候想要集成RabbitMQ只需要加入依赖然后配置yml就可以使用了一旦满足约定好的条件Springboot则帮我们激活所需要的Bean那么我们是不是也可以参考其思想自己也装配重试所需的Bean。 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactIdversion2.4.1/version/dependency决定了怎么做然后分析业务系统特性自己做的支付系统业务特性是一个系统会有多个队列的消费者并且每个队列消息处理失败后的重试次数、间隔时间也各不相同并且达到最大失败重试次数后要入通知重试表供后期业务系统恢复后再次发起重试。最终要的是使用系统只需要简单配置下就可以实现上面需求就像spring提供的retry机制一样简单配置下就行了不需要你知道底层原理。
2. 模块设计 从我们的架构图中可以看到其主要分为两个模块重试模块、持久化模块我们逐个分析这俩模块的设计实现首先从简单的开始持久化模块。
2.1 持久化模块
首先没得说需要建表需要使用starter提供的自动持久化功能就要创建starter持久化所需要的表
1. 表定义
/*** author Kkk* Description: 异常通知恢复表*/
Entity
Table(name notify_recover)
public class NotifyRecover implements Serializable {/**id*/IdColumn(nameid,insertable false)private Long id;/** 唯一标识键 */Column(nameunique_key)private String uniqueKey ;/** 场景码 */Column(namescene_code)private String sceneCode ;/** 调用方系统 */Column(namesystem_id)private String systemId;/** 通知内容 */Column(namenotify_content)private String notifyContent ;/** 通知方式:http mq */Column(namenotify_type)private int notifyType ;/** 交换器*/Column(nameexchange)private String exchange ;/** 异步通知路由键 */Column(namenotify_key)private String notifyKey ;/** 通知次数 */Column(namenotify_num)private int notifyNum ;/** 通知状态 */Column(namenotify_status)private String notifyStatus ;/** 备注 */Column(nameremark)private String remark ;/** 扩展字段 */Column(nameextend)private String extend ;/** 创建时间 */Column(namecreate_time,insertable false)private Date createTime ;/** 修改时间 */Column(nameupdate_time,insertable false)private Date updateTime ;Column(namebucket)private String bucket ;// ... ...
}2. 持久化接口定义
然后入表接口肯定也是需要的
/*** author Kkk* Description: 发送失败处理*/
public interface NotifyRecoverHandlerT {/*** 处理重发失败入重试表* param t*/public void handlerSendFail(T t);
}3. 持久化配置类
创建持久化配置类
/*** author Kkk* Description: 持久化配置类*/
Configuration
ConditionalOnProperty(prefix spring.rabbitmq.retry,value recover,havingValue true,matchIfMissing false)
public class JdbcHelperMqConfiguration {Bean(name jdbcSelectProvider)public JdbcSelectProvider jdbcSelectProviderBean() {return new JdbcSelectProvider();}Bean(name jdbcInsertProvider)public JdbcInsertProvider jdbcInsertProviderBean() {return new JdbcInsertProvider();}Bean(name jdbcUpdateProvider)public JdbcUpdateProvider jdbcUpdateProviderBean() {return new JdbcUpdateProvider();}Bean(name jdbcHelper)public JdbcHelper jdbcHelperBean(Qualifier(jdbcSelectProvider)JdbcSelectProvider jdbcSelectProvider,Qualifier(jdbcInsertProvider)JdbcInsertProvider jdbcInsertProvider,Qualifier(jdbcUpdateProvider)JdbcUpdateProvider jdbcUpdateProvider) {return new JdbcHelperImpl(jdbcSelectProvider,jdbcInsertProvider,jdbcUpdateProvider);}Bean(name notifyRecoverHandler)ConditionalOnMissingBean(value NotifyRecoverHandler.class)public NotifyRecoverHandler notifyRecoverHandlerBean(Qualifier(jdbcHelper)JdbcHelper jdbcHelper) {return new DefaultNotifyRecoverHandlerImpl(jdbcHelper);}
}此配置类的激活条件时配置了失败是否需要入重试表配置。同时也可以不使用starter提供的入表策略如果业务系统有自己的重试表那么就可以将失败的消息入到自定义的表中此处预留的扩展点。 jdbcSelectProvider、jdbcInsertProvider、jdbcUpdateProvider这个三个类为查询、新增、更新对应的处理类为底层的JDBC操作。
/*** author Kkk* Description: select提供类*/
public class JdbcSelectProviderT {private static final Logger logger LoggerFactory.getLogger(JdbcSelectProvider.class);Resourceprivate DataSource dataSource;public JdbcSelectProvider() {}public ListT select(String sql, Class outputClass) {return this.selectExecute(sql,outputClass);}private ListT selectExecute(String sql, Class outputClass,Object... params) {Connection connection null;PreparedStatement pst null;ResultSet res null;ListT ts null;try {connection DataSourceUtils.getConnection(this.dataSource);pst connection.prepareStatement(sql);for(int i 0; i params.length; i) {pst.setObject(i 1, params[i]);}res pst.executeQuery();ts mapRersultSetToObject(res, outputClass);} catch (SQLException var7) {var7.printStackTrace();}finally {try {connection.close();pst.close();} catch (SQLException throwables) {throwables.printStackTrace();}}return ts;}SuppressWarnings(unchecked)public ListT mapRersultSetToObject(ResultSet rs, Class outputClass) {ListT outputList null;try {if (rs ! null) {if (outputClass.isAnnotationPresent(Entity.class)) {ResultSetMetaData rsmd rs.getMetaData();Field[] fields outputClass.getDeclaredFields();while (rs.next()) {T bean (T) outputClass.newInstance();for (int _iterator 0; _iterator rsmd.getColumnCount(); _iterator) {String columnName rsmd.getColumnName(_iterator 1);Object columnValue rs.getObject(_iterator 1);for (Field field : fields) {if (field.isAnnotationPresent(Column.class)) {Column column field.getAnnotation(Column.class);if (column.name().equalsIgnoreCase(columnName) columnValue ! null) {BeanUtils.setProperty(bean, field.getName(), columnValue);break;}}}}if (outputList null) {outputList new ArrayListT();}outputList.add(bean);}} else {logger.error(查询结果集映射失败映射类需要Entity注解);}} else {return null;}} catch (Exception e) {logger.error(查询结果集映射失败,e);}return outputList;}
}
jdbcHelper对如上几个Provider进行了统一包装处理
/*** author Kkk* Description:*/
public class JdbcHelperImpl implements JdbcHelper {private Logger logger LoggerFactory.getLogger(JdbcHelperImpl.class);String s;private JdbcSelectProvider jdbcSelectProvider;private JdbcInsertProvider jdbcInsertProvider;private JdbcUpdateProvider jdbcUpdateProvider;ResultSetMapperNotifyRecover resultSetMapper new ResultSetMapperNotifyRecover();public JdbcHelperImpl(JdbcSelectProvider jdbcSelectProvider, JdbcInsertProvider jdbcInsertProvider,JdbcUpdateProvider jdbcUpdateProvider) {this.jdbcSelectProvider jdbcSelectProvider;this.jdbcInsertProvider jdbcInsertProvider;this.jdbcUpdateProvider jdbcUpdateProvider;}public ListNotifyRecover selectData(String uniqueKey,String sceneCode){StringBuilder stringBuilder new StringBuilder(SELECT * FROM notify_recover WHERE unique_key);stringBuilder.append(uniqueKey);stringBuilder.append(s);stringBuilder.append( AND scene_code);stringBuilder.append(sceneCode);stringBuilder.append(s);String sql stringBuilder.toString();ListNotifyRecover pojoList this.jdbcSelectProvider.select(sql, NotifyRecover.class);if(nullpojoList || pojoList.size()0 ){logger.info(根据uniqueKey{},sceneCode({})查询结果为空!,uniqueKey,sceneCode);return null;}return pojoList;}Overridepublic void insertData(NotifyRecover notifyRecover) {jdbcInsertProvider.insert(notifyRecover);}Overridepublic int updateData(NotifyRecover notifyRecover) {StringBuilder stringBuilder new StringBuilder(UPDATE notify_recover SET notify_status);stringBuilder.append(notifyRecover.getNotifyStatus());stringBuilder.append(, notify_num);stringBuilder.append(notifyRecover.getNotifyNum());stringBuilder.append( WHERE unique_key);stringBuilder.append(notifyRecover.getUniqueKey());stringBuilder.append(s);stringBuilder.append( AND scene_code);stringBuilder.append(notifyRecover.getSceneCode());stringBuilder.append(s);String sql stringBuilder.toString();int resultSet this.jdbcUpdateProvider.update(sql);return resultSet;}
}最后一部分持久化接口默认实现如果业务方想使用持久化进制并没有实现持久化接口则采用默认实现 Bean(name notifyRecoverHandler)ConditionalOnMissingBean(value NotifyRecoverHandler.class)public NotifyRecoverHandler notifyRecoverHandlerBean(Qualifier(jdbcHelper)JdbcHelper jdbcHelper) {return new DefaultNotifyRecoverHandlerImpl(jdbcHelper);}持久化默认实现
/*** author Kkk* Description: 持久化默认实现*/
public class DefaultNotifyRecoverHandlerImpl implements NotifyRecoverHandlerNotifyRecover {private Logger logger LoggerFactory.getLogger(DefaultNotifyRecoverHandlerImpl.class);BasicThreadFactory factory new BasicThreadFactory.Builder().namingPattern(recover-execute-thread-%d).uncaughtExceptionHandler(new NotifyRecoverThreadUncaughtExceptionHandler()).build();private ThreadPoolExecutor executor (ThreadPoolExecutor) Executors.newFixedThreadPool(4,factory);private JdbcHelper jdbcHelperImpl;public DefaultNotifyRecoverHandlerImpl(JdbcHelper jdbcHelperImpl) {this.jdbcHelperImpl jdbcHelperImpl;}Overridepublic void handlerSendFail(NotifyRecover notifyRecover) {executor.execute(new Runnable() {Overridepublic void run() {//采用异步持久化}});}
}到这里就完成了持久化工作了但是还有一个很重要的问题怎么将此类注册为Spring中的Bean呢方式多种最简单的是使用Import标签在重试的主配置类上引入此配置类。
Import(JdbcHelperMqConfiguration.class)
public class RabbitMqRetrySendConfigurationMultiply {
}2.2 重试模块
下面分析重试模块首先重试模块我们是基于RabbitMQ死信队列来做的关于死信、死信队列的概念这里不做解释了 重试最主要分为启动时、运行时两部分。
1.启动
根据配置自动生成死信队列并通过对应的交换器与原队列进行路由绑定大概流程见很久之前写的一篇博客[商户交易结果通知设计]当时只是针对支付系统通知功能做的并没有做什么组件化后期发现实际 项目中很多场景都需要这种重试机制所以为了避免重复代码的编写后期就简单的封装了下作为一个延迟重试组件以供在项目中开发作为一个组件直接引入依赖使用就行了。 要做的是如何将原来的代码片段封装到starter并装配到Spring中。
Configuration
EnableConfigurationProperties({RabbitMqRetryMultiplyProperties.class, SystenEnvProperties.class})
ConditionalOnProperty(prefix spring.rabbitmq,value isRetry,havingValue true)
ConditionalOnClass({ AmqpAdmin.class, RabbitTemplate.class })
Import(JdbcHelperMqConfiguration.class)
public class RabbitMqRetrySendConfigurationMultiply {Autowiredprivate RabbitMqRetryMultiplyProperties rabbitMqRetryMultiplyProperties;Autowiredprivate SystenEnvProperties systenEnvProperties;Bean(name rabbitMqService)public RabbitMqService rabbitMqServiceBean() {return new RabbitMqServiceImpl();}Bean(initMethod start, destroyMethod stop)public PscCommonRetryQueueManager pscCommonRetryQueueManager(Qualifier(rabbitMqService)RabbitMqService rabbitMqService,Autowired(required false) Qualifier(notifyRecoverHandler)NotifyRecoverHandler notifyRecoverHandler) {return PscCommonRetryQueueManager.builder().configs(rabbitMqRetryMultiplyProperties.getConfigs()).retryCountFlag(SystemConstant.RETRY_COUNT_FLAG).rabbitMqService(rabbitMqService).notifyRecoverHandler(notifyRecoverHandler).applicationName(systenEnvProperties.getName()).build();}
}即满足如下两个条件即会构建PscCommonRetryQueueManager这个Bean。 ConditionalOnProperty ConditionalOnClass 初始化时候会调用其start方法在看之前先看下配置类需要用户配置什么东西。
/*** author Kkk* Description: 重试配置类*/
Data
public class ConfigEntity implements Serializable {//重试次数private Integer retry_count5;//重试队列名private String retry_queue_name_prefix;//死信消息失效时间计算方式指数方式 exponentialprivate String message_expiration_typeexponential;//x-dead-letter-exchangeprivate String x_dead_letter_exchange;//x-dead-letter-exchangeprivate String x_dead_letter_routing_key;//延迟时间因子10s。具体延迟时间计算方式2^count*10spublic Integer delay_milliseconds10000;//项目需要消费的队列名称public String consumer_queue_name;//消息丢失处理策略public String notify_recover_handler;
}接下来看其start方法做了什么首先看下类继承关系 在接口中定义方法。
/*** author Kkk* Description: 重试管理接口*/
public interface RetryQueueManager {/*** 启动*/void start();/*** 停止*/void stop();/*** 发送延迟消息 -可捕获异常入重试表*/boolean sendRetryMessage(Message message);/***发送消息 -可捕获异常入重试表*/boolean sendMessage(String exchange, String routingKey, String jsonString,String uniqueKey,String sceneCode);/*** 发送消息 -可捕获异常入重试表*/boolean sendMessage(String exchange, String routingKey, String jsonString);/*** 发送延迟消息-发送网络异常可以放入重试表*/boolean sendRetryMessage(Message message,String uniqueKey,String sceneCode);
}抽象层抽取了写公共参数具体实现由子类实现。
/*** author Kkk* Description: 抽象层*/
public abstract class AbstractRetryQueueManager implements RetryQueueManager {private Logger logger LoggerFactory.getLogger(AbstractRetryQueueManager.class);// 重试处理protected NotifyRecoverHandler notifyRecoverHandler;// 消息处理protected RabbitMqService rabbitMqService;//消息重试次数标识 埋点到消息头中的字段public String retryCountFlag;//应用名称public String applicationName;//重试配置相关信息public ListRetryQueueConfigs retryQueueConfigs;Datapublic static final class RetryQueueConfigs {//重试次数public Integer retryCount10;//重试队列名public String retryQueueNamePrefix;//死信消息失效时间计算方式指数方式 exponentialpublic String messageExpirationTypeexponential;//x-dead-letter-exchangepublic String xDeadLetterExchangetopic;//x-dead-letter-routing-keypublic String xDeadLetterRoutingKey;//延迟时间因子10s。具体延迟时间计算方式2^count*10spublic Integer delayMilliseconds;//项目需要消费的队列名称public String consumerQueueName;}Overridepublic void start() {logger.info(开始创建重试队列);createRetryQueue();logger.info(创建重试队列完成);}/*** 应用启动构建重试队列*/protected abstract void createRetryQueue();Overridepublic void stop() {}// ... ...
}在子类实现抽象层方法createRetryQueue()生成死信交换器和队列并绑定接着根据配置生成指定个说的死信队列默认按照指数类型延迟时间因子10s。具体延迟时间计算方式2^count*10s然后将这些队列绑定到上面生成的交换器上由于这些生成的死信队列没有消费者所以消息过期后会再被路由到原队列中即可又被正常消费处理以此来达到延迟的效果原理比较简单。
Override
protected void createRetryQueue() {for (RetryQueueConfigs config:retryQueueConfigs) {TopicExchange topicExchange ExchangeBuilder.topicExchange(config.getXDeadLetterExchange()).build();rabbitAdmin.declareExchange(topicExchange);Queue queue1 QueueBuilder.durable(config.getConsumerQueueName()).build();rabbitAdmin.declareQueue(queue1);Binding binding BindingBuilder.bind(queue1).to(topicExchange).with(config.getXDeadLetterRoutingKey());rabbitAdmin.declareBinding(binding);if(ExpirationTypeEnum.EXPONENTIAL.getCode().equals(config.getMessageExpirationType())){logger.info(申明“指数型”重试队列开始...);for (int i 0; i config.getRetryCount(); i) {String queueName null;try {MapString, Object args new HashMapString, Object();//指定当成为死信时重定向到args.put(x-dead-letter-exchange, config.getXDeadLetterExchange());args.put(x-dead-letter-routing-key, config.getXDeadLetterRoutingKey());String expiration String.valueOf(Double.valueOf(Math.pow(2, i)).intValue()*config.getDelayMilliseconds());queueName config.getRetryQueueNamePrefix() . expiration;//声明重试队列将参数带入Queue queue QueueBuilder.durable(queueName).withArguments(args).build();rabbitAdmin.declareQueue(queue);logger.info(申明“指数型”重试队列成功[queueName:{}], queueName);}catch (Throwable e){logger.error(申明“指数型”重试队列失败[i:{}, queueName:{}, e.message{}]异常:, i, queueName, e.getMessage(), e);}}logger.info(申明“指数型”重试队列结束...);}}
}2.重试
判断重试次数消费端获取到消息后根据消息头埋点可以获到重试次数重试次数超过最大次数则入重试表待后期分析处理。
/*** 判断是否超过重试次数*/
public RetryEntity isOutOfRetryCount(Message message){int messageRetryCount getMessageRetryCount(message);RetryQueueConfigs config getRetryConfigByOriQueue(message);boolean resultmessageRetryCount(nullconfig?0:config.getRetryCount())?false:true;if(!result){logger.info(超过最大重试次数,入重试表!);//... ...}return new RetryEntity(result,messageRetryCount);
}/*** 获取重试次数*/
public int getMessageRetryCount(Message message){//初始为0int count 0;MapString, Object headers message.getMessageProperties().getHeaders();if(headers.containsKey(retryCountFlag)){count NumberUtils.toInt((String) message.getMessageProperties().getHeaders().get(retryCountFlag), 0);}return count;
}关于重试即消费端处理失败后进行重新投递根据重试次数计算要投递的队列名称。
Override
public boolean sendRetryMessage(Message message) {boolean resulttrue;try {//从消息题中获取到消息来源--队列名称然后根据队列名称获取到配置中心此队列配置的相关信息RetryQueueConfigs retryConfigByOriQueue getRetryConfigByOriQueue(message);//从消息头中获取到重试次数int retryCount getMessageRetryCount(message);//根据配置中心配置的死信消息失效时间计算方式默认指数方式和重试次数计算出死信队列名称后缀String expiration getRetryMessageExpiration(retryCount,retryConfigByOriQueue);logger.info(消息重发开始[expiration:{}, retryCount:{}], expiration, retryCount);//获取死信队列名称String queueName getRetryQueueName(expiration,retryConfigByOriQueue);logger.info(消息重发获取重试队列[expiration:{}, retryCount:{}, queueName:{}], expiration, retryCount, queueName);//发送消息rabbitMqService.sendRetry(, queueName, message, expiration, retryCount,retryCountFlag);logger.info(消息重发结束[expiration:{}, retryCount:{}], expiration, retryCount);} catch (Exception e) {logger.info(({})发送重试消息失败, JSON.toJSONString(message),e);resultfalse;}return result;
}3. 业务端使用
1. 引入依赖 dependencygroupIdcom.epay/groupIdartifactIddelay-component-spring-boot-stater/artifactIdversion1.0.0-SNAPSHOT/version/dependency2. 新增配置
3. 使用 总结
本篇简单的介绍了下在工作中将RabbitMQ进行简单封装作为延时组件使用在使用时只需要简单的进行配置下就可以达到延时效果降低了重复代码的编写大大缩短了项目开发周期由于工期紧张封装的starter还是比较粗糙的还有好多地方需要斟酌打磨。
本篇也只是提供一种思想吧在工作中可以借鉴下避免重复劳动将业务功能组件化以后不管在什么项目中只要有相同业务场景就可以引入现有组件快速完成业务功能开发。
拙技蒙斧正不胜雀跃。