当前位置: 首页 > news >正文

宝安哪有网站建设企业网站建设移动

宝安哪有网站建设,企业网站建设移动,江苏省华建建设股份有限公司网站,网站联盟广告名词解释目录 while True: sleep()Timeloop库threading.Timersched模块schedule模块APScheduler框架Celery框架数据流工具Apache Airflow概述Airflow 核心概念Airflow 的架构 总结以下几种方案实现定时任务#xff0c;可根据不同需求去使用不同方案。 while True: sleep() 利用whil… 目录 while True: sleep()Timeloop库threading.Timersched模块schedule模块APScheduler框架Celery框架数据流工具Apache Airflow概述Airflow 核心概念Airflow 的架构 总结以下几种方案实现定时任务可根据不同需求去使用不同方案。 while True: sleep() 利用while True的死循环加上 sleep()函数让其暂停一段时间达到每隔一段时间执行特定任务的目的。 比较简单例子如下 import datetime import timedef time_printer():now datetime.datetime.now()ts now.strftime(%Y-%m-%d %H:%M:%S)print(do func time :, ts)def loop_monitor():while True:time_printer()time.sleep(5)if __name__ __main__:loop_monitor()主要缺点 只能设定间隔不能指定具体的时间sleep 是一个阻塞函数也就是说 sleep 这一段时间程序什么也不能操作。 Timeloop库 Timeloop是一个库可用于运行多周期任务。 Timeloop内部维护了一个任务列表jobs用来管理所有任务。 可以使用装饰器标记任务这样就会把任务加到任务列表jobs中使用start方法启动任务列表重的所有任务。 示例如下 import time from timeloop import Timeloop from datetime import timedeltatl Timeloop()tl.job(intervaltimedelta(seconds2)) def sample_job_every_2s():print(2s job current time : {}.format(time.ctime()))if __name__ __main__:tl.start(blockTrue)运行后打印如下 [2023-10-02 09:48:41,926] [timeloop] [INFO] Starting Timeloop.. [2023-10-02 09:48:41,926] [timeloop] [INFO] Registered job function sample_job_every_2s at 0x7fc3d022d0d0 [2023-10-02 09:48:41,926] [timeloop] [INFO] Timeloop now started. Jobs will run based on the interval set 2s job current time : Mon Oct 2 09:48:43 2023 2s job current time : Mon Oct 2 09:48:45 2023 2s job current time : Mon Oct 2 09:48:47 2023同时Timeloop还有个stop方法可以用来暂停所有任务。 threading.Timer threading 模块中的 Timer 是一个非阻塞函数比 sleep 稍好一点timer最基本理解就是定时器我们可以启动多个定时任务这些定时器任务是异步执行所以不存在等待顺序执行问题。 主要有如下方法 方法说明Timer(interval, function, argsNone, kwargsNone)创建定时器cancel()取消定时器start()使用线程方式执行join(self, timeoutNone)主线程等待线程执行结束 示例 import datetimefrom threading import Timerdef time_printer():now datetime.datetime.now()ts now.strftime(%Y-%m-%d %H:%M:%S)print(do func time :, ts)# 注意 Timer 只能执行一次这里需要循环调用否则只能执行一次loop_monitor()def loop_monitor():t Timer(5, time_printer)t.start()if __name__ __main__:loop_monitor()sched模块 sched模块实现了一个通用事件调度器在调度器类使用一个延迟函数等待特定的时间执行任务。同时支持多线程应用程序在每个任务执行后会立刻调用延时函数以确保其他线程也能执行。 class sched.scheduler(timefunc, delayfunc)这个类定义了调度事件的通用接口它需要外部传入两个参数timefunc是一个没有参数的返回时间类型数字的函数(常用使用的如time模块里面的time)delayfunc应该是一个需要一个参数来调用、与timefunc的输出兼容、并且作用为延迟多个时间单位的函数(常用的如time模块的sleep)。 import datetime import time import scheddef time_printer():now datetime.datetime.now()ts now.strftime(%Y-%m-%d %H:%M:%S)print(do func time :, ts)loop_monitor()def loop_monitor():s sched.scheduler(time.time, time.sleep) # 生成调度器s.enter(5, 1, time_printer, ())s.run()if __name__ __main__:loop_monitor()scheduler对象主要方法: enter(delay, priority, action, argument)安排一个事件来延迟delay个时间单位。 cancel(event)从队列中删除事件。如果事件不是当前队列中的事件则该方法将跑出一个ValueError。 run()运行所有预定的事件。这个函数将等待(使用传递给构造函数的delayfunc()函数)然后执行事件直到不再有预定的事件。 比threading.Timer更好不需要循环调用。 schedule模块 schedule是一个第三方轻量级的任务调度模块可以按照秒分小时日期或者自定义事件执行时间。schedule允许用户使用简单、人性化的语法以预定的时间间隔定期运行Python函数(或其它可调用函数)。 示例 import schedule import timedef job():print(Im working...) schedule.every(10).seconds.do(job) schedule.every(10).minutes.do(job) schedule.every().hour.do(job) schedule.every().day.at(10:30).do(job) schedule.every(5).to(10).minutes.do(job) schedule.every().monday.do(job) schedule.every().wednesday.at(13:15).do(job) schedule.every().minute.at(:17).do(job)while True:schedule.run_pending()time.sleep(1)也可以通过 repeat() 装饰静态方法 import time from schedule import every, repeat, run_pendingrepeat(every().second) def job():print(working...)while True:run_pending()time.sleep(1)传递参数 import scheduledef greet(name):print(Hello, name) schedule.every(2).seconds.do(greet, nameAlice) schedule.every(4).seconds.do(greet, nameBob)while True:schedule.run_pending()装饰器同样能传递参数 from schedule import every, repeat, run_pendingrepeat(every().second, World) repeat(every().minute, Mars) def hello(planet):print(Hello, planet)while True:run_pending()取消任务 import schedulei 0 def some_task():global ii 1print(i)if i 10:schedule.cancel_job(job)print(cancel job)exit(0) job schedule.every().second.do(some_task)while True:schedule.run_pending()运行一次任务 import time import scheduledef job_that_executes_once():print(Hello)return schedule.CancelJobschedule.every().minute.at(:34).do(job_that_executes_once) while True:schedule.run_pending()time.sleep(1)根据标签检索任务 # 检索所有任务schedule.get_jobs() import scheduledef greet(name):print(Hello {}.format(name))schedule.every().day.do(greet, Andrea).tag(daily-tasks, friend) schedule.every().hour.do(greet, John).tag(hourly-tasks, friend) schedule.every().hour.do(greet, Monica).tag(hourly-tasks, customer) schedule.every().day.do(greet, Derek).tag(daily-tasks, guest)friends schedule.get_jobs(friend) print(friends)根据标签取消任务 # 取消所有任务schedule.clear() import scheduledef greet(name):print(Hello {}.format(name))if name Cancel:schedule.clear(second-tasks)print(cancel second-tasks)schedule.every().second.do(greet, Andrea).tag(second-tasks, friend) schedule.every().second.do(greet, John).tag(second-tasks, friend) schedule.every().hour.do(greet, Monica).tag(hourly-tasks, customer) schedule.every(5).seconds.do(greet, Cancel).tag(daily-tasks, guest)while True:schedule.run_pending()运行任务到某时间 import schedule from datetime import datetime, timedelta, timedef job():print(working...)schedule.every().second.until(23:59).do(job) # 今天23:59停止 schedule.every().second.until(2030-01-01 18:30).do(job) # 2030-01-01 18:30停止 schedule.every().second.until(timedelta(hours8)).do(job) # 8小时后停止 schedule.every().second.until(time(23, 59, 59)).do(job) # 今天23:59:59停止 schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30停止while True:schedule.run_pending()马上运行所有任务主要用于测试 import scheduledef job():print(working...)def job1():print(Hello...)schedule.every().monday.at(12:40).do(job) schedule.every().tuesday.at(16:40).do(job1) schedule.run_all() schedule.run_all(delay_seconds3) # 任务间延迟3秒并行运行使用 Python 内置队列实现 import threading import time import scheduledef job1():print(Im running on thread %s % threading.current_thread())def job2():print(Im running on thread %s % threading.current_thread())def job3():print(Im running on thread %s % threading.current_thread())def run_threaded(job_func):job_thread threading.Thread(targetjob_func)job_thread.start()schedule.every(10).seconds.do(run_threaded, job1) schedule.every(10).seconds.do(run_threaded, job2) schedule.every(10).seconds.do(run_threaded, job3)while True:schedule.run_pending()time.sleep(1)APScheduler框架 APScheduleradvanceded python scheduler基于Quartz的一个Python定时任务框架实现了Quartz的所有功能使用起来十分方便。提供了基于日期、固定时间间隔以及crontab类型的任务并且可以持久化任务。基于这些功能我们可以很方便的实现一个Python定时任务系统。 具体使用参考文章APScheduler框架使用 Celery框架 Celery是一个简单灵活可靠的分布式系统用于处理大量消息同时为操作提供维护此类系统所需的工具, 也可用于任务调度。Celery 的配置比较麻烦如果你只是需要一个轻量级的调度工具Celery 不会是一个好选择。 Celery 是一个强大的分布式任务队列它可以让任务的执行完全脱离主程序甚至可以被分配到其他主机上运行。通常使用它来实现异步任务async task和定时任务crontab。异步任务比如是发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作 定时任务是需要在特定时间执行的任务。 具体使用参考 Celery使用优秀的python异步任务框架 Django21使用Celery任务框架 数据流工具Apache Airflow 概述 Apache Airflow 是Airbnb开源的一款数据流程工具目前是Apache孵化项目。以非常灵活的方式来支持数据的ETL过程同时还支持非常多的插件来完成诸如HDFS监控、邮件通知等功能。Airflow支持单机和分布式两种模式支持Master-Slave模式支持Mesos等资源调度有非常好的扩展性。被大量公司采用。 Airflow使用Python开发它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务以及任务之间的关系和依赖。比如如下的工作流中任务T1执行完成T2和T3才能开始执行T2和T3都执行完成T4才能开始执行。 Airflow提供了各种Operator实现可以完成各种任务实现 BashOperator – 执行 bash 命令或脚本。SSHOperator – 执行远程 bash 命令或脚本原理同 paramiko 模块。PythonOperator – 执行 Python 函数。EmailOperator – 发送 Email。HTTPOperator – 发送一个 HTTP 请求。MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等执行 SQL 任务。DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator… 除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求。 一些情况下我们需要根据执行结果执行不同的任务这样工作流会产生分支。如: 这种需求可以使用BranchPythonOperator来实现。 Airflow 核心概念 DAGs即有向无环图(Directed Acyclic Graph)将所有需要运行的tasks按照依赖关系组织起来描述的是所有tasks执行顺序。Operators可以简单理解为一个class描述了DAG中某个的task具体要做的事。其中airflow内置了很多operators如BashOperator 执行一个bash 命令PythonOperator 调用任意的Python 函数EmailOperator 用于发送邮件HTTPOperator 用于发送HTTP请求 SqlOperator 用于执行SQL命令等等同时用户可以自定义Operator这给用户提供了极大的便利性。TasksTask 是 Operator的一个实例也就是DAGs中的一个node。Task Instancetask的一次运行。Web 界面中可以看到task instance 有自己的状态包括”running”, “success”, “failed”, “skipped”, “up for retry”等。Task RelationshipsDAGs中的不同Tasks之间可以有依赖关系如 Task1 Task2表明Task2依赖于Task2了。通过将DAGs和Operators结合起来用户就可以创建各种复杂的 工作流workflow。 Airflow 的架构 在一个可扩展的生产环境中Airflow 含有以下组件 元数据库这个数据库存储有关任务状态的信息。调度器Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程。调度器通常作为服务运行。执行器Executor 是一个消息队列进程它被绑定到调度器中用于确定实际执行每个任务计划的工作进程。有不同类型的执行器每个执行器都使用一个指定工作进程的类来执行任务。例如LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务。Workers这些是实际执行任务逻辑的进程由正在使用的执行器确定。 Worker的具体实现由配置文件中的executor来指定airflow支持多种Executor: SequentialExecutor: 单进程顺序执行一般只用来测试LocalExecutor: 本地多进程执行CeleryExecutor: 使用Celery进行分布式任务调度DaskExecutor使用Dask进行分布式任务调度KubernetesExecutor: 1.10.0新增, 创建临时POD执行每次任务 生产环境一般使用CeleryExecutor和KubernetesExecutor。 使用CeleryExecutor的架构如图: 使用KubernetesExecutor的架构如图: 参考 https://mp.weixin.qq.com/s/dzA9xGoho50WK_-80hzelg https://zhuanlan.zhihu.com/p/448847300
http://www.w-s-a.com/news/129781/

相关文章:

  • wordpress网站是什么类似wordpress博客
  • 国际网站空间昆明做网站开发维护的公司
  • 建网站选号域名网站优化大赛
  • 师范街网站建设广告制作公司口号
  • 电子商务网站开发设计报告为什么wordpress主题中字体不统一
  • 百度站长快速收录网站建设完工确认书
  • 企业网站备案代理商建设工程施工合同2013
  • 要学做网站wordpress xss漏洞
  • 白云品牌型网站建设在网上做国际快递淘宝网站
  • 无锡网站建设方式推广软件赚钱的app
  • 如何控制一个网站软件开发wordpress教育插件
  • 网站开发属于软件开发类吗wordpress邮件失败
  • 凡科网站怎么设计win8网站模板
  • 深圳整站seo个人网站建设一般流程
  • 济南网站中企动力wordpress主题ripro
  • 淮北网站建设求职简历怎么做点击图片进网站
  • 自适应网站推广注册公司流程和费用公司注册
  • 电子商务网站建设预算表网站建设卩金手指科杰
  • 广西响应式网站哪家好产品网络推广怎样做
  • 移动网可以上的网站是什么样子的淘宝优惠券网站开发
  • wordpress php设置伊宁seo网站建设
  • 兰陵住房建设局网站wordpress中文标题
  • 福州搜索优化网站个人网页网站制作模板
  • 网站开发分哪几个步骤使用wordpress开发一个页面跳转
  • 网站制作后还能更改么wordpress 近期文章 代码
  • 做一个小网站需要多少钱wordpress集成paypal
  • 加强网站建设管理 及时更新自己设计装修的app
  • 集团网站设计案例网页制作网站开发
  • 怎么优化网站的单个关键词排名惠州品牌网站建设
  • 上海跨境电商网站制作wordpress弃用react