申请网站服务器,大数据与网站开发技术,企查查企业信息,圣宠宠物网站建设总体思路是#xff0c;主节点接收到任务请求#xff0c;将根据任务情况拆分成多个任务块#xff0c;将任务块标识的主键放入redis。发送redis消息#xff0c;等待其他节点运行完毕#xff0c;结束处理。接收到信息的节点注册本节点信息到redis、开启多线程、获取任务块、执… 总体思路是主节点接收到任务请求将根据任务情况拆分成多个任务块将任务块标识的主键放入redis。发送redis消息等待其他节点运行完毕结束处理。接收到信息的节点注册本节点信息到redis、开启多线程、获取任务块、执行任务、结束处理。
1、主节点接收任务请求 Overridepublic void executeTaskInfo(PrepareDTO prepareDTO) {//异常标记String taskInfo prepareDTO.getTaskId();//任务分组状态String taskStatus ;try {log.info(数据准备任务并设定任务执行状态,{}, prepareDTO);this.dataPrepareBo.doStartGroupJobInfo(prepareDTO);//给redis集合中放计算对象log.info(开始放入计算任务:{}, prepareDTO);boolean getTaskFlag this.dataPrepareBo.pushCalculationObject(prepareDTO);if (!getTaskFlag) {taskStatus String.format(没有获取数据或计划已取消,%s, taskInfo);log.error(taskStatus);throw new Exception(taskStatus);}//发消息执行缓存中任务log.info(发消息执行任务:{}, prepareDTO);sendMessage(prepareDTO);//等待任务执行完毕log.info(等待任务执行结果);taskStatus this.getGroupUpLoadTaskFinsh(prepareDTO);} catch (Exception e) {//捕获日志e.printStackTrace();taskStatus 获取任务状态异常 e;log.info(taskStatus);dataPrepareBo.putExceptionMsg2Cache(taskInfo, 数据准备分发计算任务线程异常: taskStatus);} finally {//做任务结束处理this.doGroupTaskFinshpPocess(prepareDTO, taskStatus);}}
2发送消息 Overridepublic void sendMessage(String topic, String msg) {this.redisTemplate.convertAndSend(topic, msg);}
3节点接收任务并执行 public void doUpLoadTask(String msg) throws Exception {log.info(开始执行明细任务{} msg);String taskId this.getTaskId(msg);try {Object cancelFlag this.redisTemplate.opsForValue().get(String.format(EngineConstant.JOB_CANCEL_FLAG, taskId));if(cancelFlag ! null 1.equals(cancelFlag.toString())){log.info(本次任务已取消);return;}//上传本机执行信息到redisthis.cacheBo.initGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());//从缓存获取任务,获取任务后启线程执行任务。如果没获取到任务则本节点任务执行完毕//循环获取任务this.groupTaskProcessBO.doGroupTaskProcess(taskId, null);//处理结束this.cacheBo.finishGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());} catch (Exception e) {//记录日志taskUpldExeLogCDTO.setRunStas(-1);String exceptionInfo this.taskLogUtils.getExceptionInfo(e) ;taskUpldExeLogCDTO.setAbnInfo(exceptionInfo);throw e;} finally {//记录日志taskUpldExeLogCDTO.setEndtime(DateUtil.getCurrentDate());if(-1.equals(taskUpldExeLogCDTO.getRunStas())){//异常结束this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,执行上传任务异常);} else {//正常结束taskUpldExeLogCDTO.setRunStas(1);this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,执行上传任务正常);}}}
4开启线程执行任务 Overridepublic CalculationDTO doGroupTaskProcess(String taskId, TaskUpldExeLogCDTO taskUpldExeLogCDTO) throws Exception {ListFuture futureList new ArrayList();//开始执行明细任务处理ThreadPoolTaskExecutor taskTransferExecutor ToolUtil.getExecutor(engine-file-tasks-pool-, Math.min(parallelProcessNum,10), 8);ExecutorListHolder.putThreadPool(String.format(GroupConstant.PREPARE_ENGINE_POOL,taskId), taskTransferExecutor.getThreadPoolExecutor());for(int i 0 ; i parallelProcessNum ; i) {DoGroupUpLoadTaskThread doGroupUpLoadTaskThread new DoGroupUpLoadTaskThread(taskId, redisTemplate, calculationBo, null, null);Future? future taskTransferExecutor.submit(doGroupUpLoadTaskThread);futureList.add(future);}if (!CollectionUtil.isEmpty(futureList)) {futureList.forEach(f - {try {f.get(GroupTaskProcessBOImpl.maxTime, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();}});}log.info(本节点执行分组任务执行完毕{}, taskId : GroupConstant.IDENTITY);return null;}
5线程执行明细 Overridepublic ResponseDTO call() throws Exception {//执行任务while(true) {FilterTableUniqueDTO filterTableUniqueDTO (FilterTableUniqueDTO)this.redisTemplate.opsForList().leftPop(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));log.debug(取出任务: filterTableUniqueDTO);if(null filterTableUniqueDTO) {break ;}long lastNum this.redisTemplate.opsForList().size(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));log.info(生成文件剩余任务数量: lastNum);
// 处理任务calculationBo.GenerateFile(filterTableUniqueDTO, taskUpldDetlLogCDTO);}return null;}
以上是主要入口总体思路涉及代码详细实现整理起来涉及内容比较繁多将在第二部分分享。