宝安哪有网站建设,企业网站建设移动,江苏省华建建设股份有限公司网站,网站联盟广告名词解释目录 while True: sleep()Timeloop库threading.Timersched模块schedule模块APScheduler框架Celery框架数据流工具Apache Airflow概述Airflow 核心概念Airflow 的架构 总结以下几种方案实现定时任务#xff0c;可根据不同需求去使用不同方案。 while True: sleep()
利用whil… 目录 while True: sleep()Timeloop库threading.Timersched模块schedule模块APScheduler框架Celery框架数据流工具Apache Airflow概述Airflow 核心概念Airflow 的架构 总结以下几种方案实现定时任务可根据不同需求去使用不同方案。 while True: sleep()
利用while True的死循环加上 sleep()函数让其暂停一段时间达到每隔一段时间执行特定任务的目的。
比较简单例子如下
import datetime
import timedef time_printer():now datetime.datetime.now()ts now.strftime(%Y-%m-%d %H:%M:%S)print(do func time :, ts)def loop_monitor():while True:time_printer()time.sleep(5)if __name__ __main__:loop_monitor()主要缺点
只能设定间隔不能指定具体的时间sleep 是一个阻塞函数也就是说 sleep 这一段时间程序什么也不能操作。
Timeloop库
Timeloop是一个库可用于运行多周期任务。
Timeloop内部维护了一个任务列表jobs用来管理所有任务。
可以使用装饰器标记任务这样就会把任务加到任务列表jobs中使用start方法启动任务列表重的所有任务。
示例如下
import time
from timeloop import Timeloop
from datetime import timedeltatl Timeloop()tl.job(intervaltimedelta(seconds2))
def sample_job_every_2s():print(2s job current time : {}.format(time.ctime()))if __name__ __main__:tl.start(blockTrue)运行后打印如下
[2023-10-02 09:48:41,926] [timeloop] [INFO] Starting Timeloop..
[2023-10-02 09:48:41,926] [timeloop] [INFO] Registered job function sample_job_every_2s at 0x7fc3d022d0d0
[2023-10-02 09:48:41,926] [timeloop] [INFO] Timeloop now started. Jobs will run based on the interval set
2s job current time : Mon Oct 2 09:48:43 2023
2s job current time : Mon Oct 2 09:48:45 2023
2s job current time : Mon Oct 2 09:48:47 2023同时Timeloop还有个stop方法可以用来暂停所有任务。
threading.Timer
threading 模块中的 Timer 是一个非阻塞函数比 sleep 稍好一点timer最基本理解就是定时器我们可以启动多个定时任务这些定时器任务是异步执行所以不存在等待顺序执行问题。
主要有如下方法
方法说明Timer(interval, function, argsNone, kwargsNone)创建定时器cancel()取消定时器start()使用线程方式执行join(self, timeoutNone)主线程等待线程执行结束
示例
import datetimefrom threading import Timerdef time_printer():now datetime.datetime.now()ts now.strftime(%Y-%m-%d %H:%M:%S)print(do func time :, ts)# 注意 Timer 只能执行一次这里需要循环调用否则只能执行一次loop_monitor()def loop_monitor():t Timer(5, time_printer)t.start()if __name__ __main__:loop_monitor()sched模块
sched模块实现了一个通用事件调度器在调度器类使用一个延迟函数等待特定的时间执行任务。同时支持多线程应用程序在每个任务执行后会立刻调用延时函数以确保其他线程也能执行。
class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口它需要外部传入两个参数timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time)delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。
import datetime
import time
import scheddef time_printer():now datetime.datetime.now()ts now.strftime(%Y-%m-%d %H:%M:%S)print(do func time :, ts)loop_monitor()def loop_monitor():s sched.scheduler(time.time, time.sleep) # 生成调度器s.enter(5, 1, time_printer, ())s.run()if __name__ __main__:loop_monitor()scheduler对象主要方法:
enter(delay, priority, action, argument)安排一个事件来延迟delay个时间单位。 cancel(event)从队列中删除事件。如果事件不是当前队列中的事件则该方法将跑出一个ValueError。 run()运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数)然后执行事件直到不再有预定的事件。 比threading.Timer更好不需要循环调用。
schedule模块
schedule是一个第三方轻量级的任务调度模块可以按照秒分小时日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。
示例
import schedule
import timedef job():print(Im working...)
schedule.every(10).seconds.do(job)
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at(10:30).do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at(13:15).do(job)
schedule.every().minute.at(:17).do(job)while True:schedule.run_pending()time.sleep(1)也可以通过 repeat() 装饰静态方法
import time
from schedule import every, repeat, run_pendingrepeat(every().second)
def job():print(working...)while True:run_pending()time.sleep(1)传递参数
import scheduledef greet(name):print(Hello, name)
schedule.every(2).seconds.do(greet, nameAlice)
schedule.every(4).seconds.do(greet, nameBob)while True:schedule.run_pending()装饰器同样能传递参数
from schedule import every, repeat, run_pendingrepeat(every().second, World)
repeat(every().minute, Mars)
def hello(planet):print(Hello, planet)while True:run_pending()取消任务
import schedulei 0
def some_task():global ii 1print(i)if i 10:schedule.cancel_job(job)print(cancel job)exit(0)
job schedule.every().second.do(some_task)while True:schedule.run_pending()运行一次任务
import time
import scheduledef job_that_executes_once():print(Hello)return schedule.CancelJobschedule.every().minute.at(:34).do(job_that_executes_once)
while True:schedule.run_pending()time.sleep(1)根据标签检索任务
# 检索所有任务schedule.get_jobs()
import scheduledef greet(name):print(Hello {}.format(name))schedule.every().day.do(greet, Andrea).tag(daily-tasks, friend)
schedule.every().hour.do(greet, John).tag(hourly-tasks, friend)
schedule.every().hour.do(greet, Monica).tag(hourly-tasks, customer)
schedule.every().day.do(greet, Derek).tag(daily-tasks, guest)friends schedule.get_jobs(friend)
print(friends)根据标签取消任务
# 取消所有任务schedule.clear()
import scheduledef greet(name):print(Hello {}.format(name))if name Cancel:schedule.clear(second-tasks)print(cancel second-tasks)schedule.every().second.do(greet, Andrea).tag(second-tasks, friend)
schedule.every().second.do(greet, John).tag(second-tasks, friend)
schedule.every().hour.do(greet, Monica).tag(hourly-tasks, customer)
schedule.every(5).seconds.do(greet, Cancel).tag(daily-tasks, guest)while True:schedule.run_pending()运行任务到某时间
import schedule
from datetime import datetime, timedelta, timedef job():print(working...)schedule.every().second.until(23:59).do(job) # 今天23:59停止
schedule.every().second.until(2030-01-01 18:30).do(job) # 2030-01-01 18:30停止
schedule.every().second.until(timedelta(hours8)).do(job) # 8小时后停止
schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止
schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止while True:schedule.run_pending()马上运行所有任务主要用于测试
import scheduledef job():print(working...)def job1():print(Hello...)schedule.every().monday.at(12:40).do(job)
schedule.every().tuesday.at(16:40).do(job1)
schedule.run_all()
schedule.run_all(delay_seconds3) # 任务间延迟3秒并行运行使用 Python 内置队列实现
import threading
import time
import scheduledef job1():print(Im running on thread %s % threading.current_thread())def job2():print(Im running on thread %s % threading.current_thread())def job3():print(Im running on thread %s % threading.current_thread())def run_threaded(job_func):job_thread threading.Thread(targetjob_func)job_thread.start()schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)while True:schedule.run_pending()time.sleep(1)APScheduler框架
APScheduleradvanceded python scheduler基于Quartz的一个Python定时任务框架实现了Quartz的所有功能使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务并且可以持久化任务。基于这些功能我们可以很方便的实现一个Python定时任务系统。
具体使用参考文章APScheduler框架使用
Celery框架
Celery是一个简单灵活可靠的分布式系统用于处理大量消息同时为操作提供维护此类系统所需的工具, 也可用于任务调度。Celery 的配置比较麻烦如果你只是需要一个轻量级的调度工具Celery 不会是一个好选择。
Celery 是一个强大的分布式任务队列它可以让任务的执行完全脱离主程序甚至可以被分配到其他主机上运行。通常使用它来实现异步任务async task和定时任务crontab。异步任务比如是发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作 定时任务是需要在特定时间执行的任务。
具体使用参考
Celery使用优秀的python异步任务框架 Django21使用Celery任务框架
数据流工具Apache Airflow
概述
Apache Airflow 是Airbnb开源的一款数据流程工具目前是Apache孵化项目。以非常灵活的方式来支持数据的ETL过程同时还支持非常多的插件来完成诸如HDFS监控、邮件通知等功能。Airflow支持单机和分布式两种模式支持Master-Slave模式支持Mesos等资源调度有非常好的扩展性。被大量公司采用。
Airflow使用Python开发它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务以及任务之间的关系和依赖。比如如下的工作流中任务T1执行完成T2和T3才能开始执行T2和T3都执行完成T4才能开始执行。 Airflow提供了各种Operator实现可以完成各种任务实现
BashOperator – 执行 bash 命令或脚本。SSHOperator – 执行远程 bash 命令或脚本原理同 paramiko 模块。PythonOperator – 执行 Python 函数。EmailOperator – 发送 Email。HTTPOperator – 发送一个 HTTP 请求。MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等执行 SQL 任务。DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…
除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求。
一些情况下我们需要根据执行结果执行不同的任务这样工作流会产生分支。如: 这种需求可以使用BranchPythonOperator来实现。
Airflow 核心概念
DAGs即有向无环图(Directed Acyclic Graph)将所有需要运行的tasks按照依赖关系组织起来描述的是所有tasks执行顺序。Operators可以简单理解为一个class描述了DAG中某个的task具体要做的事。其中airflow内置了很多operators如BashOperator 执行一个bash 命令PythonOperator 调用任意的Python 函数EmailOperator 用于发送邮件HTTPOperator 用于发送HTTP请求 SqlOperator 用于执行SQL命令等等同时用户可以自定义Operator这给用户提供了极大的便利性。TasksTask 是 Operator的一个实例也就是DAGs中的一个node。Task Instancetask的一次运行。Web 界面中可以看到task instance 有自己的状态包括”running”, “success”, “failed”, “skipped”, “up for retry”等。Task RelationshipsDAGs中的不同Tasks之间可以有依赖关系如 Task1 Task2表明Task2依赖于Task2了。通过将DAGs和Operators结合起来用户就可以创建各种复杂的 工作流workflow。
Airflow 的架构
在一个可扩展的生产环境中Airflow 含有以下组件
元数据库这个数据库存储有关任务状态的信息。调度器Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。执行器Executor 是一个消息队列进程它被绑定到调度器中用于确定实际执行每个任务计划的工作进程。有不同类型的执行器每个执行器都使用一个指定工作进程的类来执行任务。例如LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。Workers这些是实际执行任务逻辑的进程由正在使用的执行器确定。 Worker的具体实现由配置文件中的executor来指定airflow支持多种Executor:
SequentialExecutor: 单进程顺序执行一般只用来测试LocalExecutor: 本地多进程执行CeleryExecutor: 使用Celery进行分布式任务调度DaskExecutor使用Dask进行分布式任务调度KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务
生产环境一般使用CeleryExecutor和KubernetesExecutor。
使用CeleryExecutor的架构如图: 使用KubernetesExecutor的架构如图: 参考
https://mp.weixin.qq.com/s/dzA9xGoho50WK_-80hzelg
https://zhuanlan.zhihu.com/p/448847300