django(windows系统)使用celery框架
·
1、celery是一个框架,用于实现异步任务、定时任务、延迟任务。
2、celery是一个独立的框架,需要在命令行去启动它。
3、这里我使用redis数据库来保存任务和任务结果
安装:windows系统下,需要借助eventlet来启动
pip install celery
pip install eventlet
使用:

1、创建一个包,celery_task
2、包内必须要有celery.py文件,文件内容:
from celery import Celery
#加载django环境
import os
import django
django.setup()
os.environ.setdefault("DJANGO_SETTINGS_MODULE","luffyapi.settings.dev")
broker = 'redis://192.168.137.10/15'#broker任务队列
backend = 'redis://192.168.137.10/14'#结构存储,执行完的结果存在这里
app = Celery(__name__,broker=broker,backend=backend,include=['celery_task.task1','celery_task.task2'])
#定时任务,写到celery文件
#时区设置成东八区
app.conf.timezone='Asia/Shanghai'
#禁用格林威治时区
app.conf.enable_utc=False
#任务定时配置
from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'low-task':{
#指定要执行的任务
'task':'celery_task.task1.add',
#每10秒执行一次
'schedule':timedelta(seconds=3),
#每周一的7:30执行
# 'schedule':crontab(hour=7,minute=30,day_of_week=1),
'args':(300,150) ,#任务需要传递的参数
}
} #配置定时任务后,需要启动beat ,这个beat就是代替我们去手动点击运行,到时间就点击运行,执行定时任务
#另起terminal,切换到celery_task的上级目录执行: celery -A celery_task beat -l INFO
其中app=celery()
第一个参数是名字,
第四个参数include=[] 里面存放的一个个任务的py模块名
3、任务py模块
task1.py文件下:
from .celery import app
@app.task
def add(x,y):
#"luffyapi.settings.dev" 项目同名目录下settings文件夹下的dev.py配置文件,没有更改目录结构就去掉.dev
print(x+y)
return x+y
task2.py文件下:
from .celery import app
@app.task
def mutile(x,y):
print(x,y)
return x*y
4、启动celery
terminal下:cd 到 celery_task 目录的上一级目录下执行:
celery -A celery_task worker -l INFO -P eventlet
运行结果如下:说明任务已经启动

5、使用:

在celery_task包同级目录下创建一个py文件用来进行测试。
t_task.py文件中的内容:
from celery_task.task1 import add from celery_task.task2 import mutile #立即执行任务 ret1 = add.delay(10,12) ret2 = mutile.delay(2,3) print(ret1) print(ret2)
#执行延迟任务 from datetime import datetime,timedelta #必须是utc时间,timedelta是时间对象,10秒后,days=, eta = datetime.utcnow()+timedelta(seconds=30) ret=add.apply_async(args=(2022,300),eta=eta) print(ret)
运行t_task.py文件,拿到任务的id号

拓展知识:因为script是一个包,一旦其内部的py模块,右键运行的时候,该包的路径就会被加到环境变量中。所以t_task.py中的导模块,在pycharm中会提示飘红,但这种方式没有错。
6、获取结果:在celery_task包同级目录下创建一个get_result.py模块用来进行测试。

内容:
from celery_task.celery import app
from celery.result import AsyncResult
id='dcf1ff75-2fd1-484d-b3ef-32cf954f452e' #5中运行后得到的结果,就是任务id
if __name__=='__main__':
asy = AsyncResult(id=id,app=app)
if asy.successful():
result = asy.get()
print(result)
elif asy.failed():
print('任务失败')
elif asy.status=='PENDING':
print('任务等待被执行')
elif asy.status=='RETRY':
print('任务异常后正在重试')
elif asy.status=='STARTED':
print('任务已经开始被执行')
else:
print('sssssss')
运行该模块:拿到任务结果了。

更多推荐
所有评论(0)