平时Django开发中,我们经常会用到异步后台任务调度,比如邮件发送、三方接口的调用等。此时Celery便提供了强大的支持,无论Django用作是运维开发、DEVOPS、应用开发工作,我们都需要掌握。
Celery是什么?
Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
官方文档:https://docs.celeryproject.org/en/master/index.html
附Celery架构图
Django Celery配置
本例使用的是django+celery+redis配置,试用大多数情况,如有根据版本其他不同,请参考官方文档以官方为准。
模块环境
Python3.6
celery==4.3.0
django==1.11.11
django-celery==3.2.2
django-celery-beat==1.4.0
redis==3.2.1 # 注意redis版本 有些时候 与celery版本不兼容
项目settings.py
中配置
# ===== CELERY 相关配置 ==========
import djcelery
djcelery.setup_loader()
# 根据环境broker redis地址不同
if ENV == 'dev':
BROKER_URL = 'redis://127.0.0.1:6379/1'
else:
BROKER_URL = '{}/1'.format(get_system_config("celery_broker"))
CELERY_RESULT_BACKEND = '{}/2'.format(get_system_config("celery_broker")) # djcelery.backends.database:DatabaseBackend
# 默认会读每个项目每个app下的task.py任务
# CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# CELERY_IMPORTS = ("procedure.inception_tasks", "task.tasks")
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 864000} # 任务时效 10天
CELERY_ENABLE_UTC = False
CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'yaml']
CELERY_TASK_SERIALIZER = 'json' # 任务序列化格式
CELERY_RESULT_SERIALIZER = 'json' # 任务结果序列化格式
CELERYD_CONCURRENCY = 24 # celery worker的并发数 根据并发量是适当配置,不易太大
CELERYD_FORCE_EXECV = True
CELERYD_MAX_TASKS_PER_CHILD = 2
BROKER_POOL_LIMIT = 0 # mysql gone 问题
# 动态定时任务 (非必要,用到beat请配置)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# APP下面的配置
INSTALLED_APPS = (
...,
'djcelery',
'django_celery_beat', # 如果用到动态发布任务
)
项目下的__init__.py
(settings.py同级)
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
项目下新建一个celery.py
(settings.py同级)
# coding:utf-8
from __future__ import absolute_import, unicode_literals
import os
import django
from celery import Celery, platforms
from django.conf import settings
from celery.signals import after_task_publish, task_prerun, worker_ready
from celery.schedules import crontab
from djcelery.managers import TaskManager
# from apps.common_tasks.tasks import mysql_slowlog_alarm_task
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'IDB-API.settings')
django.setup()
app = Celery('IDB-API')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#
# status = app.AsyncResult("").status
# result = app.AsyncResult("").result
# 启动是否使用root用户启动,如果是,必须配置True
platforms.C_FORCE_ROOT = True
# 自定义定时任务 #任务注册也可在 settings里完成
app.conf.update(
# 一些初始化自定义的任务
CELERYBEAT_SCHEDULE={
'mysql_procedure_inspection_beat': {
'task': 'mysql_procedure_inspection',
# 'schedule': crontab(minute='0,10,20,30,40,50'),
'schedule': datetime.timedelta(seconds=60 * 0.5),
'args': ()
}
}
)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
@after_task_publish.connect
def task_publish_after_handler(sender=None, body=None, **kwargs):
print(u'Push Task Success: task_id: {body_id}; sender: {sender}'.format(body_id=str(body['id']).decode('utf-8'), sender=str(sender)))
@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
print(u"Celery Worker Ready Complete, start some task...")
# mysql_slowlog_alarm_task.delay()
定义任务
django-celery 默认会读取每个app 下面的 task.py
任务,所以我们只需要在特定的app下新建一个task.py
文件,添加我们的任务:
def revoke_task(task_id):
"""取消任务"""
result = {'code': 0, 'msg': '成功, 任务 [%s] 取消成功' % task_id}
try:
celery_app.control.revoke(task_id, terminate=True)
except Exception as e:
result = {'code': 1, 'msg': '错误, 取消任务 [%s] 失败: %s' % (task_id, str(e))}
return result
@shared_task(name='test', max_retries=0, bind=True)
def test(self, a, b):
print("test任务开始执行...")
try:
result = a + b
time.sleep(4)
print(self.request.id)
# celery_app.control.revoke(self.request.id, terminate=True)
time.sleep(6)
except Exception as e:
raise Exception({
'错误拉': "啊哈哈"
})
# raise self.retry(exc=e, countdown=2)
print("test任务执行结束!")
return result
启动Celery方式
启动后台work方式:
python -u manage.py celery worker --loglevel=info
启动后台beat方式(如果用到动态定时任务beat请启动):
python -u manage.py celery beat --loglevel=info
一种合并的启动方式
python -u manage.py celery worker -B --loglevel=info
Django Celery 线上配置
这里再给大家介绍下,如果部署 celery
我采用的是用supervisord守护方式
安装supervisord
pip install supervisor
修改/etc/supervisord.conf
(如没有此文件,请拷贝默认配置文件)文件,末尾
[include]
;files = relative/directory/*.ini
files = /etc/supervisor/*.ini
celery的部署配置文件
配置文件supervisord_celery.ini
[program:celerywoker]
user=root
command=/usr/local/python/python2.7.9/bin/python -u manage.py celery worker -B --loglevel=info
directory=/data/idb/idb-api
stdout_logfile=/var/log/celery/worker_info.log
autostart=true
stderr_logfile=/var/log/celery/worker_err.log
autorestart=true
redirect_stderr=true
stopsignal=QUIT
[supervisord]
[supervisorctl]
创建supervisor的守护进程
创建目录/etc/supervisor/
,将文件supervisord_idb_celery.ini
(注意确认修改相关路径)上传至该目录
启动服务
重启
supervisord -c /etc/supervisord.conf
查看守护进程
supervisorctl status
重启守护进程
supervisorctl restart xxx
其他:celery crontab讲解
我们需要用到定时任务时候就需要了解crontab,大体和linux的 crontab 类似。
配置Django中Celery的定时任务
settings.py中代码如下
from datetime import timedelta
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
# 'add-every-xx-seconds': {
# 'task': 'app_blog.blog.tasks.print_info',
# 'schedule': timedelta(seconds=2), # 每 30 秒一次
# # 'schedule': timedelta(minutes=1), # 每 1 分钟一次
# # 'schedule': timedelta(hours=4), # 每 4 小时一次
# 'args': ('settings中的定时任务',) # 任务函数参数,如果只有一个参数,一定要加逗号
# },
'send_qq_blog_request_count': {
'task': 'app_blog.blog.tasks.count_blog_everyday_request',
'schedule': crontab(hour=23, minute=30), # 每天晚上 23 点 30 分执行一次
}
}
timedelta
是datetime中的一个对象,需要from datetime import timedelta
引入,有如下几个参数
days
:天seconds
:秒microseconds
:微妙milliseconds
:毫秒minutes
:分hours
:小时
Celery的crontab表达式
crontab是比较完善,可能有些复杂,但能实现各种设置时间的需求。
引入:from celery.schedules import crontab
crontab()实例化的时候没设置任何参数,都是使用默认值。crontab一共有7个参数,常用有5个参数分别为:
month_of_year
:月份。范围1-12day_of_month
:日期,范围1-31day_of_week
:周几,范围0-6hour
:小时,范围0-23minute
:分钟,范围0-59
默认参数
这些参数可以设置表达式,表达稍微复杂的设置。默认值都是”*”星号,代表任意时刻。即crontab()
相当与:
rontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*')
含义是每天、每小时、每分钟执行一次任务。这说法太反人类语言习惯,简单说就是每1分钟执行一次任务。
具体某个值
直接设置某个值,每小时的15分时刻执行一次任务
crontab(minute=15)
直接指定某个时刻。上方设置每天0点0分时刻执行任务。
crontab(minute=0, hour=0)
也可以设置多个值。例如0分和30分执行一次任务
crontab(minute='0,30')
这里使用字符串,用逗号隔开数值。这里的逗号是表示多个表达式or逻辑关系。
设置范围
设置范围也是设置多个值,例如指定9点到12点每个小时的每分钟执行任务。
crontab(minute='*', hour='9-12')
这里*号是默认值,可以省略如下:
crontab(hour='9-12')
逗号是or逻辑关系,指定9点到12点和20点中每分钟执行任务:
crontab(hour='9-12,20')
crontab的表达式越来越复杂了。celery还提供了一个类得到表达式解析结果,代码如下:
from celery.task.schedules import crontab_parser
r = crontab_parser(23, 0).parse('9-12,20')
print(r)
# 结果:set([9, 10, 11, 12, 20])
其中,crontab_parse
是一个解析类。第1个参数是范围的最大值;第2个参数是范围的最小值。通过parse
输入表达式,可得到表达式的解析结果set([9, 10, 11, 12, 20])
。
设置间隔步长
设置1、3、5、7、9、11月份每天每分钟执行任务,按照上面的做法可以设置如下:
crontab(day_of_month='1,3,5,7,9,11')
观察数据可以发现,都是间隔2的步长。需要设置的数字比较少,若数字比较多显得很麻烦。例如我想每间隔2分钟就执行一次任务,要写30个数字想想就觉得很麻烦。crontab表达式还提供了间隔的处理,例如:
crontab(minute='*/2')
crontab(minute='0-59/2') #效果等同上面
这个/号不是除以的意思。相当与range的第3个参数步长,例如:
range(0, 59+1, 2)
示例
# 每2个小时中每分钟执行1次任务
crontab(hour='*/2')
# 每3个小时的0分时刻执行1次任务
# 即[0,3,6,9,12,15,18,21]点0分
crontab(minute=0, hour='*/3')
# 每3个小时或8点到12点的0分时刻执行1次任务
# 即[0,3,6,9,12,15,18,21]+[8,9,10,11,12]点0分
crontab(minute=0, hour='*/3,8-12')
# 每个季度的第1个月中,每天每分钟执行1次任务
# 月份范围是1-12,每3个月为[1,4,7,10]
crontab(month_of_year='*/3')
# 每月偶数天数的0点0分时刻执行1次任务
crontab(minute=0, hour=0, day_of_month='2-31/2')
# 每年5月11号的0点0分时刻执行1次任务
crontab(0, 0, day_of_month='11', month_of_year='5')
你个小老弟, nb