Skip to content

Latest commit

 

History

History
228 lines (152 loc) · 5.34 KB

README-zh.md

File metadata and controls

228 lines (152 loc) · 5.34 KB

celery-sqlalchemy-scheduler

一个基于 sqlalchemy 的 scheduler,作为 celery 定时任务的辅助工具。

快速开始

中文文档 English

依赖

  • Python 3
  • celery >= 4.2
  • sqlalchemy

首先必须安装 celerysqlalchemy, 并且celery应该大于等于 4.2 版本。

$ pip install sqlalchemy celery

安装

通过 PyPi 安装:

$ pip install celery-sqlalchemy-scheduler

通过 github 仓库进行安装:

$ git clone [email protected]:AngelLiang/celery-sqlalchemy-scheduler.git
$ cd celery-sqlalchemy-scheduler
$ python setup.py install

使用示例

安装celery_sqlalchemy_scheduler之后,你可以查看examples目录下的代码:

  1. 启动 celery worker:

    $ celery worker -A tasks -l info
    
  2. 使用DatabaseScheduler作为 scheduler 启动 celery beat:

    $ celery beat -A tasks -S celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler -l info
    

使用说明

beat 启动之后,默认会在当前目录下生成名称为schedule.db的 sqlite 数据库。Windows 下可以使用 SQLiteStudio.exe 工具打开查看里面的数据。

sqlite

数据库同步 scheduler 到 beat

当需要更新 scheduler,只需要修改schedule.db相关数据即可。修改好数据库的 scheduler 后,celery_sqlalchemy_scheduler并不会马上同步数据库的数据到 beat,我们最后还需要修改celery_periodic_task_changed表的第一条数据,只需要把last_update字段更新到最新的时间即可。当 beat 在下一个“心跳”之后,就会同步数据库的数据到 beat。

配置数据库

在配置 Celery 的时候,可以设置 sqlalchemy 数据库的路径,示例如下:

from celery import Celery

celery = Celery('tasks')

beat_dburi = 'sqlite:///schedule.db'

celery.conf.update(
    {'beat_dburi': beat_dburi}
)

当然,你可以改为使用 MySQL 或 PostgreSQL:

# MySQL: `pip install mysql-connector`
beat_dburi = 'mysql+mysqlconnector://root:[email protected]:3306/celery-schedule'

# PostgreSQL: `pip install psycopg2`
beat_dburi = 'postgresql+psycopg2://postgres:[email protected]:5432/celery-schedule'

示例代码一

可以查看 examples/base/tasks.py 代码获取相关细节说明

相关讨论:AngelLiang#15 (comment)

开一个console,启动 Celery Worker

$ pipenv shell
$ cd examples/base

# Celery < 5.0
$ celery worker -A tasks:celery -l info

# Celery >= 5.0
$ celery -A tasks:celery worker -l info

开第二个console,启动 Celery Beat

$ pipenv shell
$ cd examples/base

# Celery < 5.0
$ celery beat -A tasks:celery -S tasks:DatabaseScheduler -l info

# Celery >= 5.0
$ celery -A tasks:celery beat -S tasks:DatabaseScheduler -l info

示例代码二

examples/base/tasks.py

# coding=utf-8
"""
Ready::

    $ pipenv install
    $ pipenv shell
    $ python setup.py install

Run Worker::

    $ cd examples/base
    $ celery worker -A tasks:celery -l info

Run Beat::

    $ cd examples/base
    $ celery beat -A tasks:celery -S tasks:DatabaseScheduler -l info

"""
import os
import platform
from datetime import timedelta
from celery import Celery
from celery import schedules

from celery_sqlalchemy_scheduler.schedulers import DatabaseScheduler  # noqa

if platform.system() == 'Windows':
    # Celery在Windows环境下运行需要设置这个变量,否则调用任务会报错
    os.environ['FORKED_BY_MULTIPROCESSING'] = '1'

backend = 'rpc://'
broker_url = 'amqp://guest:guest@localhost:5672//'

# 如果数据库修改了下面的schedule,beat重启后数据库会被下面的配置覆盖
beat_schedule = {
    'echo-every-3-seconds': {
        'task': 'tasks.echo',
        'schedule': timedelta(seconds=3),
        'args': ('hello', )
    },
    'add-every-minutes': {
        'task': 'tasks.add',
        'schedule': schedules.crontab('*', '*', '*'),
        'args': (1, 2)
    },
}

beat_scheduler = 'celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler'

beat_sync_every = 0

# The maximum number of seconds beat can sleep between checking the schedule.
# default: 0
beat_max_loop_interval = 10

# 非celery和beat的配置,配置beat_dburi数据库路径
beat_dburi = 'sqlite:///schedule.db'
# OR
# beat_dburi = 'mysql+mysqlconnector://root:[email protected]/celery-schedule'

# 配置时区
timezone = 'Asia/Shanghai'

# 默认每个worker跑完10个任务后,自我销毁程序重建来释放内存
# 防止内存泄漏
worker_max_tasks_per_child = 10

celery = Celery('tasks',
                backend=backend,
                broker=broker_url)

config = dict(
    beat_schedule=beat_schedule,
    beat_scheduler=beat_scheduler,
    beat_max_loop_interval=beat_max_loop_interval,
    beat_dburi=beat_dburi,

    timezone=timezone,
    worker_max_tasks_per_child=worker_max_tasks_per_child
)

celery.conf.update(config)


@celery.task
def add(x, y):
    return x + y


@celery.task
def echo(data):
    print(data)


if __name__ == "__main__":
    celery.start()

参考

本工具主要参考了以下资料和源码: