django(windows系统)使用celery框架
1、安装celery:pip install celery ,安装eventlet:pip install eventlet2、创建celery_task.py模块,写上:from celery import Celery#'redis://密码@ip/库号'broker = 'redis://192.168.137.10/15'backend='redis://192.168.137.10/14
1、celery是一个框架,用于实现异步任务、定时任务、延迟任务。
2、celery是一个独立的框架,需要在命令行去启动它。
3、这里我使用redis数据库来保存任务和任务结果
安装:windows系统下,需要借助eventlet来启动
pip install celery
pip install eventlet
使用:
1、创建一个包,celery_task
2、包内必须要有celery.py文件,文件内容:
from celery import Celery #加载django环境 import os import django django.setup() os.environ.setdefault("DJANGO_SETTINGS_MODULE","luffyapi.settings.dev") broker = 'redis://192.168.137.10/15'#broker任务队列 backend = 'redis://192.168.137.10/14'#结构存储,执行完的结果存在这里 app = Celery(__name__,broker=broker,backend=backend,include=['celery_task.task1','celery_task.task2'])
#定时任务,写到celery文件 #时区设置成东八区 app.conf.timezone='Asia/Shanghai' #禁用格林威治时区 app.conf.enable_utc=False #任务定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'low-task':{ #指定要执行的任务 'task':'celery_task.task1.add', #每10秒执行一次 'schedule':timedelta(seconds=3), #每周一的7:30执行 # 'schedule':crontab(hour=7,minute=30,day_of_week=1), 'args':(300,150) ,#任务需要传递的参数 } } #配置定时任务后,需要启动beat ,这个beat就是代替我们去手动点击运行,到时间就点击运行,执行定时任务 #另起terminal,切换到celery_task的上级目录执行: celery -A celery_task beat -l INFO
其中app=celery()
第一个参数是名字,
第四个参数include=[] 里面存放的一个个任务的py模块名
3、任务py模块
task1.py文件下:
from .celery import app @app.task def add(x,y): #"luffyapi.settings.dev" 项目同名目录下settings文件夹下的dev.py配置文件,没有更改目录结构就去掉.dev print(x+y) return x+y
task2.py文件下:
from .celery import app @app.task def mutile(x,y): print(x,y) return x*y
4、启动celery
terminal下:cd 到 celery_task 目录的上一级目录下执行:
celery -A celery_task worker -l INFO -P eventlet
运行结果如下:说明任务已经启动
5、使用:
在celery_task包同级目录下创建一个py文件用来进行测试。
t_task.py文件中的内容:
from celery_task.task1 import add from celery_task.task2 import mutile #立即执行任务 ret1 = add.delay(10,12) ret2 = mutile.delay(2,3) print(ret1) print(ret2)
#执行延迟任务 from datetime import datetime,timedelta #必须是utc时间,timedelta是时间对象,10秒后,days=, eta = datetime.utcnow()+timedelta(seconds=30) ret=add.apply_async(args=(2022,300),eta=eta) print(ret)
运行t_task.py文件,拿到任务的id号
拓展知识:因为script是一个包,一旦其内部的py模块,右键运行的时候,该包的路径就会被加到环境变量中。所以t_task.py中的导模块,在pycharm中会提示飘红,但这种方式没有错。
6、获取结果:在celery_task包同级目录下创建一个get_result.py模块用来进行测试。
内容:
from celery_task.celery import app from celery.result import AsyncResult id='dcf1ff75-2fd1-484d-b3ef-32cf954f452e' #5中运行后得到的结果,就是任务id if __name__=='__main__': asy = AsyncResult(id=id,app=app) if asy.successful(): result = asy.get() print(result) elif asy.failed(): print('任务失败') elif asy.status=='PENDING': print('任务等待被执行') elif asy.status=='RETRY': print('任务异常后正在重试') elif asy.status=='STARTED': print('任务已经开始被执行') else: print('sssssss')
运行该模块:拿到任务结果了。
更多推荐
所有评论(0)