网站开发cms,怎么用frontpage做网站,招聘网站建设的项目描述,工地建筑模板尺寸基于Java多线程处理数据 背景代码实现 背景
在日常工作中#xff0c;有一个同步企微客户-学员关系接口的定时任务在执行中随着数据量的不断增长#xff0c;定时任务的执行结束时间也出现了当天执行不完的情况#xff0c;影响到了正常业务的运行。基于这种情况#xff0c;在… 基于Java多线程处理数据 背景代码实现 背景
在日常工作中有一个同步企微客户-学员关系接口的定时任务在执行中随着数据量的不断增长定时任务的执行结束时间也出现了当天执行不完的情况影响到了正常业务的运行。基于这种情况在对该定时任务的业务逻辑代码分析验证后得出是调用企微客户-学员关系接口时耗时引起的但是查阅企微接口文档又不支持批量调用只能逐个调用。那么这种情况下既然批量调用接口不支持那么可以采用多线程并发调用的方式来降低定时任务整体的执行时间于是就需要用到线程池来进行多线程操作。
代码实现
在这里我将会使用spring自带的线程池类 ThreadPoolTaskExecutor 来进行处理 ThreadPoolTaskExecutor 是对 ThreadPoolExecutor 进行了封装处理源代码中可以看到 而线程池类ThreadPoolExecutor 是JDK的线程池类继承 AbstractExecutorService public class ThreadPoolExecutor extends AbstractExecutorService { AbstractExecutorService 实现 ExecutorService public abstract class AbstractExecutorService implements ExecutorService { ExecutorService 继承 Executor public interface ExecutorService extends Executor { 下面开始初始化线程池类 ThreadPoolTaskExecutor配置类 ThreadPoolConfig 代码如下
/*** 线程池配置***/Configurationpublic class ThreadPoolConfig{// 核心线程池大小private int corePoolSize 50;// 最大可创建的线程数private int maxPoolSize 200;// 队列最大长度private int queueCapacity 1000;// 线程池维护线程所允许的空闲时间private int keepAliveSeconds 300;Bean(name threadPoolTaskExecutor)public ThreadPoolTaskExecutor threadPoolTaskExecutor(){ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();executor.setMaxPoolSize(maxPoolSize);executor.setCorePoolSize(corePoolSize);executor.setQueueCapacity(queueCapacity);executor.setKeepAliveSeconds(keepAliveSeconds);// 线程池对拒绝任务(无线程可用)的处理策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());return executor;}}补充同步企微客户-学员关系定时任务 SyncWechatWorkCustomerLinkDetailHandler 代码如下
ComponentJobHandler(syncWechatWorkCustomerLinkDetailHandler)public class SyncWechatWorkCustomerLinkDetailHandler extends IJobHandler {Autowiredprivate IWechatCustomerLinkDetailService wechatCustomerLinkDetailService;Overridepublic ReturnTString execute(String params) throws Exception {wechatCustomerLinkDetailService.syncWechatWorkCustomerLinkDetail(params);return ReturnT.SUCCESS;}}业务处理实现类 syncWechatWorkCustomerLinkDetail 代码如下 Overridepublic void syncWechatWorkCustomerLinkDetail(String params) {XxlJobLogger.log(补充任务开始执行...[{}],params);//查询条件对象WechatCustomerLinkDetail searchparam new WechatCustomerLinkDetail();if (StringUtils.isNotEmpty(params)) {Long[] ids Convert.toLongArray(params);//根据ids查询数据searchparam.setLinkIds(ids);}// 分页查询企微获客助手客户链接int pageNo 0;int pageSize 200;while(true){pageNo;XxlJobLogger.log(第【{}】页数据开始补充...,pageNo);PageHelper.startPage(pageNo, pageSize);PageHelper.orderBy(id asc);ListWechatCustomerLinkDetail list wechatCustomerLinkDetailMapper.selectWechatCustomerLinkDetailList(searchparam);PageHelper.clearPage();if (CollUtil.isEmpty(list) ) {break;}//开始补充数据multiThreadProcessData(list);XxlJobLogger.log(第【{}】页数据补充完成...,pageNo);}}多线程处理列表中的数据类 multiThreadProcessData 代码如下 /*** 使用多线程处理列表中的数据* param list 待处理的微信客户链接详情列表*/public void multiThreadProcessData(ListWechatCustomerLinkDetail list) {// 将大集合分割为多个小集合以便多线程处理ListListWechatCustomerLinkDetail partitionData partitionData(list, 10);// 获取线程池执行器ThreadPoolTaskExecutor executor SpringUtils.getBean(threadPoolTaskExecutor);// 创建计数器用于线程同步CountDownLatch latch new CountDownLatch(partitionData.size());for (ListWechatCustomerLinkDetail details : partitionData) {// 提交任务给线程池执行每个任务负责处理一个分割后的列表executor.execute(() - {try {for (WechatCustomerLinkDetail detail : details) {//打印线程名称//System.out.println(nameThread.currentThread().getName());// 对每个详情进行处理填充微信用户名称信息 这里就是业务逻辑处理的地方fillWechatUserNameInfo(detail);}} catch (Exception e) {// 捕获异常并打印避免线程异常中断e.printStackTrace();} finally {// 处理完成后计数器减一用于线程同步latch.countDown();}});}// 等待所有任务完成try {latch.await();} catch (InterruptedException e) {// 线程被中断打印异常信息e.printStackTrace();}}分割数据列表 partitionData 代码 /*** 分割数据列表成多个小块。* param dataList 待分割的数据列表包含微信客户链接详情。* param partitionSize 每个分区的大小。* return 分割后的数据列表每个元素是一个分区分区内部保持原有顺序。*/private ListListWechatCustomerLinkDetail partitionData(ListWechatCustomerLinkDetail dataList, int partitionSize) {ListListWechatCustomerLinkDetail partitions new ArrayList();// 总数据量int size dataList.size();// 每个分区的实际大小整除操作保证每个分区大小尽量均匀int batchSize size / partitionSize;// 遍历分区数量次为每个分区添加数据for (int i 0; i partitionSize; i) {// 当前分区的起始索引int fromIndex i * batchSize;// 当前分区的结束索引如果是最后一个分区则包含所有剩余数据int toIndex (i partitionSize - 1) ? size : fromIndex batchSize;// 将当前分区的数据添加到分区列表中partitions.add(dataList.subList(fromIndex, toIndex));}return partitions;}到这里整个基于多线程处理数据的代码就整理完了代码结构并不复杂主要是注意数据查询以及服务器最大线程数相关数据防止线程不够用的情况。