Jusene's Blog

Celery Distributed Task Queue (Part 2)

2018/08/17

Starting the Celery worker server

Start in the foreground:

~]# celery -A proj worker -l info

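You can start several worker servers on one machine, but each one must be given a unique node name with the --hostname (-n) argument:

~]# celery -A proj worker --loglevel=info --concurrency=10 -n worker1@%h
~]# celery -A proj worker --loglevel=info --concurrency=10 -n worker2@%h

  • %p: full node name
  • %h: hostname, including the domain name
  • %n: hostname only
  • %d: domain name only
  • %i: prefork pool process index, 0 for the main process
  • %I: prefork pool process index with a separator

Under supervisor the percent sign must be escaped: %%h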
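To make the hostname variables concrete, here is a minimal sketch of how a node name such as worker1@%h could be expanded. The function expand_node_name is a hypothetical helper written for this illustration, not Celery's own code, and it only handles %h, %n and %d as described in the list above, plus %% as a literal percent sign.

```python
import socket


def expand_node_name(template, hostname=None):
    """Expand %h, %n and %d in a node name template (illustrative only)."""
    host = hostname or socket.gethostname()
    short_name, _, domain = host.partition('.')
    values = {'h': host, 'n': short_name, 'd': domain}

    parts = []
    i = 0
    while i < len(template):
        ch = template[i]
        if ch == '%' and i + 1 < len(template):
            key = template[i + 1]
            if key == '%':
                parts.append('%')                      # '%%' is a literal percent sign
            else:
                parts.append(values.get(key, '%' + key))  # unknown keys are left untouched
            i += 2
        else:
            parts.append(ch)
            i += 1
    return ''.join(parts)


# The hostname is passed explicitly so the result does not depend on this machine.
full = expand_node_name('worker1@%h', 'foo.example.com')
short = expand_node_name('worker1@%n', 'foo.example.com')
domain = expand_node_name('worker1@%d', 'foo.example.com')
escaped = expand_node_name('worker1@%%h', 'foo.example.com')
```

The last call shows why %%h is needed under supervisor: a layer that does its own percent expansion turns %%h into %h, which the worker can then expand.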

Restart:

~]# celery multi start 1 -A proj -l info -c4 --pidfile=/var/run/celery/%n.pid
~]# celery multi restart 1 --pidfile=/var/run/celery/%n.pid  # sends the TERM signal and starts a new instance

Remote control

from celery import Celery

app = Celery('proj',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0')

# Limit proj.tasks.add to 200 tasks per minute on every worker, without waiting for replies
app.control.broadcast('rate_limit', arguments={'task_name': 'proj.tasks.add', 'rate_limit': '200/m'})
# Same command, waiting for the workers to reply
app.control.broadcast('rate_limit', arguments={'task_name': 'proj.tasks.add', 'rate_limit': '200/m'}, reply=True)
# Same command, sent to a single worker only
app.control.broadcast('rate_limit', arguments={'task_name': 'proj.tasks.add', 'rate_limit': '200/m'}, reply=True, destination=['worker1@ZGXMacBook-Pro.local'])
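The rate limit string '200/m' means 200 tasks per minute. A minimal sketch of how such a string maps to a per-second rate follows; tasks_per_second and RATE_UNITS are hypothetical names for this illustration, and the sketch assumes only the suffixes /s, /m and /h, with a bare number treated as tasks per second.

```python
RATE_UNITS = {'s': 1, 'm': 60, 'h': 3600}


def tasks_per_second(rate_limit):
    """Convert a rate string such as '200/m' into tasks per second."""
    count, _, unit = str(rate_limit).partition('/')
    seconds = RATE_UNITS[unit or 's']   # no suffix: treat the number as per second
    return float(count) / seconds


per_second = tasks_per_second('200/m')
min_interval = 1 / per_second           # average gap between two task starts, in seconds
```

With '200/m' the worker may start a task roughly every 0.3 seconds on average.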

Revoking tasks

Command line:

~]# celery -A proj control revoke <task_id>

res = add.delay(2, 2)
res  # <AsyncResult: c48ff814-ded7-42d1-878c-8493f14b9ce5>
res.revoke()

app.control.revoke('c48ff814-ded7-42d1-878c-8493f14b9ce5')  # revoke the task; if it is already running, it is not interrupted
app.control.revoke('c48ff814-ded7-42d1-878c-8493f14b9ce5', terminate=True)  # revoke and interrupt the running task by sending SIGTERM
app.control.revoke('c48ff814-ded7-42d1-878c-8493f14b9ce5', terminate=True, signal='SIGKILL')  # revoke and interrupt the running task by sending SIGKILL
app.control.revoke(['c48ff814-ded7-42d1-878c-8493f14b9ce5',
                    'f565793e-b041-4b2b-9ca4-dca22762a55d'])  # revoke several tasks at once

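Saving a state file (revoked task ids are kept in worker memory, so --statedb is needed to keep them across restarts):

~]# celery -A proj worker -l info --statedb=/var/run/celery/%n.state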
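Why the state file matters can be shown with a toy model. The class RevokedRegistry below is hypothetical and stores ids as JSON purely for illustration; Celery's real state file format is different. The sketch only demonstrates that an in-memory list of revoked ids disappears on restart unless it is written to disk.

```python
import json
import os
import tempfile


class RevokedRegistry:
    """Toy model of a worker's list of revoked task ids."""

    def __init__(self, path=None):
        self.path = path              # None means "memory only"
        self.ids = set()
        if path and os.path.exists(path):
            with open(path) as fh:
                self.ids = set(json.load(fh))

    def revoke(self, task_id):
        self.ids.add(task_id)
        if self.path:                 # persist on every change
            with open(self.path, 'w') as fh:
                json.dump(sorted(self.ids), fh)

    def should_skip(self, task_id):
        return task_id in self.ids


task_id = 'c48ff814-ded7-42d1-878c-8493f14b9ce5'
state_file = os.path.join(tempfile.mkdtemp(), 'worker1.state')

# Without a state file, a new registry (the "restarted worker") has forgotten the revoke.
RevokedRegistry().revoke(task_id)
memory_only_after_restart = RevokedRegistry().should_skip(task_id)

# With a state file, a new registry reads the revoked ids back from disk.
RevokedRegistry(state_file).revoke(task_id)
persisted_after_restart = RevokedRegistry(state_file).should_skip(task_id)
```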

Note: remote control commands are only supported when the broker is RabbitMQ or Redis.

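Time limits

The time limit is the maximum number of seconds a task may run before the process executing it is terminated and replaced by a new one. You can also enable a soft time limit, which raises an exception that the task can catch in order to clean up before the hard time limit kills it.

from celery import Celery
from celery.exceptions import SoftTimeLimitExceeded

app = Celery('proj',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0')

@app.task
def mytask():
    try:
        do_work()  # placeholder for the real work
    except SoftTimeLimitExceeded:
        clean_up_in_hurry()  # placeholder for a quick cleanup

# Change the limits of proj.tasks.add at runtime: soft limit 60 s, hard limit 120 s
app.control.time_limit('proj.tasks.add', soft=60, hard=120, reply=True)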
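The soft limit pattern can be tried without a broker. The sketch below imitates it with a Unix alarm signal; it assumes a Unix system and must run in the main thread. SoftTimeLimitExceeded here is a local stand-in for celery.exceptions.SoftTimeLimitExceeded, and run_with_soft_limit is a hypothetical helper, not how the Celery pool enforces limits.

```python
import signal
import time


class SoftTimeLimitExceeded(Exception):
    """Local stand-in for celery.exceptions.SoftTimeLimitExceeded."""


def _raise_soft_limit(signum, frame):
    raise SoftTimeLimitExceeded()


def run_with_soft_limit(func, soft_seconds):
    """Run func(); raise SoftTimeLimitExceeded inside it after soft_seconds."""
    previous = signal.signal(signal.SIGALRM, _raise_soft_limit)
    signal.setitimer(signal.ITIMER_REAL, soft_seconds)
    try:
        return func()
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)       # cancel the timer
        signal.signal(signal.SIGALRM, previous)       # restore the old handler


events = []


def slow_task():
    try:
        for _ in range(50):
            time.sleep(0.1)            # pretend to work for about 5 seconds
        events.append('finished')
    except SoftTimeLimitExceeded:
        events.append('cleaned up')    # the task gets a chance to tidy up
    return events[-1]


started = time.monotonic()
result = run_with_soft_limit(slow_task, 0.2)
elapsed = time.monotonic() - started
```

The task is interrupted after about 0.2 seconds and takes the cleanup branch instead of running to completion. A hard limit has no such branch: the process is simply killed.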

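Queue settings

The default queue is named celery. To make a worker consume from other queues instead, such as foo and bar, list them with -Q:

~]# celery -A proj worker -l info -Q foo,bar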

Adding consumers:

~]# celery -A proj control add_consumer foo  # all workers start consuming from the foo queue
~]# celery -A proj control add_consumer foo -d worker1@ZGXMacBook-Pro.local

app.control.add_consumer('foo', reply=True)
app.control.add_consumer('foo', reply=True, destination=['worker1@ZGXMacBook-Pro.local'])

app.control.add_consumer(
    queue='bar',
    exchange='ex',
    exchange_type='topic',
    routing_key='media.#',
    options={
        'queue_durable': False,
        'exchange_durable': False,
    },
    reply=True,
    destination=['worker1@ZGXMacBook-Pro.local']
)

Cancelling consumers:

~]# celery -A proj control cancel_consumer foo  # all workers stop consuming from the foo queue
~]# celery -A proj control cancel_consumer foo -d worker1@ZGXMacBook-Pro.local

app.control.cancel_consumer('foo', reply=True)
app.control.cancel_consumer('foo', reply=True, destination=['worker1@ZGXMacBook-Pro.local'])

Commands

List active queues:

~]# celery -A proj inspect active_queues
~]# celery -A proj inspect active_queues -d worker1@ZGXMacBook-Pro.local

app.control.inspect().active_queues()
app.control.inspect(['worker1@ZGXMacBook-Pro.local']).active_queues()

Inspecting workers:

i = app.control.inspect()  # inspect all nodes
i = app.control.inspect(['worker1@ZGXMacBook-Pro.local'])  # inspect only the listed nodes

List registered tasks:

i.registered()
{'worker1@ZGXMacBook-Pro.local': ['proj.tasks.add',
  'proj.tasks.mul',
  'proj.tasks.xsum'],
 'worker2@ZGXMacBook-Pro.local': ['proj.tasks.add [rate_limit=200/m]',
  'proj.tasks.mul',
  'proj.tasks.xsum']}

List the tasks that are currently executing:

i.active()
{'worker1@ZGXMacBook-Pro.local': [], 'worker2@ZGXMacBook-Pro.local': []}
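Inspect replies are plain dictionaries keyed by worker name, so they are easy to summarize. One detail worth guarding against is that an inspect call returns None rather than an empty dict when no worker answers in time. The helper count_tasks below is a hypothetical name, and the sample reply is copied from the output above so the sketch runs without a broker.

```python
def count_tasks(reply):
    """Return {worker_name: number_of_tasks} for an inspect reply."""
    if reply is None:              # no worker answered within the timeout
        return {}
    return {worker: len(tasks) for worker, tasks in reply.items()}


sample_reply = {'worker1@ZGXMacBook-Pro.local': [],
                'worker2@ZGXMacBook-Pro.local': []}

counts = count_tasks(sample_reply)
no_workers = count_tasks(None)
```

In a live setup the argument would be something like i.active() or i.reserved().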

List scheduled (ETA/countdown) tasks:

i.scheduled()
{'worker1@ZGXMacBook-Pro.local': [],
 'worker2@ZGXMacBook-Pro.local': [{'eta': '2018-08-20T05:37:23.531805+00:00',
   'priority': 6,
   'request': {'acknowledged': False,
    'args': '(2, 2)',
    'delivery_info': {'exchange': '',
     'priority': None,
     'redelivered': False,
     'routing_key': 'foo'},
    'hostname': 'worker2@ZGXMacBook-Pro.local',
    'id': '1e4573f6-09e8-4425-8535-7a38104e4600',
    'kwargs': '{}',
    'name': 'proj.tasks.add',
    'time_start': None,
    'type': 'proj.tasks.add',
    'worker_pid': None}}]}
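The eta field is an ISO 8601 timestamp with a UTC offset. A small sketch of how to turn it into "seconds until the task is due" follows; seconds_until is a hypothetical helper, and the value of now is fixed so that the example is reproducible.

```python
from datetime import datetime, timedelta, timezone


def seconds_until(eta, now):
    """Seconds between now and an ISO 8601 ETA string (negative if overdue)."""
    return (datetime.fromisoformat(eta) - now).total_seconds()


now = datetime(2018, 8, 20, 5, 37, 0, tzinfo=timezone.utc)
remaining = seconds_until('2018-08-20T05:37:23.531805+00:00', now)

# countdown=N is shorthand for "eta = now + N seconds".
eta_from_countdown = now + timedelta(seconds=30)
```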

List reserved tasks, that is, tasks the worker has received but not yet started executing:

i.reserved()

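Other commands:

app.control.broadcast('shutdown')  # shut down all workers
app.control.ping(timeout=0.5)      # ping the workers and wait up to 0.5 s for replies
app.control.enable_events()        # tell workers to start sending events
app.control.disable_events()       # tell workers to stop sending events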

Periodic tasks

from celery import Celery
from celery.schedules import crontab

app = Celery('cron',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Call test('hello') every 10 seconds
    sender.add_periodic_task(10.0, test.s('hello'), name='print every 10')
    # Call test('world') every 10 seconds
    sender.add_periodic_task(10.0, test.s('world'), name='every 10')
    # Call test('Happy everyday!') every day at 14:30
    sender.add_periodic_task(
        crontab(hour=14, minute=30),
        test.s('Happy everyday!')
    )

app.conf.timezone = 'Asia/Shanghai'

@app.task
def test(arg):
    print(arg)

Start a worker with the embedded beat scheduler (-B), assuming the code above is saved as schdu.py:

~]# celery -A schdu worker -l info -B

Defining the schedule in the application configuration:

from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.schedules import crontab

app = Celery('proj',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0',
             include=['proj.tasks'])

app.conf.update(
    result_expires=3600,
)

app.conf.beat_schedule = {
    # Run proj.tasks.add(6, 6) every 30 seconds
    'add-every-30': {
        'task': 'proj.tasks.add',
        'schedule': 30.0,
        'args': (6, 6)
    },
    # Run proj.tasks.mul(16, 16) every Monday at 15:36
    'mul-on-crontab': {
        'task': 'proj.tasks.mul',
        'schedule': crontab(hour=15, minute=36, day_of_week=1),
        'args': (16, 16),
    },
}

app.conf.timezone = 'Asia/Shanghai'
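To see when a crontab entry like the ones above would fire, here is a minimal sketch that computes the next due time for a fixed hour and minute with an optional day of week. next_run is a hypothetical helper, not Celery's scheduler: it works on naive datetimes that are assumed to already be in the configured timezone, ignores daylight saving changes, and uses cron day numbering (0 is Sunday, 1 is Monday).

```python
from datetime import datetime, timedelta


def next_run(now, hour, minute, day_of_week=None):
    """Next time an entry with a fixed hour/minute (and optional weekday) is due."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)          # today's slot has already passed
    # isoweekday() is 1..7 for Monday..Sunday; modulo 7 maps Sunday to 0.
    while day_of_week is not None and candidate.isoweekday() % 7 != day_of_week:
        candidate += timedelta(days=1)
    return candidate


monday_afternoon = datetime(2018, 8, 20, 16, 0)      # a Monday, 16:00

daily = next_run(monday_afternoon, hour=14, minute=30)
weekly = next_run(monday_afternoon, hour=15, minute=36, day_of_week=1)
```

Because 16:00 on Monday is later than both slots, the daily entry is next due on Tuesday at 14:30 and the Monday entry a full week later at 15:36.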

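Task routing

The simplest way to route tasks is to rely on the task_create_missing_queues setting. When it is on (the default), a named queue that is not already defined in task_queues is created automatically.

task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}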

Glob patterns can be used to match task names:

app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

Multiple queues:

import re

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
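A rough sketch of how an ordered route list like this can be resolved: each entry is tried in turn, glob strings are matched against the task name, compiled regular expressions use match, and the first hit wins. resolve_queue is a hypothetical function for illustration, and falling back to the celery queue is an assumption based on the default queue name mentioned earlier.

```python
import re
from fnmatch import fnmatchcase

ROUTES = [
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
]


def resolve_queue(task_name, routes=ROUTES, default='celery'):
    """Return the queue of the first route whose pattern matches task_name."""
    for pattern, options in routes:
        if isinstance(pattern, str):
            matched = fnmatchcase(task_name, pattern)        # glob pattern
        else:
            matched = pattern.match(task_name) is not None   # compiled regex
        if matched:
            return options['queue']
    return default                                           # nothing matched


feed_queue = resolve_queue('feed.tasks.import_feed')
media_queue = resolve_queue('image.tasks.resize')
fallback_queue = resolve_queue('proj.tasks.add')
```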

Changing the default queue:

app.conf.task_default_queue = 'default'

Manual routing:

from kombu import Exchange, Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default', routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
    Queue('image_tasks', exchange=Exchange('mediatasks', type='direct'),
          routing_key='image.compress'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'

Routing tasks:

task_routes = {
    'feeds.tasks.import_feed': {
        'queue': 'feed_tasks',
        'routing_key': 'feed.import',
    }
}

from feeds.tasks import import_feed

# The queue and routing key can also be given per call
import_feed.apply_async((2, 2), queue='feed_tasks', routing_key='feed.import')
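The routing key feed.import reaches the feed_tasks queue because that queue is bound with the topic pattern feed.#. In AMQP topic exchanges, words are separated by dots, * matches exactly one word and # matches zero or more words. The function topic_matches below is a small sketch of that rule written for this post, not code taken from RabbitMQ or kombu.

```python
def topic_matches(pattern, routing_key):
    """Return True if an AMQP topic binding pattern matches a routing key."""
    p = pattern.split('.')
    k = routing_key.split('.')

    def match(i, j):
        if i == len(p):
            return j == len(k)                 # both must be fully consumed
        if p[i] == '#':
            # '#' may swallow zero or more of the remaining words.
            return any(match(i + 1, jj) for jj in range(j, len(k) + 1))
        if j == len(k):
            return False                       # pattern has words left, key does not
        if p[i] == '*' or p[i] == k[j]:
            return match(i + 1, j + 1)         # '*' or a literal word matches one word
        return False

    return match(0, 0)


to_feed_queue = topic_matches('feed.#', 'feed.import')
to_default_queue = topic_matches('task.#', 'task.default')
wrong_queue = topic_matches('task.#', 'feed.import')
```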

RabbitMQ AMQP API shell:

~]# celery -A proj amqp
-> connecting to amqp://rabbitadmin:**@localhost:5672//test.
-> connected.
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
()
5> basic.publish 'this is a message' testexchange testkey
<promise@0x10dbcbc18>
6> basic.get testqueue
{'body': 'this is a message',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': 'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': 'testkey'},
 'properties': {'content_encoding': 'utf-8'}}
7> basic.ack 1
<promise@0x10dbcbc18>
8> queue.delete testqueue
ok. 0 messages deleted.
9> exchange.delete testexchange
()
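The session above follows the usual AMQP sequence: declare an exchange, declare a queue, bind them with a key, publish, then fetch. The toy in-memory model below mirrors those steps for a direct exchange so the flow can be followed without a broker. ToyBroker and its methods are hypothetical and leave out acknowledgements, durability and everything else a real broker does.

```python
from collections import deque


class ToyBroker:
    """In-memory model of direct exchanges, bindings and queues."""

    def __init__(self):
        self.exchanges = {}   # exchange name -> {routing key: [queue names]}
        self.queues = {}      # queue name -> deque of message bodies

    def exchange_declare(self, name):
        self.exchanges.setdefault(name, {})

    def queue_declare(self, name):
        self.queues.setdefault(name, deque())

    def queue_bind(self, queue, exchange, routing_key):
        self.exchanges[exchange].setdefault(routing_key, []).append(queue)

    def basic_publish(self, body, exchange, routing_key):
        # A direct exchange delivers only to queues bound with the exact key.
        for queue in self.exchanges[exchange].get(routing_key, []):
            self.queues[queue].append(body)

    def basic_get(self, queue):
        pending = self.queues[queue]
        return pending.popleft() if pending else None


broker = ToyBroker()
broker.exchange_declare('testexchange')
broker.queue_declare('testqueue')
broker.queue_bind('testqueue', 'testexchange', 'testkey')
broker.basic_publish('this is a message', 'testexchange', 'testkey')
broker.basic_publish('dropped', 'testexchange', 'otherkey')   # no binding for this key

first = broker.basic_get('testqueue')
second = broker.basic_get('testqueue')   # queue is empty again
```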

Monitoring and management

Commands:

  • shell: open a Python shell: celery -A proj shell
  • status: list the active nodes in the cluster: celery -A proj status
  • result: show the result of a task: celery -A proj result -t tasks.add 4e196aa4-0141-4601-8138-7aa33db0f577
  • purge: delete messages
    • purge all messages: celery -A proj purge
    • purge only the listed queues: celery -A proj purge -Q celery,foo
    • purge every queue except the listed ones: celery -A proj purge -X celery
  • inspect active: list the tasks currently executing: celery -A proj inspect active
  • inspect scheduled: list ETA tasks: celery -A proj inspect scheduled
  • inspect reserved: list tasks that have been received but not yet executed, excluding ETA tasks: celery -A proj inspect reserved
  • inspect revoked: list the history of revoked tasks: celery -A proj inspect revoked
  • inspect registered: list the registered tasks: celery -A proj inspect registered
  • inspect query_task: show information about a task by its id: celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8
  • control enable_events: enable events: celery -A proj control enable_events
  • control disable_events: disable events: celery -A proj control disable_events
  • migrate: migrate tasks from one broker to another: celery -A proj migrate redis://localhost amqp://localhost

Real-time web monitoring:

pip install flower
celery -A proj flower
celery -A proj flower --port=5555
celery flower --broker=amqp://rabbitadmin:**@localhost:5672//test

celery -A proj events  # curses-based event monitor in the terminal