Jusene's Blog

Integrating Scheduled Tasks with django-celery

2019/11/11

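I have wanted to build a web-based task manager for a while. Until now I have run Celery on the server side and sent asynchronous requests from my own scripts. Since I have recently been learning Django, I decided to try combining Django with Celery, pitfalls included.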

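Celery-related notes:

One caveat: in my testing this setup works with Celery 3.x, whereas with Celery 4.x, following the official documentation, no results were returned.

Official documentation: http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html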

Install Django

pip install django

django-admin version
2.2.6

Creating the project and the app is omitted here.

Install django-celery

pip install django-celery

Installed celery version: celery==3.1.26.post2
Installed django-celery version: django-celery==3.3.1

Configuration

  • apps/settings.py

    import djcelery
    djcelery.setup_loader()  # register django-celery's loader with Celery
    ...

    INSTALLED_APPS = [
        ...
        'djcelery',
    ]
    ...
    BROKER_URL = "amqp://rabbitadmin:rabbitadmin@127.0.0.1:5672//djcelery"  # RabbitMQ broker
    CELERY_TIMEZONE = 'Asia/Shanghai'
    CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1"  # task results go to Redis db 1
    CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # periodic tasks are stored in the Django database
    CELERY_RESULT_SERIALIZER = 'json'
    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
  • apps/celery.py

    from __future__ import absolute_import, unicode_literals

    import os
    from celery import Celery
    from django.conf import settings

    # Point Celery at the Django settings module before the app is created.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'apps.settings')

    app = Celery('tasks')
    # Read the Celery configuration from Django's settings.
    app.config_from_object('django.conf:settings')

    # Look for a tasks.py module in every installed app.
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


    @app.task(bind=True)
    def debug_task(self):
        print('Request: {0!r}'.format(self.request))
  • Test-start Celery

    ~]# python manage.py celery worker -A apps -l info
     -------------- celery@MacBook-Pro.local v3.1.26.post2 (Cipater)
    ---- **** -----
    --- * ***  * -- Darwin-18.7.0-x86_64-i386-64bit
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         apps:0x108371898
    - ** ---------- .> transport:   amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
    - ** ---------- .> results:     redis://127.0.0.1:6379/1
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ----
    --- ***** ----- [queues]
     -------------- .> celery           exchange=celery(direct) key=celery


    [tasks]
      . apps.celery.debug_task

    [2019-11-11 15:22:17,460: INFO/MainProcess] Connected to amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
    [2019-11-11 15:22:17,472: INFO/MainProcess] mingle: searching for neighbors
    [2019-11-11 15:22:18,498: INFO/MainProcess] mingle: all alone
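  • Add tasks
    With django-celery, tasks only need to be defined in a tasks.py file inside an app. The file name cannot be changed, because app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) scans the installed apps for a module with that name.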

django-admin startapp tasks

tree tasks
tasks
├── __init__.py
├── __pycache__
│   ├── __init__.cpython-36.pyc
│   ├── admin.cpython-36.pyc
│   ├── apps.cpython-36.pyc
│   ├── celery.cpython-36.pyc
│   └── models.cpython-36.pyc
├── admin.py
├── apps.py
├── celery.py
├── migrations
│   ├── __init__.py
│   └── __pycache__
│       └── __init__.cpython-36.pyc
├── models.py
├── tests.py
└── views.py

# The app must also be registered in settings.py

# tasks/tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task


@shared_task
def add(x, y):
    return x + y


@shared_task
def mul(x, y):
    return x * y
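To make the naming rule concrete, here is a minimal sketch of what autodiscovery amounts to: for each installed app, look for a submodule with the expected name. The helper discover_task_modules is hypothetical and is not Celery's implementation. The demo treats standard-library packages as stand-ins for Django apps so that it runs without a project.

```python
import importlib.util


def discover_task_modules(installed_apps, related_name="tasks"):
    """Return dotted names of `<app>.<related_name>` modules that can be found.

    Hypothetical helper for illustration only; Celery's own autodiscovery
    also imports the modules so that the tasks get registered.
    """
    found = []
    for app_name in installed_apps:
        candidate = "{0}.{1}".format(app_name, related_name)
        try:
            spec = importlib.util.find_spec(candidate)
        except ModuleNotFoundError:
            spec = None  # the app package itself cannot be imported
        if spec is not None:
            found.append(candidate)
    return found


# Stand-in demo: treat stdlib packages as "apps" and look for a
# "decoder" submodule instead of "tasks".
print(discover_task_modules(["json", "email", "no_such_app"], related_name="decoder"))
```

Seen this way, a file named anything other than tasks.py is simply never looked at, which matches the behaviour described above.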

Apply the ORM migrations to the database

python manage.py migrate

# Create a superuser
python manage.py createsuperuser

Testing

  • Start the worker process

    ~]# python manage.py celery worker -A apps -l info

     -------------- celery@MacBook-Pro.local v3.1.26.post2 (Cipater)
    ---- **** -----
    --- * ***  * -- Darwin-18.7.0-x86_64-i386-64bit
    -- * - **** ---
    - ** ---------- [config]
    - ** ---------- .> app:         apps:0x108361828
    - ** ---------- .> transport:   amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
    - ** ---------- .> results:     redis://127.0.0.1:6379/1
    - *** --- * --- .> concurrency: 8 (prefork)
    -- ******* ----
    --- ***** ----- [queues]
     -------------- .> celery           exchange=celery(direct) key=celery


    [tasks]
      . tasks.tasks.add
      . tasks.tasks.mul
      . apps.celery.debug_task

    [2019-11-11 15:35:21,437: INFO/MainProcess] Connected to amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
    [2019-11-11 15:35:21,465: INFO/MainProcess] mingle: searching for neighbors
    [2019-11-11 15:35:22,494: INFO/MainProcess] mingle: all alone
  • Start the beat process

    ~]# python manage.py celery beat -A tasks -l info
    celery beat v3.1.26.post2 (Cipater) is starting.
    __    -    ... __   -        _
    Configuration ->
        . broker -> amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
        . loader -> celery.loaders.app.AppLoader
        . scheduler -> djcelery.schedulers.DatabaseScheduler

        . logfile -> [stderr]@%INFO
        . maxinterval -> now (0s)
    [2019-11-11 15:38:45,123: INFO/MainProcess] beat: Starting...
    [2019-11-11 15:38:45,123: INFO/MainProcess] Writing entries (0)...
  • Control tasks from the web page

  • Check task execution
    Beat process
    [2019-11-11 15:47:45,259: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:47:45,260: INFO/MainProcess] Writing entries (1)...
    [2019-11-11 15:47:50,259: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:47:55,260: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:00,260: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:05,261: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:10,261: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:15,262: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:20,263: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:25,263: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:30,264: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:35,264: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:40,265: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
    [2019-11-11 15:48:45,266: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
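The beat log shows an entry named test firing rest.tasks.add every five seconds. For comparison, a schedule like that can also be written statically in settings.py with Celery 3.x's CELERYBEAT_SCHEDULE setting. This is only a sketch: the arguments (1, 2) are an assumption chosen to match the result 3 that the worker reports, and whether DatabaseScheduler imports such static entries into its tables has not been verified here.

```python
from datetime import timedelta

# Hypothetical static equivalent of the "test" entry created through the web page.
CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'rest.tasks.add',          # task name as it appears in the beat log
        'schedule': timedelta(seconds=5),  # fire every five seconds
        'args': (1, 2),                    # assumed arguments; 1 + 2 == 3
    },
}
```

Keeping the schedule in the database instead, as this post does, has the advantage that entries can be added or changed from the web page without editing settings.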

Worker process

[2019-11-11 15:49:20,274: INFO/MainProcess] Task rest.tasks.add[fb38d77b-d6f0-40d3-bde3-0df454109d92] succeeded in 0.0012137379962950945s: 3
[2019-11-11 15:49:25,273: INFO/MainProcess] Received task: rest.tasks.add[c67474bf-3c82-4f4f-ad9f-20ed055e0f39]
[2019-11-11 15:49:25,275: INFO/MainProcess] Task rest.tasks.add[c67474bf-3c82-4f4f-ad9f-20ed055e0f39] succeeded in 0.0011863320105476305s: 3
[2019-11-11 15:49:30,273: INFO/MainProcess] Received task: rest.tasks.add[56120b88-07ef-47e1-8361-68ee08dff44f]
[2019-11-11 15:49:30,276: INFO/MainProcess] Task rest.tasks.add[56120b88-07ef-47e1-8361-68ee08dff44f] succeeded in 0.0012300249945838004s: 3
[2019-11-11 15:49:35,274: INFO/MainProcess] Received task: rest.tasks.add[e22b54a2-0ca6-4c37-bd0a-3d23a7a4bae2]
[2019-11-11 15:49:35,277: INFO/MainProcess] Task rest.tasks.add[e22b54a2-0ca6-4c37-bd0a-3d23a7a4bae2] succeeded in 0.0015451840008608997s: 3
[2019-11-11 15:49:40,275: INFO/MainProcess] Received task: rest.tasks.add[0ebcedc4-1c80-4a4c-8e43-d9023a5e37c0]
[2019-11-11 15:49:40,278: INFO/MainProcess] Task rest.tasks.add[0ebcedc4-1c80-4a4c-8e43-d9023a5e37c0] succeeded in 0.0013460790069075301s: 3
[2019-11-11 15:49:45,276: INFO/MainProcess] Received task: rest.tasks.add[ac169f86-72e0-4ea0-9aa4-8f290dda64c1]
[2019-11-11 15:49:45,278: INFO/MainProcess] Task rest.tasks.add[ac169f86-72e0-4ea0-9aa4-8f290dda64c1] succeeded in 0.0012133710115449503s: 3
[2019-11-11 15:49:50,277: INFO/MainProcess] Received task: rest.tasks.add[1ff6d6d1-e7ee-430e-ae64-e91f31dfab05]
[2019-11-11 15:49:50,279: INFO/MainProcess] Task rest.tasks.add[1ff6d6d1-e7ee-430e-ae64-e91f31dfab05] succeeded in 0.0012463609891710803s: 3
[2019-11-11 15:49:55,277: INFO/MainProcess] Received task: rest.tasks.add[ed65976b-5621-42de-86cb-6ba1e53db78c]
[2019-11-11 15:49:55,279: INFO/MainProcess] Task rest.tasks.add[ed65976b-5621-42de-86cb-6ba1e53db78c] succeeded in 0.001093872997444123s: 3
[2019-11-11 15:50:00,278: INFO/MainProcess] Received task: rest.tasks.add[d80f7a74-9e5f-4f03-92f0-cee6a9cfd4c6]
[2019-11-11 15:50:00,281: INFO/MainProcess] Task rest.tasks.add[d80f7a74-9e5f-4f03-92f0-cee6a9cfd4c6] succeeded in 0.0018399830005364493s: 3
[2019-11-11 15:50:05,278: INFO/MainProcess] Received task: rest.tasks.add[41a64336-d764-401f-bc16-d7cc3bf1431e]
[2019-11-11 15:50:05,280: INFO/MainProcess] Task rest.tasks.add[41a64336-d764-401f-bc16-d7cc3bf1431e] succeeded in 0.001239712000824511s: 3
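When checking many runs it can help to pull the task name, id, runtime and result out of the worker log instead of reading it by eye. The following is a rough sketch based only on the line format shown above; the pattern and the helper name parse_succeeded are my own, and runtimes printed in another notation would need a looser pattern.

```python
import re

# Matches the "succeeded" lines that the worker prints at INFO level.
SUCCEEDED = re.compile(
    r"Task (?P<name>[\w.]+)\[(?P<task_id>[0-9a-f-]+)\] "
    r"succeeded in (?P<runtime>[\d.]+)s: (?P<result>.*)$"
)


def parse_succeeded(line):
    """Return name, task id, runtime and result from a 'succeeded' line, else None."""
    match = SUCCEEDED.search(line)
    if match is None:
        return None
    info = match.groupdict()
    info["runtime"] = float(info["runtime"])  # seconds
    return info


line = (
    "[2019-11-11 15:49:25,275: INFO/MainProcess] Task rest.tasks.add"
    "[c67474bf-3c82-4f4f-ad9f-20ed055e0f39] succeeded in 0.0011863320105476305s: 3"
)
print(parse_succeeded(line))
```

Lines such as "Received task: ..." do not match and yield None, so the helper can be mapped over the whole log and the None entries filtered out.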

Backend storage

337) "celery-task-meta-8a9a52fd-cd47-4988-854e-7e9e635faad7"
338) "celery-task-meta-c09d2e9f-dca5-452b-97c0-454cc39536d3"
339) "celery-task-meta-e0dc9c88-7a8d-4094-8746-2eea2820ee98"
340) "celery-task-meta-898ed57b-a52f-4d39-a685-9fe0718356be"
341) "celery-task-meta-6404ed17-e420-4423-b0d6-2f7e861a912c"
342) "celery-task-meta-64ae32c3-5cff-4cf0-9590-17effc234978"
343) "celery-task-meta-ab126140-b65b-40ff-accd-ec59d1597dc6"
344) "celery-task-meta-d2fba024-8550-44bc-8d1b-d0ad7257e6bd"
127.0.0.1:6379[1]> get celery-task-meta-d2fba024-8550-44bc-8d1b-d0ad7257e6bd
"{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": []}"

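Because CELERY_RESULT_SERIALIZER is set to 'json', the value stored under each celery-task-meta-<task id> key is a plain JSON document that any client can decode. A small sketch of that decoding, using the value shown in the Redis output; the helper names result_key and decode_meta are hypothetical, and fetching the raw string from Redis is left out so that the example has no dependencies.

```python
import json

KEY_PREFIX = "celery-task-meta-"


def result_key(task_id):
    """Build the Redis key under which the result of task_id is stored."""
    return KEY_PREFIX + task_id


def decode_meta(raw):
    """Decode the JSON document stored under a celery-task-meta-* key."""
    return json.loads(raw)


# The raw value as returned by the GET command in the Redis output.
raw = '{"status": "SUCCESS", "result": 3, "traceback": null, "children": []}'
meta = decode_meta(raw)
print(result_key("d2fba024-8550-44bc-8d1b-d0ad7257e6bd"), meta["status"], meta["result"])
```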
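Bug fix

With Redis as the result backend, storing a task result raises an error. The cause lies in the library source: Celery's Redis backend passes the arguments to setex in the wrong order for the installed redis client. This looks like a version mismatch, since redis-py 3.x defines setex(name, time, value) while Celery 3.1 still calls it with the older (key, value, expires) order.

Error:

redis.exceptions.ResponseError: Command # 1 (SETEX b'celery-task-meta-734b07fc-944d-4416-a4c8-61834880f345' b'\x80\x02}q\x00(X\x06\x00\x00\x00statusq\x01X\x07\x00\x00\x00SUCCESSq\x02X\x06\x00\x00\x00resultq\x03K\x06X\t\x00\x00\x00tracebackq\x04NX\x08\x00\x00\x00childrenq\x05]q\x06u.' 86400) of pipeline caused error: value is not an integer or out of range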

Library file:

venv/lib/python3.6/site-packages/celery/backends/redis.py


    def _set(self, key, value):
        with self.client.pipeline() as pipe:
            if self.expires:
                # pipe.setex(key, value, self.expires)  # original source
                pipe.setex(key, self.expires, value)  # after the fix
            else:
                pipe.set(key, value)
            pipe.publish(key, value)
            pipe.execute()


    # Definition of setex in the redis client; its signature shows that the original call had the arguments in the wrong order
    def setex(self, name, time, value):
        """
        Set the value of key ``name`` to ``value`` that expires in ``time``
        seconds. ``time`` can be represented by an integer or a Python
        timedelta object.
        """
        if isinstance(time, datetime.timedelta):
            time = int(time.total_seconds())
        return self.execute_command('SETEX', name, time, value)
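The effect of the swapped arguments can be reproduced without a Redis server. The sketch below uses a hypothetical FakePipeline class that copies only the setex signature quoted above and rejects a non-integer expiry, roughly as the server does; it is not redis-py code, and the key and payload are made up for the demo.

```python
import datetime


class FakePipeline:
    """Hypothetical stand-in that mimics only the setex signature shown above."""

    def __init__(self):
        self.commands = []

    def setex(self, name, time, value):
        if isinstance(time, datetime.timedelta):
            time = int(time.total_seconds())
        if not isinstance(time, int):
            # Roughly what the Redis server reports for a non-numeric expiry.
            raise ValueError("value is not an integer or out of range")
        self.commands.append(("SETEX", name, time, value))


pipe = FakePipeline()
key, value, expires = "celery-task-meta-demo", b"payload", 86400

pipe.setex(key, expires, value)      # patched order: accepted
try:
    pipe.setex(key, value, expires)  # original order: the payload lands in the expiry slot
except ValueError as exc:
    print("rejected:", exc)
```

This mirrors the error message above, where the serialized result appears in the position of the expiry and 86400 in the position of the value. Editing a file inside site-packages works, but the change is lost whenever the package is reinstalled, so installing a redis client release whose setex signature matches what Celery 3.1 expects may be the more durable route.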