php个人网站源码,物联网应用技术就业方向及前景,wordpress oss静态,做淘宝链接网站Django Celery
一、知识要点概览表
模块知识点掌握程度要求Celery基础配置、任务定义、任务执行深入理解异步任务任务状态、结果存储、错误处理熟练应用周期任务定时任务、Crontab、任务调度熟练应用监控管理Flower、任务监控、性能优化理解应用
二、基础配置实现
1. 安装和…Django Celery
一、知识要点概览表
模块知识点掌握程度要求Celery基础配置、任务定义、任务执行深入理解异步任务任务状态、结果存储、错误处理熟练应用周期任务定时任务、Crontab、任务调度熟练应用监控管理Flower、任务监控、性能优化理解应用
二、基础配置实现
1. 安装和配置
# requirements.txt
celery5.3.1
django-celery-results2.5.1
django-celery-beat2.5.0
redis4.6.0# settings.py
INSTALLED_APPS [...django_celery_results,django_celery_beat,
]# Celery配置
CELERY_BROKER_URL redis://localhost:6379/0
CELERY_RESULT_BACKEND django-db
CELERY_ACCEPT_CONTENT [json]
CELERY_TASK_SERIALIZER json
CELERY_RESULT_SERIALIZER json
CELERY_TIMEZONE Asia/Shanghai# celery.py
import os
from celery import Celeryos.environ.setdefault(DJANGO_SETTINGS_MODULE, myproject.settings)app Celery(myproject)
app.config_from_object(django.conf:settings, namespaceCELERY)
app.autodiscover_tasks()app.task(bindTrue)
def debug_task(self):print(fRequest: {self.request!r})三、异步任务实现
1. 基本任务定义
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from .models import Usershared_task
def send_welcome_email(user_id):发送欢迎邮件try:user User.objects.get(iduser_id)send_mail(欢迎加入我们的平台,f你好 {user.username}欢迎使用我们的服务,noreplyexample.com,[user.email],fail_silentlyFalse,)return Trueexcept Exception as e:return str(e)shared_task(bindTrue, max_retries3)
def process_payment(self, order_id):处理支付任务from .models import Ordertry:order Order.objects.get(idorder_id)result process_payment_gateway(order)if result.success:order.status paidorder.save()return Payment processed successfullyelse:raise ValueError(Payment failed)except Exception as exc:self.retry(excexc, countdown60*5) # 5分钟后重试2. 任务链和组
from celery import chain, group, chord
from .tasks import process_payment, send_notification, update_inventorydef process_order(order):# 任务链按顺序执行任务task_chain chain(process_payment.s(order.id),update_inventory.s(order.id),send_notification.s(order.user.id))return task_chain()def process_bulk_orders(orders):# 任务组并行执行多个任务task_group group(process_payment.s(order.id)for order in orders)return task_group()def process_orders_with_summary(orders):# 和弦并行执行任务后执行回调def on_complete(results):successful sum(1 for r in results if r success)failed len(results) - successfulreturn f处理完成{successful}成功{failed}失败task_chord chord((process_payment.s(order.id) for order in orders),on_complete.s())return task_chord()3. 自定义任务类
from celery import Task
from django.core.cache import cacheclass BaseTaskWithRetry(Task):abstract Truemax_retries 3default_retry_delay 60 # 60秒后重试def on_failure(self, exc, task_id, args, kwargs, einfo):任务失败时的处理print(fTask {task_id} failed: {str(exc)})super().on_failure(exc, task_id, args, kwargs, einfo)def on_retry(self, exc, task_id, args, kwargs, einfo):任务重试时的处理print(fTask {task_id} retrying: {str(exc)})super().on_retry(exc, task_id, args, kwargs, einfo)shared_task(baseBaseTaskWithRetry)
def process_data(data_id):try:# 处理数据的逻辑result process_complex_data(data_id)return resultexcept Exception as exc:raise self.retry(excexc)四、周期任务实现
1. 基本周期任务
# tasks.py
from celery.schedules import crontab
from celery.task import periodic_taskperiodic_task(run_everytimedelta(hours24))
def daily_cleanup():每日清理任务cleanup_expired_tokens()cleanup_old_logs()return Daily cleanup completedperiodic_task(run_everycrontab(hour0, minute0),namegenerate_daily_report
)
def generate_daily_report():生成每日报告report Report.objects.create(datetimezone.now(),typedaily)report.generate()return fReport generated: {report.id}2. 动态周期任务
# models.py
from django_celery_beat.models import PeriodicTask, IntervalSchedule
from django.db import modelsclass ScheduledReport(models.Model):name models.CharField(max_length100)interval models.IntegerField(help_text间隔分钟)enabled models.BooleanField(defaultTrue)def save(self, *args, **kwargs):super().save(*args, **kwargs)self.update_periodic_task()def update_periodic_task(self):schedule, _ IntervalSchedule.objects.get_or_create(everyself.interval,periodIntervalSchedule.MINUTES,)PeriodicTask.objects.update_or_create(namefgenerate_report_{self.id},defaults{task: myapp.tasks.generate_report,interval: schedule,args: [self.id],enabled: self.enabled})3. 任务调度器
from django_celery_beat.models import CrontabSchedule, PeriodicTask
import jsondef schedule_report_task(name, hour, minute, day_of_week):创建定时报告任务schedule, _ CrontabSchedule.objects.get_or_create(hourhour,minuteminute,day_of_weekday_of_week,)task PeriodicTask.objects.create(crontabschedule,namefgenerate_report_{name},taskmyapp.tasks.generate_report,argsjson.dumps([name]),)return task# 使用示例
schedule_report_task(weekly_summary, hour0, minute0, day_of_week1) # 每周一五、监控和管理
1. Flower配置
# requirements.txt
flower2.0.1# settings.py
CELERY_FLOWER_USER admin
CELERY_FLOWER_PASSWORD password# 启动Flower
# celery -A myproject flower --port55552. 任务监控中间件
# middleware.py
from celery.signals import task_prerun, task_postrun
import time
import logginglogger logging.getLogger(celery.tasks)task_prerun.connect
def task_prerun_handler(task_id, task, args, kwargs, **kw):任务执行前的处理task.start_time time.time()task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **kw):任务执行后的处理if hasattr(task, start_time):duration time.time() - task.start_timelogger.info(fTask {task.name}[{task_id}] fcompleted in {duration:.2f}s with state {state})六、Celery工作流程图 七、实际应用示例
1. 图片处理任务
# tasks.py
from PIL import Image
import os
from celery import shared_taskshared_task
def process_uploaded_image(image_path, sizes[(800, 600), (400, 300)]):处理上传的图片try:img Image.open(image_path)filename os.path.basename(image_path)name, ext os.path.splitext(filename)results []for size in sizes:resized img.copy()resized.thumbnail(size)new_filename f{name}_{size[0]}x{size[1]}{ext}new_path os.path.join(os.path.dirname(image_path), new_filename)resized.save(new_path)results.append(new_path)return resultsexcept Exception as e:return str(e)2. 站点监控任务
# tasks.py
import requests
from celery import shared_task
from django.core.mail import mail_admins
from .models import SiteMonitorshared_task(bindTrue, max_retries3)
def monitor_website(self, url):监控网站可用性try:response requests.get(url, timeout10)status response.status_code 200response_time response.elapsed.total_seconds()SiteMonitor.objects.create(urlurl,statusstatus,response_timeresponse_time)if not status:mail_admins(f网站{url}不可用,f状态码{response.status_code})return {url: url,status: status,response_time: response_time}except Exception as exc:self.retry(excexc, countdown60)八、最佳实践建议 任务设计原则 保持任务原子性实现幂等性合理设置超时时间添加适当的重试机制 性能优化 使用合适的序列化方式控制任务粒度合理设置并发数监控任务执行情况 错误处理 完善的异常捕获详细的日志记录合适的重试策略失败通知机制
这就是关于Django Celery的详细内容包括异步任务队列和周期任务的实现。通过实践这些内容你将能够在Django项目中熟练使用Celery处理异步任务。如果有任何问题欢迎随时提出 怎么样今天的内容还满意吗再次感谢朋友们的观看关注GZH凡人的AI工具箱回复666送您价值199的AI大礼包。最后祝您早日实现财务自由还请给个赞谢谢