User-Profile-Image
hankin
  • 5
  • 首页
  • 留言
  • 个人简介
  • 分类
    • 留言
    • 未分类
    • 数据库
    • Django
    • celery
  • 页面
  • 友链
    • 社会我毛哥
Help?

Please contact us on our email for need any support

Support
    首页   ›   celery   ›   正文
celery

Django Celery 配置以及定时任务crontab讲解

2020-06-22 11:11:55
1469  1 6

Contents

  • 1 Celery是什么?
  • 2 Django Celery配置
    • 2.1 模块环境
    • 2.2 项目settings.py中配置
    • 2.3 项目下的__init__.py(settings.py同级)
    • 2.4 项目下新建一个celery.py(settings.py同级)
    • 2.5 定义任务
    • 2.6 启动Celery方式
  • 3 Django Celery 线上配置
    • 3.1 安装supervisord
    • 3.2 celery的部署配置文件
    • 3.3 创建supervisor的守护进程
    • 3.4 启动服务
  • 4 其他:celery crontab讲解
    • 4.1 Celery的crontab表达式
    • 4.2 默认参数
    • 4.3 具体某个值
    • 4.4 设置范围
    • 4.5 设置间隔步长
    • 4.6 示例

平时Django开发中,我们经常会用到异步后台任务调度,比如邮件发送、三方接口的调用等。此时Celery便提供了强大的支持,无论Django用作是运维开发、DEVOPS、应用开发工作,我们都需要掌握。

Celery是什么?

Celery是一个简单、灵活且可靠的,处理大量消息的分布式系统
专注于实时处理的异步任务队列
同时也支持任务调度
官方文档:https://docs.celeryproject.org/en/master/index.html
附Celery架构图

Django Celery配置

本例使用的是django+celery+redis配置,试用大多数情况,如有根据版本其他不同,请参考官方文档以官方为准。

模块环境

Python3.6
celery==4.3.0
django==1.11.11
django-celery==3.2.2
django-celery-beat==1.4.0
redis==3.2.1 # 注意redis版本 有些时候 与celery版本不兼容

项目settings.py中配置

# ===== CELERY 相关配置 ==========
import djcelery
djcelery.setup_loader()
# 根据环境broker redis地址不同
if ENV == 'dev':
    BROKER_URL = 'redis://127.0.0.1:6379/1'
else:
    BROKER_URL = '{}/1'.format(get_system_config("celery_broker"))
CELERY_RESULT_BACKEND = '{}/2'.format(get_system_config("celery_broker"))  # djcelery.backends.database:DatabaseBackend
# 默认会读每个项目每个app下的task.py任务
# CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# CELERY_IMPORTS = ("procedure.inception_tasks", "task.tasks")
CELERY_TIMEZONE = 'Asia/Shanghai' # 时区
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 864000}  # 任务时效 10天
CELERY_ENABLE_UTC = False
CELERY_ACCEPT_CONTENT = ['json', 'pickle', 'yaml']
CELERY_TASK_SERIALIZER = 'json'  # 任务序列化格式
CELERY_RESULT_SERIALIZER = 'json'  # 任务结果序列化格式
CELERYD_CONCURRENCY = 24  # celery worker的并发数 根据并发量是适当配置,不易太大
CELERYD_FORCE_EXECV = True
CELERYD_MAX_TASKS_PER_CHILD = 2
BROKER_POOL_LIMIT = 0  # mysql gone 问题
# 动态定时任务 (非必要,用到beat请配置)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# APP下面的配置
INSTALLED_APPS = (
    ...,
    'djcelery',
    'django_celery_beat', # 如果用到动态发布任务
)

项目下的__init__.py(settings.py同级)

from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)

项目下新建一个celery.py(settings.py同级)

# coding:utf-8
from __future__ import absolute_import, unicode_literals
import os
import django
from celery import Celery, platforms
from django.conf import settings
from celery.signals import after_task_publish, task_prerun, worker_ready
from celery.schedules import crontab
from djcelery.managers import TaskManager
# from apps.common_tasks.tasks import mysql_slowlog_alarm_task

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'IDB-API.settings')
django.setup()
app = Celery('IDB-API')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#
# status = app.AsyncResult("").status
# result = app.AsyncResult("").result
# 启动是否使用root用户启动,如果是,必须配置True
platforms.C_FORCE_ROOT = True

# 自定义定时任务 #任务注册也可在 settings里完成
app.conf.update(
    # 一些初始化自定义的任务
    CELERYBEAT_SCHEDULE={
        'mysql_procedure_inspection_beat': {
            'task': 'mysql_procedure_inspection',
            # 'schedule': crontab(minute='0,10,20,30,40,50'),
            'schedule': datetime.timedelta(seconds=60 * 0.5),
            'args': ()
        }
    }
)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

@after_task_publish.connect
def task_publish_after_handler(sender=None, body=None, **kwargs):
    print(u'Push Task Success: task_id: {body_id}; sender: {sender}'.format(body_id=str(body['id']).decode('utf-8'), sender=str(sender)))

@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    print(u"Celery Worker Ready Complete, start some task...")
    # mysql_slowlog_alarm_task.delay()

定义任务

django-celery 默认会读取每个app 下面的 task.py任务,所以我们只需要在特定的app下新建一个task.py文件,添加我们的任务:

def revoke_task(task_id):
    """取消任务"""
    result = {'code': 0, 'msg': '成功, 任务 [%s] 取消成功' % task_id}
    try:
        celery_app.control.revoke(task_id, terminate=True)
    except Exception as e:
        result = {'code': 1, 'msg': '错误, 取消任务 [%s] 失败: %s' % (task_id, str(e))}
    return result

@shared_task(name='test', max_retries=0, bind=True)
def test(self, a, b):
    print("test任务开始执行...")
    try:
        result = a + b
        time.sleep(4)
        print(self.request.id)
        # celery_app.control.revoke(self.request.id, terminate=True)
        time.sleep(6)
    except Exception as e:
        raise Exception({
            '错误拉': "啊哈哈"
        })
        # raise self.retry(exc=e, countdown=2)
    print("test任务执行结束!")
    return result

启动Celery方式

启动后台work方式:

python -u manage.py celery worker --loglevel=info

启动后台beat方式(如果用到动态定时任务beat请启动):

python -u manage.py celery beat --loglevel=info

一种合并的启动方式

python -u manage.py celery worker -B --loglevel=info

Django Celery 线上配置

这里再给大家介绍下,如果部署 celery
我采用的是用supervisord守护方式

安装supervisord

pip install supervisor

修改/etc/supervisord.conf(如没有此文件,请拷贝默认配置文件)文件,末尾

[include]
;files = relative/directory/*.ini
files = /etc/supervisor/*.ini

celery的部署配置文件

配置文件supervisord_celery.ini

[program:celerywoker]
user=root
command=/usr/local/python/python2.7.9/bin/python -u manage.py celery worker -B --loglevel=info
directory=/data/idb/idb-api
stdout_logfile=/var/log/celery/worker_info.log
autostart=true
stderr_logfile=/var/log/celery/worker_err.log
autorestart=true
redirect_stderr=true
stopsignal=QUIT
[supervisord]
[supervisorctl]

创建supervisor的守护进程

创建目录/etc/supervisor/,将文件supervisord_idb_celery.ini(注意确认修改相关路径)上传至该目录

启动服务

重启
supervisord -c /etc/supervisord.conf
查看守护进程
supervisorctl status
重启守护进程
supervisorctl restart xxx

其他:celery crontab讲解

我们需要用到定时任务时候就需要了解crontab,大体和linux的 crontab 类似。
配置Django中Celery的定时任务
settings.py中代码如下

from datetime import timedelta
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    # 'add-every-xx-seconds': {
    #     'task': 'app_blog.blog.tasks.print_info',
    #     'schedule': timedelta(seconds=2),  # 每 30 秒一次
    #     # 'schedule': timedelta(minutes=1),         # 每 1 分钟一次
    #     # 'schedule': timedelta(hours=4),           # 每 4 小时一次
    #     'args': ('settings中的定时任务',)  # 任务函数参数,如果只有一个参数,一定要加逗号
    # },
    'send_qq_blog_request_count': {
        'task': 'app_blog.blog.tasks.count_blog_everyday_request',
        'schedule': crontab(hour=23, minute=30),  # 每天晚上 23 点 30 分执行一次
    }
}

timedelta是datetime中的一个对象,需要from datetime import timedelta引入,有如下几个参数

  • days:天
  • seconds:秒
  • microseconds:微妙
  • milliseconds:毫秒
  • minutes:分
  • hours:小时

Celery的crontab表达式

crontab是比较完善,可能有些复杂,但能实现各种设置时间的需求。

引入:from celery.schedules import crontab

crontab()实例化的时候没设置任何参数,都是使用默认值。crontab一共有7个参数,常用有5个参数分别为:

  • month_of_year:月份。范围1-12
  • day_of_month:日期,范围1-31
  • day_of_week:周几,范围0-6
  • hour:小时,范围0-23
  • minute:分钟,范围0-59

默认参数

这些参数可以设置表达式,表达稍微复杂的设置。默认值都是”*”星号,代表任意时刻。即crontab()相当与:

rontab(minute='*', hour='*', day_of_week='*', day_of_month='*', month_of_year='*')

含义是每天、每小时、每分钟执行一次任务。这说法太反人类语言习惯,简单说就是每1分钟执行一次任务。

具体某个值

直接设置某个值,每小时的15分时刻执行一次任务

crontab(minute=15)

直接指定某个时刻。上方设置每天0点0分时刻执行任务。

crontab(minute=0, hour=0)

也可以设置多个值。例如0分和30分执行一次任务

crontab(minute='0,30')

这里使用字符串,用逗号隔开数值。这里的逗号是表示多个表达式or逻辑关系。

设置范围

设置范围也是设置多个值,例如指定9点到12点每个小时的每分钟执行任务。

crontab(minute='*', hour='9-12')

这里*号是默认值,可以省略如下:

crontab(hour='9-12')

逗号是or逻辑关系,指定9点到12点和20点中每分钟执行任务:

crontab(hour='9-12,20')

crontab的表达式越来越复杂了。celery还提供了一个类得到表达式解析结果,代码如下:

from celery.task.schedules import crontab_parser
r = crontab_parser(23, 0).parse('9-12,20')
print(r)
# 结果:set([9, 10, 11, 12, 20])

其中,crontab_parse是一个解析类。第1个参数是范围的最大值;第2个参数是范围的最小值。通过parse输入表达式,可得到表达式的解析结果set([9, 10, 11, 12, 20])。

设置间隔步长

设置1、3、5、7、9、11月份每天每分钟执行任务,按照上面的做法可以设置如下:

crontab(day_of_month='1,3,5,7,9,11')

观察数据可以发现,都是间隔2的步长。需要设置的数字比较少,若数字比较多显得很麻烦。例如我想每间隔2分钟就执行一次任务,要写30个数字想想就觉得很麻烦。crontab表达式还提供了间隔的处理,例如:

crontab(minute='*/2')
crontab(minute='0-59/2') #效果等同上面

这个/号不是除以的意思。相当与range的第3个参数步长,例如:

range(0, 59+1, 2)

示例

# 每2个小时中每分钟执行1次任务
crontab(hour='*/2')

# 每3个小时的0分时刻执行1次任务
# 即[0,3,6,9,12,15,18,21]点0分
crontab(minute=0, hour='*/3')

# 每3个小时或8点到12点的0分时刻执行1次任务
# 即[0,3,6,9,12,15,18,21]+[8,9,10,11,12]点0分
crontab(minute=0, hour='*/3,8-12')

# 每个季度的第1个月中,每天每分钟执行1次任务
# 月份范围是1-12,每3个月为[1,4,7,10]
crontab(month_of_year='*/3')

# 每月偶数天数的0点0分时刻执行1次任务
crontab(minute=0, hour=0, day_of_month='2-31/2')

# 每年5月11号的0点0分时刻执行1次任务
crontab(0, 0, day_of_month='11', month_of_year='5')

如本文“对您有用”,欢迎随意打赏作者,让我们坚持创作!

6 打赏
评论 (3)

点击这里取消回复。

欢迎您 游客  

  • 〇

    你个小老弟, nb

    4个月前
    回复
目录树
  • 1 Celery是什么?
  • 2 Django Celery配置
    • 2.1 模块环境
    • 2.2 项目settings.py中配置
    • 2.3 项目下的__init__.py(settings.py同级)
    • 2.4 项目下新建一个celery.py(settings.py同级)
    • 2.5 定义任务
    • 2.6 启动Celery方式
  • 3 Django Celery 线上配置
    • 3.1 安装supervisord
    • 3.2 celery的部署配置文件
    • 3.3 创建supervisor的守护进程
    • 3.4 启动服务
  • 4 其他:celery crontab讲解
    • 4.1 Celery的crontab表达式
    • 4.2 默认参数
    • 4.3 具体某个值
    • 4.4 设置范围
    • 4.5 设置间隔步长
    • 4.6 示例
疯狂小土豆
专注Python Web开发
13文章 26评论 25点赞 9349浏览

最新评论
+7
随机文章
日志配置: 如何正确配置日志(logging)
2年前
Django进阶:事务操作、悲观锁和乐观锁(附代码演示)
2个月前
(数据库平台)转载:美团MySQL数据库巡检系统的设计与应用
2年前
python加解密相关模块库及用法
2年前
50条有趣的Python一行代码,建议收藏!
4个月前
图像链接
归档文章
  • celery (1)
  • Django (7)
  • 数据库 (1)
  • 未分类 (4)
  • 留言 (1)
Copyright © 2022 网站备案号: 12313213213
                   疯狂的小土豆 主题. Designed by Mundy
主页
页面
博主
疯狂小土豆
疯狂小土豆 管理员
专注Python Web开发
13 文章 26 评论 9349 浏览
测试
测试
赞赏作者

请通过微信、支付宝 APP 扫一扫

感谢您对作者的支持!

 支付宝 微信支付