Starting the celery worker server
Start in the foreground:
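A minimal sketch of a foreground start, assuming the application module is named proj:
celery -A proj worker -l info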
Multiple worker servers can be started on one machine, but each worker must be given a unique node name via the --hostname argument; the following substitution variables are available (examples follow the list):
- %p: full node name
- %h: hostname, including the domain name
- %n: hostname only
- %d: domain name only
- %i: prefork pool process index, 0 for the main process
- %I: prefork pool process index with separator
When using supervisor, escape the percent sign: %%h
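For example, several named workers might be started on one machine like this (a sketch; the worker names and the proj app are illustrative):
celery -A proj worker -l info --hostname=worker1@%h
celery -A proj worker -l info --hostname=worker2@%n
celery -A proj worker -l info --hostname=worker3@%d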
Restarting:
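For workers running in the background, one common approach is celery multi; a sketch assuming a single node named w1 and the proj app:
celery multi start w1 -A proj -l info
celery multi restart w1 -A proj -l info
celery multi stopwait w1 -A proj -l info   # stopwait waits for running tasks to finish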
Remote control
from celery import Celery

app = Celery('proj',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0')

# broadcast the rate_limit command to all workers
app.control.broadcast('rate_limit', arguments={'task_name': 'proj.tasks.add', 'rate_limit': '200/m'})
# ask the workers to reply
app.control.broadcast('rate_limit', arguments={'task_name': 'proj.tasks.add', 'rate_limit': '200/m'}, reply=True)
# only send the command to a specific worker
app.control.broadcast('rate_limit', arguments={'task_name': 'proj.tasks.add', 'rate_limit': '200/m'}, reply=True, destination=['worker1@ZGXMacBook-Pro.local'])
Revoking tasks
From the Python shell:
res = add.delay(2, 2)
res
# <AsyncResult: c48ff814-ded7-42d1-878c-8493f14b9ce5>
res.revoke()

app.control.revoke('c48ff814-ded7-42d1-878c-8493f14b9ce5')
# also terminate the process currently executing the task
app.control.revoke('c48ff814-ded7-42d1-878c-8493f14b9ce5', terminate=True)
# terminate with a specific signal
app.control.revoke('c48ff814-ded7-42d1-878c-8493f14b9ce5', terminate=True, signal='SIGKILL')
# revoke several tasks at once
app.control.revoke(['c48ff814-ded7-42d1-878c-8493f14b9ce5',
                    'f565793e-b041-4b2b-9ca4-dca22762a55d'])
Persisting revoked-task state to a file:
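Revoked task ids are held in memory, so to keep them across restarts the worker can be started with a state file; a sketch with an illustrative path:
celery -A proj worker -l info --statedb=/var/run/celery/worker.state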
Note: remote control commands are only supported when the broker is RabbitMQ or Redis.
Time limits
A time limit is the maximum number of seconds a task may run before the process executing it is terminated and replaced by a new one. A soft time limit can also be enabled: it raises an exception that the task can catch in order to clean up before the hard time limit kills it.
from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery('proj',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0')

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_hurry()
app.control.time_limit('proj.tasks.add', soft=60, hard=120, reply=True)
Queue settings
The default queue is named celery. To consume from different queues, start the worker with the foo and bar queues, for example:
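A sketch of the worker command for this, assuming the proj application:
celery -A proj worker -l info -Q foo,bar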
Adding consumers at runtime:
app.control.add_consumer('foo', reply=True)
app.control.add_consumer('foo', reply=True, destination=['worker1@ZGXMacBook-Pro.local'])
app.control.add_consumer(
    queue='bar',
    exchange='ex',
    exchange_type='topic',
    routing_key='media.#',
    options={
        'queue_durable': False,
        'exchange_durable': False,
    },
    reply=True,
    destination=['worker1@ZGXMacBook-Pro.local']
)
Unsubscribing from a queue:
app.control.cancel_consumer('foo', reply=True)
app.control.cancel_consumer('foo', reply=True, destination=['worker1@ZGXMacBook-Pro.local'])
Inspection commands
List active queues:
app.control.inspect().active_queues()
app.control.inspect(['worker1@ZGXMacBook-Pro.local']).active_queues()
Inspecting workers:
i = app.control.inspect()                                  # inspect all workers
i = app.control.inspect(['worker1@ZGXMacBook-Pro.local'])  # or only specific workers
List registered tasks:
i.registered()
# {'worker1@ZGXMacBook-Pro.local': ['proj.tasks.add',
#                                   'proj.tasks.mul',
#                                   'proj.tasks.xsum'],
#  'worker2@ZGXMacBook-Pro.local': ['proj.tasks.add [rate_limit=200/m]',
#                                   'proj.tasks.mul',
#                                   'proj.tasks.xsum']}
List currently executing tasks:
i.active()
# {'worker1@ZGXMacBook-Pro.local': [], 'worker2@ZGXMacBook-Pro.local': []}
List scheduled (ETA/countdown) tasks:
i.scheduled()
# {'worker1@ZGXMacBook-Pro.local': [],
#  'worker2@ZGXMacBook-Pro.local': [{'eta': '2018-08-20T05:37:23.531805+00:00',
#    'priority': 6,
#    'request': {'acknowledged': False,
#      'args': '(2, 2)',
#      'delivery_info': {'exchange': '',
#        'priority': None,
#        'redelivered': False,
#        'routing_key': 'foo'},
#      'hostname': 'worker2@ZGXMacBook-Pro.local',
#      'id': '1e4573f6-09e8-4425-8535-7a38104e4600',
#      'kwargs': '{}',
#      'name': 'proj.tasks.add',
#      'time_start': None,
#      'type': 'proj.tasks.add',
#      'worker_pid': None}}]}
List reserved tasks:
These are tasks that have been received but have not started executing (see the call below).
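Following the pattern of the other inspect calls above, the reserved tasks can be listed with the same inspect object:
i.reserved()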
Additional commands:
app.control.broadcast('shutdown')  # gracefully shut down the workers
app.control.ping(timeout=0.5)      # ping the workers
app.control.enable_events()
app.control.disable_events()
Periodic tasks
from celery import Celery
from celery.schedules import crontab

app = Celery('cron',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # every 10 seconds
    sender.add_periodic_task(10.0, test.s('hello'), name='print every 10')
    sender.add_periodic_task(10.0, test.s('world'), name='every 10')
    # every day at 14:30
    sender.add_periodic_task(
        crontab(hour=14, minute=30),
        test.s('Happy everyday!')
    )

app.conf.timezone = 'Asia/Shanghai'

@app.task
def test(arg):
    print(arg)
Combining it into the application:
from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab

app = Celery('proj',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0',
             include=['proj.tasks'])

app.conf.update(
    result_expires=3600,
)

app.conf.beat_schedule = {
    'add-every-30': {
        'task': 'proj.tasks.add',
        'schedule': 30.0,
        'args': (6, 6)
    },
    'mul-on-crontab': {
        'task': 'proj.tasks.mul',
        'schedule': crontab(hour=15, minute=36, day_of_week=1),
        'args': (16, 16),
    },
}

app.conf.timezone = 'Asia/Shanghai'
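To actually run these schedules, a beat scheduler must be started alongside the worker; a minimal sketch, assuming the proj app above:
celery -A proj beat -l info
celery -A proj worker -B -l info   # -B embeds beat in the worker, convenient for development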
Task routing
The simplest way to route tasks is the task_create_missing_queues setting (enabled by default): with it turned on, a named queue that is not already defined in task_queues will be created automatically.
task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}
Wildcards can be used to match task names:
app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}
Routing to multiple queues:
import re

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
Changing the default queue:
app.conf.task_default_queue = 'default'
Manual routing:
from kombu import Exchange, Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
    Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
          routing_key='image.compress'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'
Routing a task:
task_routes = {
    'feeds.tasks.import_feed': {
        'queue': 'feed_tasks',
        'routing_key': 'feed.import',
    }
}
from feeds.tasks import import_feed

import_feed.apply_async((2, 2), queue='feed_tasks', routing_key='feed.import')
RabbitMQ API (the celery amqp interactive shell):
-> connecting to amqp://rabbitadmin:**@localhost:5672//test.
-> connected.
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
()
5> basic.publish 'this is a message' testexchange testkey
<promise@0x10dbcbc18>
6> basic.get testqueue
{'body': 'this is a message',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': 'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': 'testkey'},
 'properties': {'content_encoding': 'utf-8'}}
7> basic.ack 1
<promise@0x10dbcbc18>
8> queue.delete testqueue
ok. 0 messages deleted.
9> exchange.delete testexchange
()
Monitoring and management
Commands:
- shell: drop into a Python shell
  celery -A proj shell
- status: list the active nodes in the cluster
  celery -A proj status
- result: show the result of a task
  celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
- purge: purge queued messages
  - purge all messages
    celery -A proj purge
  - purge only the specified queues
    celery -A proj purge -Q celery,foo
  - purge everything except the specified queues
    celery -A proj purge -X celery
- inspect active: list active tasks
  celery -A proj inspect active
- inspect scheduled: list ETA tasks
  celery -A proj inspect scheduled
- inspect reserved: list tasks that have been received but are not yet executing (excluding ETA tasks)
  celery -A proj inspect reserved
- inspect revoked: list the history of revoked tasks
  celery -A proj inspect revoked
- inspect registered: list registered tasks
  celery -A proj inspect registered
- inspect query_task: show information about a task by its id
  celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
- control enable_events: enable events
  celery -A proj control enable_events
- control disable_events: disable events
  celery -A proj control disable_events
- migrate: migrate tasks from one broker to another
  celery -A proj migrate redis://localhost amqp://localhost
Real-time web monitoring with Flower:
pip install flower
celery -A proj flower
celery -A proj flower --port=5555
celery flower --broker=amqp://rabbitadmin:**@localhost:5672//test