郑州网站建设学习,Php做网站创业,福州婚庆网站建设哪家好,宁国网站建设|网站建设报价 - 新支点网站建设Spring Boot定时任务原理
在现代应用中#xff0c;定时任务的调度是实现周期性操作的关键机制。Spring Boot 提供了强大的定时任务支持#xff0c;通过注解驱动的方式#xff0c;开发者可以轻松地为方法添加定时任务功能。本文将深入探讨 Spring Boot 中定时任务的实现原理…Spring Boot定时任务原理
在现代应用中定时任务的调度是实现周期性操作的关键机制。Spring Boot 提供了强大的定时任务支持通过注解驱动的方式开发者可以轻松地为方法添加定时任务功能。本文将深入探讨 Spring Boot 中定时任务的实现原理重点分析 EnableScheduling 和 ScheduledAnnotationBeanPostProcessor 的作用以及任务如何被注册和执行。我们还将详细介绍底层使用的线程池调度器 ThreadPoolTaskScheduler 和 Java 内置的 ScheduledThreadPoolExecutor它们如何协同工作保证定时任务的准确执行。此外我们还将探讨任务调度的线程阻塞与唤醒机制深入剖析延迟队列DelayedWorkQueue如何有效管理任务的执行顺序。通过本文的学习你将能够更好地理解和应用 Spring Boot 定时任务提升应用的调度能力和性能。
1.注解驱动
Spring Boot通过EnableScheduling激活定时任务支持,而EnableScheduling注解导入了SchedulingConfiguration,这个类创建了一个名为ScheduledAnnotationBeanPostProcessor 的bean,而这个bean就是定时任务的关键
/*** {code Configuration} class that registers a {link ScheduledAnnotationBeanPostProcessor}* bean capable of processing Springs {link Scheduled} annotation.** pThis configuration class is automatically imported when using the* {link EnableScheduling EnableScheduling} annotation. See* {code EnableScheduling}s javadoc for complete usage details.** author Chris Beams* since 3.1* see EnableScheduling* see ScheduledAnnotationBeanPostProcessor*/
Configuration(proxyBeanMethods false)
Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {Bean(name TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)Role(BeanDefinition.ROLE_INFRASTRUCTURE)public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {return new ScheduledAnnotationBeanPostProcessor();}}2.对ScheduledAnnotationBeanPostProcessor的分析
1. 类职责
核心作用扫描 Spring Bean 中的 Scheduled 注解方法将其转换为定时任务并注册到任务调度器。
2. 定时任务注册的关键流程
代码都是经过简化的代码,实际上我去看Spring的源码,发现代码都很长,但是整体意思是差不多的
Bean 初始化后扫描注解关键方法postProcessAfterInitialization
Override
public Object postProcessAfterInitialization(Object bean, String beanName) {// 1. 跳过 AOP 基础设施类if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||bean instanceof ScheduledExecutorService) {// Ignore AOP infrastructure such as scoped proxies.return bean;}// 2. 检查类是否包含 Scheduled 注解Class? targetClass AopProxyUtils.ultimateTargetClass(bean);if (!nonAnnotatedClasses.contains(targetClass) AnnotationUtils.isCandidateClass(targetClass, List.of(Scheduled.class, Schedules.class))) {// 3. 反射查找所有带 Scheduled 的方法MapMethod, SetScheduled annotatedMethods MethodIntrospector.selectMethods(targetClass, method - AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class));// 4. 处理每个带注解的方法annotatedMethods.forEach((method, scheduledAnnotations) - scheduledAnnotations.forEach(scheduled - processScheduled(scheduled, method, bean)));}return bean;
}跳过无关 Bean如 AOP 代理类、TaskScheduler 本身。反射扫描方法通过 MethodIntrospector 查找所有带有 Scheduled 的方法。注解聚合支持 Schedules 多注解合并。
解析任务参数并注册关键方法processScheduled
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {// 1. 创建 Runnable 任务Runnable runnable createRunnable(bean, method);// 2. 解析时间参数cron/fixedDelay/fixedRateif (StringUtils.hasText(cron)) {// 处理 cron 表达式CronTask task new CronTask(runnable, new CronTrigger(cron, timeZone));tasks.add(registrar.scheduleCronTask(task));} else if (fixedDelay 0) {// 处理 fixedDelayFixedDelayTask task new FixedDelayTask(runnable, fixedDelay, initialDelay);tasks.add(registrar.scheduleFixedDelayTask(task));} else if (fixedRate 0) {// 处理 fixedRateFixedRateTask task new FixedRateTask(runnable, fixedRate, initialDelay);tasks.add(registrar.scheduleFixedRateTask(task));}// 3. 注册任务到 ScheduledTaskRegistrarsynchronized (scheduledTasks) {scheduledTasks.computeIfAbsent(bean, key - new LinkedHashSet()).addAll(tasks);}
}任务封装将方法封装为 ScheduledMethodRunnable。时间参数解析 支持 cron、fixedDelay、fixedRate 三种模式。处理 initialDelay 初始延迟。使用 embeddedValueResolver 解析占位符如 ${task.interval}。 任务注册最终任务被添加到 ScheduledTaskRegistrar。
启动任务调度关键方法finishRegistration
private void finishRegistration() {// 1. 配置 TaskScheduler优先级显式设置 查找 Bean 默认单线程if (registrar.getScheduler() null) {TaskScheduler scheduler resolveSchedulerBean(beanFactory, TaskScheduler.class, false);registrar.setTaskScheduler(scheduler);}// 2. 调用 SchedulingConfigurer 自定义配置扩展点ListSchedulingConfigurer configurers beanFactory.getBeansOfType(SchedulingConfigurer.class);configurers.forEach(configurer - configurer.configureTasks(registrar));// 3. 启动所有注册的任务registrar.afterPropertiesSet();
}调度器解析 默认查找名为 taskScheduler 的 Bean。若无则创建单线程调度器Executors.newSingleThreadScheduledExecutor()。 扩展点允许通过 SchedulingConfigurer 自定义任务注册逻辑。最终启动调用 afterPropertiesSet() 触发任务调度。
3.ThreadPoolTaskScheduler的剖析
ThreadPoolTaskScheduler 是 Spring 对 Java ScheduledThreadPoolExecutor 的封装是 Scheduled 定时任务的底层执行引擎。
继承关系继承 ExecutorConfigurationSupport实现 TaskScheduler 接口整合了线程池管理与定时任务调度。底层依赖基于 ScheduledThreadPoolExecutor支持 周期性任务fixedRate/fixedDelay和 动态触发任务如 cron 表达式。
线程池初始化关键方法initializeExecutor
同样,这里和以后的部分也都是伪代码
Override
protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {// 创建 ScheduledThreadPoolExecutorthis.scheduledExecutor createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);// 配置线程池策略如取消后立即移除任务if (this.scheduledExecutor instanceof ScheduledThreadPoolExecutor scheduledPoolExecutor) {scheduledPoolExecutor.setRemoveOnCancelPolicy(this.removeOnCancelPolicy);// 其他策略设置...}return this.scheduledExecutor;
}这部分是我复制源码的,可以清晰的看到,底层就是new了ScheduledThreadPoolExecutor protected ScheduledExecutorService createExecutor(int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);}4.ScheduledThreadPoolExecutor的原理分析
核心成员
任务队列使用 DelayedWorkQueue内部实现的小顶堆按任务执行时间排序。线程池复用 ThreadPoolExecutor 的线程管理机制支持核心线程数和最大线程数配置。
2. 定时任务调度机制
所有定时任务被封装为 ScheduledFutureTask 对象其核心逻辑如下
private class ScheduledFutureTaskV extends FutureTaskV implements RunnableScheduledFutureV {private long time; // 下一次执行时间纳秒private final long period; // 周期正数fixedRate负数fixedDelayprivate int heapIndex; // 在 DelayedWorkQueue 中的索引public void run() {if (isPeriodic()) {// 周期性任务重新计算下一次执行时间并重新加入队列setNextRunTime();reExecutePeriodic(outerTask);} else {// 一次性任务直接执行super.run();}}
}任务提交通过 schedule、scheduleAtFixedRate 等方法提交任务。队列管理任务被封装为 ScheduledFutureTask 并加入 DelayedWorkQueue。线程唤醒工作线程 (Worker) 从队列获取任务若任务未到执行时间线程进入限时等待available.awaitNanos(delay)。任务执行到达执行时间后线程执行任务 固定速率fixedRate执行完成后根据 period 计算下一次执行时间time period。固定延迟fixedDelay执行完成后根据当前时间计算下一次执行时间time now() (-period)。 重新入队周期性任务执行后重新加入队列等待下次调度。
3.DelayedWorkQueue的简单剖析
DelayQueue队列是一个延迟队列DelayQueue中存放的元素必须实现Delayed接口的元素实现接口后相当于是每个元素都有个过期时间当队列进行take获取元素时先要判断元素有没有过期只有过期的元素才能出队操作没有过期的队列需要等待剩余过期时间才能进行出队操作。
DelayQueue队列内部使用了PriorityQueue优先队列来进行存放数据它采用的是二叉堆进行的优先队列使用ReentrantLock锁来控制线程同步由于内部元素是采用的PriorityQueue来进行存放数据所以Delayed接口实现了Comparable接口用于比较来控制优先级
线程阻塞与唤醒逻辑
(1) 取任务时的阻塞take() 方法
当线程调用 take() 方法从队列中获取任务时若队列为空或队头任务未到期线程会进入阻塞状态
public E take() throws InterruptedException {final ReentrantLock lock this.lock;lock.lockInterruptibly();try {for (;;) {E first q.peek();if (first null) {available.await(); // 队列为空时无限等待} else {long delay first.getDelay(NANOSECONDS);if (delay 0) return q.poll(); // 任务已到期取出执行if (leader ! null) {available.await(); // 其他线程已为队头任务等待本线程无限等待} else {Thread thisThread Thread.currentThread();leader thisThread; // 标记当前线程为“领导者”try {available.awaitNanos(delay); // 限时等待到期时间} finally {if (leader thisThread) leader null;}}}}} finally {if (leader null q.peek() ! null) available.signal();lock.unlock();}
}关键逻辑 leader 线程优化避免多个线程同时等待同一任务到期仅一个线程leader限时等待其他线程无限等待限时等待通过 available.awaitNanos(delay) 阻塞到任务到期时间。
(2) 插入新任务时的唤醒offer() 方法
当新任务被插入队列时若新任务成为队头即最早到期会触发唤醒逻辑
public boolean offer(E e) {final ReentrantLock lock this.lock;lock.lock();try {q.offer(e); // 插入任务并调整堆结构if (q.peek() e) { // 新任务成为队头leader null;available.signal(); // 唤醒等待线程}return true;} finally {lock.unlock();}
}唤醒条件 插入的任务成为新的队头即其到期时间最早。调用available.signal()唤醒等待的线程leader)或其他线程 (3) 唤醒机制总结
何时唤醒 超时唤醒等待线程因任务到期而被 JVM 自动唤醒。插入新任务唤醒新任务的到期时间早于当前队头任务时插入线程会触发唤醒。 唤醒对象 若存在 leader 线程正在限时等待队头任务优先唤醒它。若无 leader唤醒任意一个等待线程