酒店官方网站的功能建设,中午版wordpress,小当网 绵阳网站建设,WordPress怎么加按钮文章目录 一、什么是分布式调度二、Elastic-Job 介绍三、Elastic-Job 实战3.1 环境搭建3.1.1 本地部署3.1.2 服务器部署3.1.3 Zookeeper 管控台界面 3.2 入门案例3.3 SpringBoot 集成 Elastic-Job3.4 任务分片#xff08;★#xff09;3.5 Dataflow 类型调度任务 一、什么是分… 文章目录 一、什么是分布式调度二、Elastic-Job 介绍三、Elastic-Job 实战3.1 环境搭建3.1.1 本地部署3.1.2 服务器部署3.1.3 Zookeeper 管控台界面 3.2 入门案例3.3 SpringBoot 集成 Elastic-Job3.4 任务分片★3.5 Dataflow 类型调度任务 一、什么是分布式调度
什么是任务调度 我们可以思考一下下面业务场景的解决方案 ① 某电商平台需要每天上午10点下午3点晚上8点发放一批优惠券 ② 某银行系统需要在信用卡到期还款日的前三天进行短信提醒 ③ 某财务系统需要在每天凌晨0:10分结算前一天的财务数据统计汇总 以上场景就是任务调度所需要解决的问题。
任务调度是为了自动完成特定任务在约定的特定时刻去执行任务的过程
以上的场景可以使用定时任务注解 Scheduled 贴在业务方法上并在启动类上贴上 EnableScheduling 注解以实现任务调度。
Scheduled(cron 0/20 * * * * ? )
public void doWork(){//doSomething
}什么是分布式调度
感觉 Spring 给我们提供的这个注解可以完成任务调度的功能为什么还需要分布式呢主要有如下这几点原因
单机处理极限原本 1 分钟内需要处理 1 万个订单但是现在需要 1 分钟内处理 10 万个订单原来一个统计需要1小时现在业务方需要10分钟就统计出来。你也许会说你也可以多线程、单机多进程处理。的确多线程并行处理可以提高单位时间的处理效率但是单机能力毕竟有限(主要是CPU、内存和磁盘)始终会有单机处理不过来的情况。高可用单机版的定式任务调度只能在一台机器上运行如果程序或者系统出现异常就会导致功能不可用。虽然可以在单机程序实现的足够稳定但始终有机会遇到非程序引起的故障而这个对于一个系统的核心功能来说是不可接受的。防止重复执行在单机模式下定时任务是没什么问题的。但当我们部署了多台服务同时又每台服务又有定时任务时若不进行合理的控制在同一时间只有一个定时任务启动执行这时定时执行可能存在执复、混乱和错误的情况了。
这个时候就需要分布式的任务调度来实现了。 二、Elastic-Job 介绍
Elastic-Job 是一个分布式调度的解决方案它由两个相互独立的子项目 Elastic-job-Lite 和 Elastic-Job-Cloud 组成使用 Elastic-Job 可以快速实现分布式任务调度。
官方地址https://shardingsphere.apache.org/elasticjob/
功能列表
分布式调度协调在分布式环境中任务能够按照指定的调度策略执行并且能够避免同一任务多实例重复执行。丰富的调度策略基于成熟的定时任务作业框架 Quartz cron 表达式执行定时任务。弹性拓容缩容当集群中增加一个实例它应当能够被选举被执行任务当集群减少一个实例时他所执行的任务能被转移到别的示例中执行。失效转移某示例在任务执行失败后会被转移到其他实例执行。错过执行任务重触发若因某种原因导致作业错过执行自动记录错误执行的作业并在下次作业完成后自动触发。支持并行调度支持任务分片任务分片是指将一个任务分成多个小任务在多个实例同时执行。作业分片一致性当任务被分片后保证同一分片在分布式环境中仅一个执行实例。支持作业生命周期操作可以动态对任务进行开启及停止操作丰富的作业类型。
执行架构如下 三、Elastic-Job 实战
3.1 环境搭建
zookeeper 可以理解为 elastic-job 的注册中心分布式调度等功能由它实现首先要下载资源。 csdn搜索资源 zookeeper-3.4.11.tar.gz
3.1.1 本地部署
将 zookeeper-3.4.11.tar.gz 解压并将 conf 目录下 zoo_sample.cfg 拷贝一份命名成 zoo.cfg 其中 zookeeper 默认端口是 2181 切换到 bin 目录下双击 zkServer.cmd即可启动 zookeeper 3.1.2 服务器部署
step1将 zookeeper-3.4.11.tar.gz上传到 /usr/local/software目录下 step2解压文件到指定目录
tar -zxvf /usr/local/software/zookeeper-3.4.11.tar.gz -C /usr/local/step3拷贝配置文件
cp /usr/local/software/zookeeper-3.4.11/conf/zoo_sample.cfg /usr/local/software/zookeeper-3.4.11/conf/zoo.cfgstep4启动
/usr/local/zookeeper-3.4.11/bin/zkServer.sh startstep5检查进程是否开启需要查看到QuorumPeerMain进程如果存在则证明启动成功。
jpszookeeper常用名称参考 linux下的zookeeper启动、停止 常用命令
注如果启动显示 Starting zookeeper ... already running as process 7827. 但是 jps 中没有 QuorumPeerMain 进程。则需查看 zookeeper_server.pid 文件的位置并删除。
# 查看该文件位置
find / -name zookeeper_server.pid
# 跳转到该文件的位置并删除
rm -rf zookeeper_server.pid另外服务器需要暂时关闭防火墙 systemctl stop firewalld并可使用 firewall-cmd --state 查看防火墙状态。 具体可参考Linux关闭防火墙命令 3.1.3 Zookeeper 管控台界面
搜索下载zooInspector.zip
解压后进入 build 目录运行 jar 包java -jar zookeeper-dev-ZooInspector.jar 点击绿色按钮输入连接的IP和端口号即可。 3.2 入门案例
版本要求JDK 要求1.7 以上版本Maven 要求 3.0.4 及以上版本Zookeeper 要求 3.4.6 以上版本
1、引入 pom 依赖
dependencygroupIdcom.dangdang/groupIdartifactIdelastic-job-lite-core/artifactIdversion2.1.5/version
/dependency2、调度任务类
public class MyElasticJob implements SimpleJob {Overridepublic void execute(ShardingContext shardingContext) {System.out.println(执行任务 new Date());}
}3、zookeeper 的配置类 启动类定义 JobScheduler 对象里面传入两个对象定时任务配置对象、注册中心配置并调用 init() 方法完成初始化。
定时任务配置对象中要设置任务名称、cron表达式和分片数量并设置 任务对象的全路径类名。 注册中心配置对象中要设置注册中心的地址、项目名以及节点的超时时间。
public class JobDemo {public static void main(String[] args) {new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();}/*** Zookeeper注册中心配置** return 注册中心配置对象*/private static CoordinatorRegistryCenter createRegistryCenter() {// ZookeeperConfiguration(zookeeper地址, 项目名)ZookeeperConfiguration configuration new ZookeeperConfiguration(localhost:2181, elastic-job);// 设置节点超时时间即每隔一段时间查看当前节点是否下线configuration.setConnectionTimeoutMilliseconds(100);ZookeeperRegistryCenter center new ZookeeperRegistryCenter(configuration);center.init();return center;}/*** 定时任务配置** return 定时任务配置对象*/private static LiteJobConfiguration createJobConfiguration() {// 定义作业核心配置 newBuilder(任务名称, corn表达式, 分片数量)JobCoreConfiguration simpleCoreConfig JobCoreConfiguration.newBuilder(myElasticJob, 0/10 * * * * ?, 1).build();// 定义simple类型配置MyElasticJob.class.getCanonicalName() 是获取MyElasticJob的全限定类名全路径类名SimpleJobConfiguration configuration new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());// 定义Lite作业根配置并返回// 设置overwrite(true)允许覆盖cron表达式默认不允许会每5s执行一次return LiteJobConfiguration.newBuilder(configuration).overwrite(true).build();}
}注 .overwrite(true) 如果不设置默认 5 秒执行一次。
4、运行main方法。当开启第二个实例的时候第一个实例停止打印当关闭第二个实例的时候第一个实例又重新开始运行。 3.3 SpringBoot 集成 Elastic-Job
1、添加 pom 依赖
project xmlnshttp://maven.apache.org/POM/4.0.0 xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.axy/groupIdartifactIdelastic-job-boot/artifactIdpackagingjar/packagingversion1.0-SNAPSHOT/versionnameelastic-job-boot/nameurlhttp://maven.apache.org/urlparentgroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-parent/artifactIdversion2.1.3.RELEASE/version/parentdependenciesdependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdcom.dangdang/groupIdartifactIdelastic-job-lite-spring/artifactIdversion2.1.5/version/dependency.../dependenciesbuildfinalNameelastic-job-boot/finalName/build
/project2、因为配置中心的地址并不是固定的所以我们应该把这个地址信息配置在配置文件中所以在配置文件 application.yml 中添加配置如下
zookeeper:url: localhost:2181groupName: elastic-job-boot3、调度任务类交给 Spring 进行管理
Component
public class MyElasticJob implements SimpleJob {Overridepublic void execute(ShardingContext shardingContext) {System.out.println(定时调度 new Date());}
}4、zookeeper 的配置类 elastic-job 集成 SpringBoot 以后需要创建的不是 JobScheduler 对象而是 SpringJobScheduler 对象并交给 Spring 管理。其次初始化该对象还需要传入需要分布式调度的任务对象当参数。
Configuration
public class TestJobConfig {Bean(initMethod init) // 创建bean后调用init方法public SpringJobScheduler testScheduler(MyElasticJob job, CoordinatorRegistryCenter registryCenter) {// 方法形参自动会去spring的容器中寻找首先会去看类型匹配然后才会去看变量名匹配方式和Autowird一样LiteJobConfiguration configuration createJobConfiguration(job.getClass(), 0/4 * * * * ?, 1);return new SpringJobScheduler(job, registryCenter, configuration);}Beanpublic CoordinatorRegistryCenter registryCenter(Value(${zookeeper.url}) String url,Value(${zookeeper.groupName}) String groupName) {// ZookeeperConfiguration(zookeeper地址, 项目名)ZookeeperConfiguration configuration new ZookeeperConfiguration(url, groupName);// 设置节点超时时间configuration.setConnectionTimeoutMilliseconds(100);ZookeeperRegistryCenter center new ZookeeperRegistryCenter(configuration);center.init();return center;}/*** 定时任务配置* 这个定时任务使用的场景比较灵活因此不建议放在spring的容器当中** param clazz 定时任务的字节码* param cron cron表达式* param shardingCount 分片数量* return 定时任务配置对象*/private static LiteJobConfiguration createJobConfiguration(Class clazz, String cron, int shardingCount) {// 定义作业核心配置 newBuilder(任务名称, corn表达式, 分片数量)JobCoreConfiguration.Builder jobBuilder JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingCount);JobCoreConfiguration simpleCoreConfig jobBuilder.build();// 定义simple类型配置MyElasticJob.class.getCanonicalName() 是获取MyElasticJob的全限定类名全路径类名System.out.println(MyElasticJob.class.getCanonicalName(): MyElasticJob.class.getCanonicalName());JobTypeConfiguration configuration;configuration new SimpleJobConfiguration(simpleCoreConfig, clazz.getCanonicalName());// 定义Lite作业根配置并返回// 设置overwrite(true)允许覆盖cron表达式默认不允许return LiteJobConfiguration.newBuilder(configuration).overwrite(true).build();}
}5、启动项目 3.4 任务分片★
分片就是在就是在机器中分线程执行分片的数量决定了最终分得线程的数量将一个任务拆分为多个独立的任务项然后由分布式的应用实例分别执行某一个或者几个分布项下面以案例来逐步带入分片的概念。
现要处理下图表中数据的 backedUp 属性设置为 1。表一共有 20 条数据根据 type 进行分类可以分成 text、image、radio 和 video 四类那我们可以自定义任务分 4 片分片索引分别为0、1、2、3。
当只有一台机器的情况下在机器 A 启动四个线程分别处理四个分片的内容。当有两台机器的情况下机器 A 启动两个线程负责索引 0 1 的分片内容机器 B 负责 2 3 的分片内容。当有三台机器的情况下机器 A 负责索引 0 1 的分片内容机器 B 负责 2机器 C 负责 3。当有四台机器的情况下机器 A 负责索引 0 的分片内容机器 B 负责 1机器 C 负责 2机器 D 负责 3。 注分片数建议等于机器个数的倍数。如分片四个在两台机器上那么就是每台机器分两个线程来执行任务。 如何实现上文的案例呢这里我们忽略关于数据库层面的配置主要的类与配置如下
1、这里我们新建文件对象
AllArgsConstructor
NoArgsConstructor
Data
public class FileCustom {// 唯⼀标识private Long id;//⽂件名private String name;//⽂件类型private String type;//⽂件内容private String content;// 是否已备份private Boolean backedUp false;
}2、这里我们先定义 zookeeper 的配置类
在初始化定时任务配置的时候以字符串的形式传入分片参数传入0text,1image,2radio,3vedio设置分片个数为 4并添加分片功能 shardingItemParameters(...)。
如果 分片个数 小于 分片参数则取参数中前几个。如分片取 2则只会对 0text,1image 进行处理如果 分片个数 大于 分片参数则多出的参数补 null。如分片取 5则参数字符串会变为 0text,1image,2radio,3vedio,4null
Configuration
public class TestJobConfig {Bean(initMethod init)public SpringJobScheduler fileScheduler(FileCustomElasticJob job, CoordinatorRegistryCenter registryCenter) {LiteJobConfiguration configuration createJobConfiguration(job.getClass(), 0 0/1 * * * ?, 4, 0text,1image,2radio,3vedio);return new SpringJobScheduler(job, registryCenter, configuration);}Beanpublic CoordinatorRegistryCenter registryCenter(Value(${zookeeper.url}) String url, Value(${zookeeper.groupName}) String groupName) {// ZookeeperConfiguration(zookeeper地址, 项目名)ZookeeperConfiguration configuration new ZookeeperConfiguration(url, groupName);// 设置节点超时时间configuration.setConnectionTimeoutMilliseconds(100);ZookeeperRegistryCenter center new ZookeeperRegistryCenter(configuration);center.init();return center;}/*** 定时任务配置* 这个定时任务使用的场景比较灵活因此不建议放在spring的容器当中** param clazz 定时任务的字节码* param cron cron表达式* param shardingCount 分片数量* param shardingParam 分片参数* return 定时任务配置对象*/private static LiteJobConfiguration createJobConfiguration(Class clazz, String cron, int shardingCount,String shardingParam) {// 定义作业核心配置 newBuilder(任务名称, corn表达式, 分片数量)JobCoreConfiguration.Builder jobBuilder JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingCount);if (!StringUtils.isEmpty(shardingParam)) {jobBuilder.shardingItemParameters(shardingParam); // 添加分片功能}JobCoreConfiguration simpleCoreConfig jobBuilder.build();// 定义simple类型配置JobTypeConfiguration configuration;configuration new SimpleJobConfiguration(simpleCoreConfig, clazz.getCanonicalName());// 定义Lite作业根配置并返回// 设置overwrite(true)允许覆盖cron表达式默认不允许return LiteJobConfiguration.newBuilder(configuration).overwrite(true).build();}
}3、文件对象的调度任务类
Slf4j
Component
public class FileCustomElasticJob implements SimpleJob {Autowiredprivate FileCustomMapper fileCustomMapper;Overridepublic void execute(ShardingContext shardingContext) {long threadId Thread.currentThread().getId();System.out.printf(线程ID:{}任务的名称:{}任务参数:{}分片个数L:{}分片索引号:{}分片参数:{}\n,threadId,shardingContext.getJobName(),shardingContext.getJobParameter(),shardingContext.getShardingTotalCount(),shardingContext.getShardingItem(),shardingContext.getShardingParameter());doWorkByParameter(shardingContext.getShardingParameter());}/*** 根据类型查询出所有的备份任务** param shardingParameter 线程对应处理的文件类型*/private void doWorkByParameter(String shardingParameter) {ListFileCustom fileCustoms fileCustomMapper.selectByType(shardingParameter);for (FileCustom fileCustom : fileCustoms) {backUp(fileCustom);}}/*** 模拟备份操作** param fileCustom 备份对象*/private void backUp(FileCustom fileCustom) {System.out.println(备份的方法名 fileCustom.getName() 备份的类型 fileCustom.getType());System.out.println();try {TimeUnit.SECONDS.sleep(1); // 延时一秒} catch (InterruptedException e) {throw new RuntimeException(e);}fileCustomMapper.changeState(fileCustom.getId(), 1); // 修改数据的 backedUp}
}4、这里我们模拟有两台机器即两个实例。
从运行结果我们可以看出机器 A 开启了两个线程来处理分片索引为 0 1 的分片内容机器 B 开启了两个线程来处理分片索引为 2 3 的分片内容。 因此通过对任务的合理分片化可以达到任务并行处理的效果。分片的优点如下
分片项与业务处理解耦Elastic-ob并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器开发者需要自行处理分片项与真实数据的对应关系。最大限度利用资源将分片项设置大于服务器的数据最好是大于服务器倍数的数量作业将会合理利用分布式资源动态的分配分片项。 3.5 Dataflow 类型调度任务
Dataflow 类型适用于要处理的数据量很大的情况Dataflow 类型的定时任务需要实现 Datafowjob 接口该接口提供 2 个方法供覆盖分别用于抓取 fetchData 和处理 processData 数据我们继续对例子进行改造。 Dataflow 类型用于处理数据流他和 Simplejob 不同它以数据流的方式执行调用 fetchData 抓取数据知道抓取不到数据才停止作业。
1、修改 zookeeper 配置类增加数据类型判断和逻辑
Configuration
public class TestJobConfig {Bean(initMethod init)public SpringJobScheduler fileScheduler(FileDataFlowJob job, CoordinatorRegistryCenter registryCenter) {LiteJobConfiguration configuration createJobConfiguration(job.getClass(), 0 0/1 * * * ?, 4, 0text,1image,2radio,3vedio, true);return new SpringJobScheduler(job, registryCenter, configuration);}Beanpublic CoordinatorRegistryCenter registryCenter(Value(${zookeeper.url}) String url, Value(${zookeeper.groupName}) String groupName) {// ZookeeperConfiguration(zookeeper地址, 项目名)ZookeeperConfiguration configuration new ZookeeperConfiguration(url, groupName);// 设置节点超时时间configuration.setConnectionTimeoutMilliseconds(100);ZookeeperRegistryCenter center new ZookeeperRegistryCenter(configuration);center.init();return center;}/*** 定时任务配置* 这个定时任务使用的场景比较灵活因此不建议放在spring的容器当中** param clazz 定时任务的字节码* param cron cron表达式* param shardingCount 分片数量* param shardingParam 分片参数* param isDataFlow 是否是DataFlow类型* return 定时任务配置对象*/private static LiteJobConfiguration createJobConfiguration(Class clazz, String cron, int shardingCount,String shardingParam, boolean isDataFlow) {// 定义作业核心配置 newBuilder(任务名称, corn表达式, 分片数量)JobCoreConfiguration.Builder jobBuilder JobCoreConfiguration.newBuilder(clazz.getSimpleName(), cron, shardingCount);if (!StringUtils.isEmpty(shardingParam)) {jobBuilder.shardingItemParameters(shardingParam); // 添加分片功能}JobCoreConfiguration simpleCoreConfig jobBuilder.build();// 定义simple类型配置JobTypeConfiguration configuration;if (isDataFlow) {// true 代表流处理configuration new DataflowJobConfiguration(simpleCoreConfig, clazz.getCanonicalName(), true);} else {configuration new SimpleJobConfiguration(simpleCoreConfig, clazz.getCanonicalName());}// 定义Lite作业根配置并返回// 设置overwrite(true)允许覆盖cron表达式默认不允许return LiteJobConfiguration.newBuilder(configuration).overwrite(true).build();}
}2、定义新的 DataFlow 任务调度对象
Component
public class FileDataFlowJob implements DataflowJobFileCustom {Autowiredprivate FileCustomMapper fileCustomMapper;/*** 抓取数据** param shardingContext* return*/Overridepublic ListFileCustom fetchData(ShardingContext shardingContext) {// 取决于数据能否抓取到数据有数据会继续调用该方法// 如果没数据就会停止此次定时任务执行停止// 直到下次任务调度接着抓取System.out.println(开始抓取数据...);// select * from t_file_custom where backedUp 0 and type #{type} limit #{count}ListFileCustom fileCustomList fileCustomMapper.selectLimit(shardingContext.getShardingParameter(), 2); // 查找 backedUp0 的前两条数据return fileCustomList; // 如果为null则直接返回如果不为null则调用下方方法处理数据}/*** 处理数据** param shardingContext* param list*/Overridepublic void processData(ShardingContext shardingContext, ListFileCustom list) {for (FileCustom fileCustom : list) {backUp(fileCustom);}}/*** 模拟备份操作** param fileCustom 备份对象*/private void backUp(FileCustom fileCustom) {System.out.println(备份的方法名 fileCustom.getName() 备份的类型 fileCustom.getType());System.out.println();try {TimeUnit.SECONDS.sleep(1); // 延时一秒} catch (InterruptedException e) {throw new RuntimeException(e);}fileCustomMapper.changeState(fileCustom.getId(), 1);}
}3、启动项目发现每次抓取两条数据重复执行有数据会继续调用该方法如果没数据就会停止此次定时任务执行停止。直至下次任务调度接着抓取。 文章参考Java微服务商城高并发秒杀项目实战|Spring Cloud Alibaba真实项目实战商城双11秒杀高并发消息支付分布式事物Seata