想做一个web界面的任务管理器,一直以来都是celery来做服务端,自己用脚本来发送异步请求,最近也在学习Django,就踩坑来试试django与celery的结合。
celery相关内容:
这里需要说明下,经测试选用celery3.x版本可以实现,而根据官网celery4.x版本没有结果返回。
官网文档:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
安装django
1 | pip install django |
1 | django-admin version |
2 | 2.2.6 |
创建project和app的过程省略
安装django-celery
1 | pip install django-celery |
2 | |
3 | 安装celery的版本:celery==3.1.26.post2 |
4 | 安装django-celery的版本:django-celery==3.3.1 |
配置
apps/settings.py
1
import djcelery
2
djcelery.setup_loader()
3
...
4
5
INSTALLED_APPS = [
6
...
7
'djcelery',
8
]
9
...
10
BROKER_URL = "amqp://rabbitadmin:rabbitadmin@127.0.0.1:5672//djcelery"
11
CELERY_TIMEZONE = 'Asia/Shanghai'
12
CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1"
13
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
14
CELERY_RESULT_SERIALIZER = 'json'
15
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
apps/celery.py
1
from __future__ import absolute_import, unicode_literals
2
3
import os
4
from celery import Celery
5
from django.conf import settings
6
7
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'apps.settings')
8
9
app = Celery('tasks')
10
app.config_from_object('django.conf:settings')
11
12
app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
13
14
15
16
def debug_task(self):
17
print('Request: {0!r}'.format(self.request))
测试启动celery
1
~]# python manage.py celery worker -A apps -l info
2
-------------- celery@MacBook-Pro.local v3.1.26.post2 (Cipater)
3
---- **** -----
4
--- * *** * -- Darwin-18.7.0-x86_64-i386-64bit
5
-- * - **** ---
6
- ** ---------- [config]
7
- ** ---------- .> app: apps:0x108371898
8
- ** ---------- .> transport: amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
9
- ** ---------- .> results: redis://127.0.0.1:6379/1
10
- *** --- * --- .> concurrency: 8 (prefork)
11
-- ******* ----
12
--- ***** ----- [queues]
13
-------------- .> celery exchange=celery(direct) key=celery
14
15
16
[tasks]
17
. apps.celery.debug_task
18
19
[2019-11-11 15:22:17,460: INFO/MainProcess] Connected to amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
20
[2019-11-11 15:22:17,472: INFO/MainProcess] mingle: searching for neighbors
21
[2019-11-11 15:22:18,498: INFO/MainProcess] mingle: all alone
添加任务
django-celery的任务自需要在app下添加tasks.py即可,名字不可变,该定义在app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)
自动扫描生效
1 | django-admin startapp tasks |
2 | |
3 | tree tasks |
4 | tasks |
5 | ├── __init__.py |
6 | ├── __pycache__ |
7 | │ ├── __init__.cpython-36.pyc |
8 | │ ├── admin.cpython-36.pyc |
9 | │ ├── apps.cpython-36.pyc |
10 | │ ├── celery.cpython-36.pyc |
11 | │ └── models.cpython-36.pyc |
12 | ├── admin.py |
13 | ├── apps.py |
14 | ├── celery.py |
15 | ├── migrations |
16 | │ ├── __init__.py |
17 | │ └── __pycache__ |
18 | │ └── __init__.cpython-36.pyc |
19 | ├── models.py |
20 | ├── tests.py |
21 | └── views.py |
22 | |
23 | # 而且我们要在setting.py中注册该app |
1 | from __future__ import absolute_import, unicode_literals |
2 | from celery import shared_task |
3 | |
4 | |
5 | @shared_task |
6 | def add(x, y): |
7 | return x + y |
8 | |
9 | |
10 | @shared_task |
11 | def mul(x, y): |
12 | return x * y |
ORM写入数据库
1 | python manage.py migrate |
2 | |
3 | # 创建个超级用户 |
4 | python manage.py createsuperuser |
测试
启动worker服务端
1
~]# python manage.py celery worker -A apps -l info
2
3
-------------- celery@MacBook-Pro.local v3.1.26.post2 (Cipater)
4
---- **** -----
5
--- * *** * -- Darwin-18.7.0-x86_64-i386-64bit
6
-- * - **** ---
7
- ** ---------- [config]
8
- ** ---------- .> app: apps:0x108361828
9
- ** ---------- .> transport: amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
10
- ** ---------- .> results: redis://127.0.0.1:6379/1
11
- *** --- * --- .> concurrency: 8 (prefork)
12
-- ******* ----
13
--- ***** ----- [queues]
14
-------------- .> celery exchange=celery(direct) key=celery
15
16
17
[tasks]
18
. tasks.tasks.add
19
. tasks.tasks.mul
20
. apps.celery.debug_task
21
22
[2019-11-11 15:35:21,437: INFO/MainProcess] Connected to amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
23
[2019-11-11 15:35:21,465: INFO/MainProcess] mingle: searching for neighbors
24
[2019-11-11 15:35:22,494: INFO/MainProcess] mingle: all alone
启动beat服务端
1
#]~ python manage.py celery beat -A tasks -l info
2
celery beat v3.1.26.post2 (Cipater) is starting.
3
__ - ... __ - _
4
Configuration ->
5
. broker -> amqp://rabbitadmin:**@127.0.0.1:5672//djcelery
6
. loader -> celery.loaders.app.AppLoader
7
. scheduler -> djcelery.schedulers.DatabaseScheduler
8
9
. logfile -> [stderr]@%INFO
10
. maxinterval -> now (0s)
11
[2019-11-11 15:38:45,123: INFO/MainProcess] beat: Starting...
12
[2019-11-11 15:38:45,123: INFO/MainProcess] Writing entries (0)...
网页控制任务
- 任务执行查看
beat服务端1
[2019-11-11 15:47:45,259: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
2
[2019-11-11 15:47:45,260: INFO/MainProcess] Writing entries (1)...
3
[2019-11-11 15:47:50,259: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
4
[2019-11-11 15:47:55,260: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
5
[2019-11-11 15:48:00,260: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
6
[2019-11-11 15:48:05,261: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
7
[2019-11-11 15:48:10,261: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
8
[2019-11-11 15:48:15,262: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
9
[2019-11-11 15:48:20,263: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
10
[2019-11-11 15:48:25,263: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
11
[2019-11-11 15:48:30,264: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
12
[2019-11-11 15:48:35,264: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
13
[2019-11-11 15:48:40,265: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
14
[2019-11-11 15:48:45,266: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
worker服务端
1 | [2019-11-11 15:49:20,274: INFO/MainProcess] Task rest.tasks.add[fb38d77b-d6f0-40d3-bde3-0df454109d92] succeeded in 0.0012137379962950945s: 3 |
2 | [2019-11-11 15:49:25,273: INFO/MainProcess] Received task: rest.tasks.add[c67474bf-3c82-4f4f-ad9f-20ed055e0f39] |
3 | [2019-11-11 15:49:25,275: INFO/MainProcess] Task rest.tasks.add[c67474bf-3c82-4f4f-ad9f-20ed055e0f39] succeeded in 0.0011863320105476305s: 3 |
4 | [2019-11-11 15:49:30,273: INFO/MainProcess] Received task: rest.tasks.add[56120b88-07ef-47e1-8361-68ee08dff44f] |
5 | [2019-11-11 15:49:30,276: INFO/MainProcess] Task rest.tasks.add[56120b88-07ef-47e1-8361-68ee08dff44f] succeeded in 0.0012300249945838004s: 3 |
6 | [2019-11-11 15:49:35,274: INFO/MainProcess] Received task: rest.tasks.add[e22b54a2-0ca6-4c37-bd0a-3d23a7a4bae2] |
7 | [2019-11-11 15:49:35,277: INFO/MainProcess] Task rest.tasks.add[e22b54a2-0ca6-4c37-bd0a-3d23a7a4bae2] succeeded in 0.0015451840008608997s: 3 |
8 | [2019-11-11 15:49:40,275: INFO/MainProcess] Received task: rest.tasks.add[0ebcedc4-1c80-4a4c-8e43-d9023a5e37c0] |
9 | [2019-11-11 15:49:40,278: INFO/MainProcess] Task rest.tasks.add[0ebcedc4-1c80-4a4c-8e43-d9023a5e37c0] succeeded in 0.0013460790069075301s: 3 |
10 | [2019-11-11 15:49:45,276: INFO/MainProcess] Received task: rest.tasks.add[ac169f86-72e0-4ea0-9aa4-8f290dda64c1] |
11 | [2019-11-11 15:49:45,278: INFO/MainProcess] Task rest.tasks.add[ac169f86-72e0-4ea0-9aa4-8f290dda64c1] succeeded in 0.0012133710115449503s: 3 |
12 | [2019-11-11 15:49:50,277: INFO/MainProcess] Received task: rest.tasks.add[1ff6d6d1-e7ee-430e-ae64-e91f31dfab05] |
13 | [2019-11-11 15:49:50,279: INFO/MainProcess] Task rest.tasks.add[1ff6d6d1-e7ee-430e-ae64-e91f31dfab05] succeeded in 0.0012463609891710803s: 3 |
14 | [2019-11-11 15:49:55,277: INFO/MainProcess] Received task: rest.tasks.add[ed65976b-5621-42de-86cb-6ba1e53db78c] |
15 | [2019-11-11 15:49:55,279: INFO/MainProcess] Task rest.tasks.add[ed65976b-5621-42de-86cb-6ba1e53db78c] succeeded in 0.001093872997444123s: 3 |
16 | [2019-11-11 15:50:00,278: INFO/MainProcess] Received task: rest.tasks.add[d80f7a74-9e5f-4f03-92f0-cee6a9cfd4c6] |
17 | [2019-11-11 15:50:00,281: INFO/MainProcess] Task rest.tasks.add[d80f7a74-9e5f-4f03-92f0-cee6a9cfd4c6] succeeded in 0.0018399830005364493s: 3 |
18 | [2019-11-11 15:50:05,278: INFO/MainProcess] Received task: rest.tasks.add[41a64336-d764-401f-bc16-d7cc3bf1431e] |
19 | [2019-11-11 15:50:05,280: INFO/MainProcess] Task rest.tasks.add[41a64336-d764-401f-bc16-d7cc3bf1431e] succeeded in 0.001239712000824511s: 3 |
backend存储
1 | 337) "celery-task-meta-8a9a52fd-cd47-4988-854e-7e9e635faad7" |
2 | 338) "celery-task-meta-c09d2e9f-dca5-452b-97c0-454cc39536d3" |
3 | 339) "celery-task-meta-e0dc9c88-7a8d-4094-8746-2eea2820ee98" |
4 | 340) "celery-task-meta-898ed57b-a52f-4d39-a685-9fe0718356be" |
5 | 341) "celery-task-meta-6404ed17-e420-4423-b0d6-2f7e861a912c" |
6 | 342) "celery-task-meta-64ae32c3-5cff-4cf0-9590-17effc234978" |
7 | 343) "celery-task-meta-ab126140-b65b-40ff-accd-ec59d1597dc6" |
8 | 344) "celery-task-meta-d2fba024-8550-44bc-8d1b-d0ad7257e6bd" |
9 | 127.0.0.1:6379[1]> get celery-task-meta-d2fba024-8550-44bc-8d1b-d0ad7257e6bd |
10 | "{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": []}" |
bug解决
backend为redis会出现报错,原因是库源代码写错了:
错误:
1 | redis.exceptions.ResponseError: Command # 1 (SETEX b’celery-task-meta-734b07fc-944d-4416-a4c8-61834880f345’ b’\x80\x02}q\x00(X\x06\x00\x00\x00statu |
2 | sq\x01X\x07\x00\x00\x00SUCCESSq\x02X\x06\x00\x00\x00resultq\x03K\x06X\t\x00\x00\x00tracebackq\x04NX\x08\x00\x00\x00childrenq\x05]q\x06u.’ 86400) of |
3 | pipeline caused error: value is not an integer or out of range |
库文件:
1 | venv/lib/python3.6/site-packages/celery/backends/redis.py |
2 | |
3 | |
4 | def _set(self, key, value): |
5 | with self.client.pipeline() as pipe: |
6 | if self.expires: |
7 | # pipe.setex(key, value, self.expires) 源文件 |
8 | pipe.setex(key, self.expires, value) # 修改后 |
9 | else: |
10 | pipe.set(key, value) |
11 | pipe.publish(key, value) |
12 | pipe.execute() |
13 | |
14 | # 方法定义,可以看出是源代码错误 |
15 | def setex(self, name, time, value): |
16 | """ |
17 | Set the value of key ``name`` to ``value`` that expires in ``time`` |
18 | seconds. ``time`` can be represented by an integer or a Python |
19 | timedelta object. |
20 | """ |
21 | if isinstance(time, datetime.timedelta): |
22 | time = int(time.total_seconds()) |
23 | return self.execute_command('SETEX', name, time, value) |