想做一个web界面的任务管理器,一直以来都是celery来做服务端,自己用脚本来发送异步请求,最近也在学习Django,就踩坑来试试django与celery的结合。
celery相关内容:
这里需要说明下,经测试选用celery3.x版本可以实现,而根据官网celery4.x版本没有结果返回。
官网文档:http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html
安装django
1 | pip install django |
1 | django-admin version |
2 | 2.2.6 |
创建project和app的过程省略
安装django-celery
1 | pip install django-celery |
2 | |
3 | 安装celery的版本:celery==3.1.26.post2 |
4 | 安装django-celery的版本:django-celery==3.3.1 |
配置
apps/settings.py
1import djcelery2djcelery.setup_loader()3...45INSTALLED_APPS = [6...7'djcelery',8]9...10BROKER_URL = "amqp://rabbitadmin:rabbitadmin@127.0.0.1:5672//djcelery"11CELERY_TIMEZONE = 'Asia/Shanghai'12CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/1"13CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'14CELERY_RESULT_SERIALIZER = 'json'15CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']apps/celery.py
1from __future__ import absolute_import, unicode_literals23import os4from celery import Celery5from django.conf import settings67os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'apps.settings')89app = Celery('tasks')10app.config_from_object('django.conf:settings')1112app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)13141516def debug_task(self):17print('Request: {0!r}'.format(self.request))测试启动celery
1~]# python manage.py celery worker -A apps -l info2-------------- celery@MacBook-Pro.local v3.1.26.post2 (Cipater)3---- **** -----4--- * *** * -- Darwin-18.7.0-x86_64-i386-64bit5-- * - **** ---6- ** ---------- [config]7- ** ---------- .> app: apps:0x1083718988- ** ---------- .> transport: amqp://rabbitadmin:**@127.0.0.1:5672//djcelery9- ** ---------- .> results: redis://127.0.0.1:6379/110- *** --- * --- .> concurrency: 8 (prefork)11-- ******* ----12--- ***** ----- [queues]13-------------- .> celery exchange=celery(direct) key=celery141516[tasks]17. apps.celery.debug_task1819[2019-11-11 15:22:17,460: INFO/MainProcess] Connected to amqp://rabbitadmin:**@127.0.0.1:5672//djcelery20[2019-11-11 15:22:17,472: INFO/MainProcess] mingle: searching for neighbors21[2019-11-11 15:22:18,498: INFO/MainProcess] mingle: all alone添加任务
django-celery的任务自需要在app下添加tasks.py即可,名字不可变,该定义在app.autodiscover_tasks(lambda :settings.INSTALLED_APPS)自动扫描生效
1 | django-admin startapp tasks |
2 | |
3 | tree tasks |
4 | tasks |
5 | ├── __init__.py |
6 | ├── __pycache__ |
7 | │ ├── __init__.cpython-36.pyc |
8 | │ ├── admin.cpython-36.pyc |
9 | │ ├── apps.cpython-36.pyc |
10 | │ ├── celery.cpython-36.pyc |
11 | │ └── models.cpython-36.pyc |
12 | ├── admin.py |
13 | ├── apps.py |
14 | ├── celery.py |
15 | ├── migrations |
16 | │ ├── __init__.py |
17 | │ └── __pycache__ |
18 | │ └── __init__.cpython-36.pyc |
19 | ├── models.py |
20 | ├── tests.py |
21 | └── views.py |
22 | |
23 | # 而且我们要在setting.py中注册该app |
1 | from __future__ import absolute_import, unicode_literals |
2 | from celery import shared_task |
3 | |
4 | |
5 | @shared_task |
6 | def add(x, y): |
7 | return x + y |
8 | |
9 | |
10 | @shared_task |
11 | def mul(x, y): |
12 | return x * y |
ORM写入数据库
1 | python manage.py migrate |
2 | |
3 | # 创建个超级用户 |
4 | python manage.py createsuperuser |
测试
启动worker服务端
1~]# python manage.py celery worker -A apps -l info23-------------- celery@MacBook-Pro.local v3.1.26.post2 (Cipater)4---- **** -----5--- * *** * -- Darwin-18.7.0-x86_64-i386-64bit6-- * - **** ---7- ** ---------- [config]8- ** ---------- .> app: apps:0x1083618289- ** ---------- .> transport: amqp://rabbitadmin:**@127.0.0.1:5672//djcelery10- ** ---------- .> results: redis://127.0.0.1:6379/111- *** --- * --- .> concurrency: 8 (prefork)12-- ******* ----13--- ***** ----- [queues]14-------------- .> celery exchange=celery(direct) key=celery151617[tasks]18. tasks.tasks.add19. tasks.tasks.mul20. apps.celery.debug_task2122[2019-11-11 15:35:21,437: INFO/MainProcess] Connected to amqp://rabbitadmin:**@127.0.0.1:5672//djcelery23[2019-11-11 15:35:21,465: INFO/MainProcess] mingle: searching for neighbors24[2019-11-11 15:35:22,494: INFO/MainProcess] mingle: all alone启动beat服务端
1#]~ python manage.py celery beat -A tasks -l info2celery beat v3.1.26.post2 (Cipater) is starting.3__ - ... __ - _4Configuration ->5. broker -> amqp://rabbitadmin:**@127.0.0.1:5672//djcelery6. loader -> celery.loaders.app.AppLoader7. scheduler -> djcelery.schedulers.DatabaseScheduler89. logfile -> [stderr]@%INFO10. maxinterval -> now (0s)11[2019-11-11 15:38:45,123: INFO/MainProcess] beat: Starting...12[2019-11-11 15:38:45,123: INFO/MainProcess] Writing entries (0)...网页控制任务


- 任务执行查看
beat服务端1[2019-11-11 15:47:45,259: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)2[2019-11-11 15:47:45,260: INFO/MainProcess] Writing entries (1)...3[2019-11-11 15:47:50,259: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)4[2019-11-11 15:47:55,260: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)5[2019-11-11 15:48:00,260: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)6[2019-11-11 15:48:05,261: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)7[2019-11-11 15:48:10,261: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)8[2019-11-11 15:48:15,262: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)9[2019-11-11 15:48:20,263: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)10[2019-11-11 15:48:25,263: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)11[2019-11-11 15:48:30,264: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)12[2019-11-11 15:48:35,264: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)13[2019-11-11 15:48:40,265: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)14[2019-11-11 15:48:45,266: INFO/MainProcess] Scheduler: Sending due task test (rest.tasks.add)
worker服务端
1 | [2019-11-11 15:49:20,274: INFO/MainProcess] Task rest.tasks.add[fb38d77b-d6f0-40d3-bde3-0df454109d92] succeeded in 0.0012137379962950945s: 3 |
2 | [2019-11-11 15:49:25,273: INFO/MainProcess] Received task: rest.tasks.add[c67474bf-3c82-4f4f-ad9f-20ed055e0f39] |
3 | [2019-11-11 15:49:25,275: INFO/MainProcess] Task rest.tasks.add[c67474bf-3c82-4f4f-ad9f-20ed055e0f39] succeeded in 0.0011863320105476305s: 3 |
4 | [2019-11-11 15:49:30,273: INFO/MainProcess] Received task: rest.tasks.add[56120b88-07ef-47e1-8361-68ee08dff44f] |
5 | [2019-11-11 15:49:30,276: INFO/MainProcess] Task rest.tasks.add[56120b88-07ef-47e1-8361-68ee08dff44f] succeeded in 0.0012300249945838004s: 3 |
6 | [2019-11-11 15:49:35,274: INFO/MainProcess] Received task: rest.tasks.add[e22b54a2-0ca6-4c37-bd0a-3d23a7a4bae2] |
7 | [2019-11-11 15:49:35,277: INFO/MainProcess] Task rest.tasks.add[e22b54a2-0ca6-4c37-bd0a-3d23a7a4bae2] succeeded in 0.0015451840008608997s: 3 |
8 | [2019-11-11 15:49:40,275: INFO/MainProcess] Received task: rest.tasks.add[0ebcedc4-1c80-4a4c-8e43-d9023a5e37c0] |
9 | [2019-11-11 15:49:40,278: INFO/MainProcess] Task rest.tasks.add[0ebcedc4-1c80-4a4c-8e43-d9023a5e37c0] succeeded in 0.0013460790069075301s: 3 |
10 | [2019-11-11 15:49:45,276: INFO/MainProcess] Received task: rest.tasks.add[ac169f86-72e0-4ea0-9aa4-8f290dda64c1] |
11 | [2019-11-11 15:49:45,278: INFO/MainProcess] Task rest.tasks.add[ac169f86-72e0-4ea0-9aa4-8f290dda64c1] succeeded in 0.0012133710115449503s: 3 |
12 | [2019-11-11 15:49:50,277: INFO/MainProcess] Received task: rest.tasks.add[1ff6d6d1-e7ee-430e-ae64-e91f31dfab05] |
13 | [2019-11-11 15:49:50,279: INFO/MainProcess] Task rest.tasks.add[1ff6d6d1-e7ee-430e-ae64-e91f31dfab05] succeeded in 0.0012463609891710803s: 3 |
14 | [2019-11-11 15:49:55,277: INFO/MainProcess] Received task: rest.tasks.add[ed65976b-5621-42de-86cb-6ba1e53db78c] |
15 | [2019-11-11 15:49:55,279: INFO/MainProcess] Task rest.tasks.add[ed65976b-5621-42de-86cb-6ba1e53db78c] succeeded in 0.001093872997444123s: 3 |
16 | [2019-11-11 15:50:00,278: INFO/MainProcess] Received task: rest.tasks.add[d80f7a74-9e5f-4f03-92f0-cee6a9cfd4c6] |
17 | [2019-11-11 15:50:00,281: INFO/MainProcess] Task rest.tasks.add[d80f7a74-9e5f-4f03-92f0-cee6a9cfd4c6] succeeded in 0.0018399830005364493s: 3 |
18 | [2019-11-11 15:50:05,278: INFO/MainProcess] Received task: rest.tasks.add[41a64336-d764-401f-bc16-d7cc3bf1431e] |
19 | [2019-11-11 15:50:05,280: INFO/MainProcess] Task rest.tasks.add[41a64336-d764-401f-bc16-d7cc3bf1431e] succeeded in 0.001239712000824511s: 3 |
backend存储
1 | 337) "celery-task-meta-8a9a52fd-cd47-4988-854e-7e9e635faad7" |
2 | 338) "celery-task-meta-c09d2e9f-dca5-452b-97c0-454cc39536d3" |
3 | 339) "celery-task-meta-e0dc9c88-7a8d-4094-8746-2eea2820ee98" |
4 | 340) "celery-task-meta-898ed57b-a52f-4d39-a685-9fe0718356be" |
5 | 341) "celery-task-meta-6404ed17-e420-4423-b0d6-2f7e861a912c" |
6 | 342) "celery-task-meta-64ae32c3-5cff-4cf0-9590-17effc234978" |
7 | 343) "celery-task-meta-ab126140-b65b-40ff-accd-ec59d1597dc6" |
8 | 344) "celery-task-meta-d2fba024-8550-44bc-8d1b-d0ad7257e6bd" |
9 | 127.0.0.1:6379[1]> get celery-task-meta-d2fba024-8550-44bc-8d1b-d0ad7257e6bd |
10 | "{\"status\": \"SUCCESS\", \"result\": 3, \"traceback\": null, \"children\": []}" |
bug解决
backend为redis会出现报错,原因是库源代码写错了:
错误:
1 | redis.exceptions.ResponseError: Command # 1 (SETEX b’celery-task-meta-734b07fc-944d-4416-a4c8-61834880f345’ b’\x80\x02}q\x00(X\x06\x00\x00\x00statu |
2 | sq\x01X\x07\x00\x00\x00SUCCESSq\x02X\x06\x00\x00\x00resultq\x03K\x06X\t\x00\x00\x00tracebackq\x04NX\x08\x00\x00\x00childrenq\x05]q\x06u.’ 86400) of |
3 | pipeline caused error: value is not an integer or out of range |
库文件:
1 | venv/lib/python3.6/site-packages/celery/backends/redis.py |
2 | |
3 | |
4 | def _set(self, key, value): |
5 | with self.client.pipeline() as pipe: |
6 | if self.expires: |
7 | # pipe.setex(key, value, self.expires) 源文件 |
8 | pipe.setex(key, self.expires, value) # 修改后 |
9 | else: |
10 | pipe.set(key, value) |
11 | pipe.publish(key, value) |
12 | pipe.execute() |
13 | |
14 | # 方法定义,可以看出是源代码错误 |
15 | def setex(self, name, time, value): |
16 | """ |
17 | Set the value of key ``name`` to ``value`` that expires in ``time`` |
18 | seconds. ``time`` can be represented by an integer or a Python |
19 | timedelta object. |
20 | """ |
21 | if isinstance(time, datetime.timedelta): |
22 | time = int(time.total_seconds()) |
23 | return self.execute_command('SETEX', name, time, value) |