
Celery Distributed Task Queue (Part 1)

2018/08/05

What is a task queue

A task queue is a mechanism for distributing work across threads or machines. The input to a task queue is a unit of work called a task; dedicated worker processes constantly monitor the queue for new work to perform. Celery mediates between clients and workers through a broker: to initiate a task, the client sends a message to the broker, and the broker then delivers it to a worker for execution.

The basic principle:

Celery executes tasks asynchronously. When the number of tasks is huge, we cannot block waiting for all of them to finish; instead we only receive a unique task id for each submitted task, and use that id to fetch the result when we actually need it. This requires a middleware to store task results, usually RabbitMQ or Redis. In my own experience, using Redis as the broker occasionally produced unstable errors, so I recommend RabbitMQ.
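As a rough sketch of that flow (assuming a module ce that defines app and the add task, as in the simple application later in this post, plus a configured result backend), the client keeps only the task id and looks the result up later:

from celery.result import AsyncResult
from ce import app, add

task_id = add.delay(2, 3).id        # submit: only the task id comes back immediately
# ... later, possibly in another process ...
res = AsyncResult(task_id, app=app)
if res.ready():                     # True once a worker has stored the result in the backend
    print(res.get())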

Advantages of Celery:

  • Simple: the overall workflow of the framework is clear and uncomplicated
  • Highly available: workers and clients automatically retry on connection loss or failure
  • Fast: a single Celery process can handle millions of tasks a minute
  • Flexible: highly extensible, and integrates with frameworks such as Django

Installing Celery

~]# pip install -U celery

rabbitmq

RabbitMQ is Celery's default broker:

app.conf.broker_url = "amqp://rabbitmq:rabbitmq@localhost:5672//test"
app.conf.result_backend = "amqp://rabbitmq:rabbitmq@localhost:5672//test"
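The URLs above assume that the rabbitmq user and the /test vhost already exist; as a sketch (user, password, and vhost names taken from the URL above), they can be created with rabbitmqctl:

~]# rabbitmqctl add_user rabbitmq rabbitmq
~]# rabbitmqctl add_vhost /test
~]# rabbitmqctl set_permissions -p /test rabbitmq ".*" ".*" ".*"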

redis

Redis support requires the extra redis library, so it has to be installed separately:

pip install -U redis
pip install -U celery[redis]  # installs celery together with its redis dependency
app.conf.broker_url = 'redis://:password@localhost:6379/0'
app.conf.broker_url = 'redis+socket:///path/to/redis.sock?virtual_host=db_number'
# Sentinel cluster
app.conf.broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380;sentinel://localhost:26381'
app.conf.broker_transport_options = {'master_name': 'cluster1'}  # for Sentinel, the master name goes in the transport options
# Visibility timeout: how many seconds to wait for the worker to acknowledge a task before the message is redelivered to another worker; for Redis the default is 1 hour
app.conf.broker_transport_options = {'visibility_timeout': 3600}
# Store results in a backend
app.conf.result_backend = 'redis://localhost:6379/0'
# Sentinel cluster as the result backend
app.conf.result_backend = 'sentinel://localhost:26379;sentinel://localhost:26380;sentinel://localhost:26381'
app.conf.result_backend_transport_options = {'master_name': 'mymaster'}

A simple application

from celery import Celery

app = Celery('tasks', broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test')

@app.task
def add(x, y):
    return x + y

Start the Celery worker server:

~]# celery -A tasks worker -l info

 -------------- celery@ZGXMacBook-Pro.local v4.2.0rc4 (windowlicker)
---- **** ----- 
--- * ***  * -- Darwin-17.7.0-x86_64-i386-64bit 2018-08-06 16:43:28
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x105e25390
- ** ---------- .> transport:   amqp://rabbitadmin:**@localhost:5672//test
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . ce.add

[2018-08-06 16:43:28,363: INFO/MainProcess] Connected to amqp://rabbitadmin:**@127.0.0.1:5672//test
[2018-08-06 16:43:28,375: INFO/MainProcess] mingle: searching for neighbors
[2018-08-06 16:43:29,413: INFO/MainProcess] mingle: all alone
[2018-08-06 16:43:29,428: INFO/MainProcess] celery@ZGXMacBook-Pro.local ready.

Calling Celery

from ce import add

add(4,4)   # local call
8

add.delay(4,4)  # async call
<AsyncResult: 31f0e64f-56db-496f-a666-408491776e18>

add.delay(4,4).get()    # no result backend is configured, so the async result cannot be retrieved
NotImplementedError: No result backend is configured.
Please see the documentation for more information.

Configuring a result backend

from celery import Celery

app = Celery('tasks', broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

Calling Celery

from ce import add

add.delay(4,4).get()
8

result = add.delay(4,4)
result.ready()
True
result.get(timeout=5)
8
result.forget()    # discard the stored result
result.get()

result = add.delay('4',4)
result.get(propagate=False)    # return the exception instead of re-raising it
TypeError('must be str, not int')
result.traceback       # the exception traceback

Configuration

Celery's default configuration is good enough for most use cases, but many settings can be tuned as needed. They can be set directly on the application, or kept in a dedicated configuration module.

app.conf.task_serializer = 'json'

To change several settings at once:

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
)

For larger projects, a dedicated configuration module is recommended:

app.config_from_object('celeryconfig')

celeryconfig.py

broker_url = 'amqp://rabbitadmin:rabbitadmin@localhost:5672//test'
result_backend = 'redis://localhost:6379/0'

task_serializer = 'json'
accept_content = ['json']
result_serializer = 'json'
timezone = 'Asia/Shanghai'

Routing tasks with specific behavior to a dedicated queue:

task_routes = {
    "tasks.add": "low"
}
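A worker only picks tasks off that queue if it is told to consume it, for example:

~]# celery -A tasks worker -l info -Q low,celery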

Limiting the rate at which tasks execute:

task_annotations = {
    "tasks.add": {"rate_limit": "10/m"}
}

If the broker is RabbitMQ or Redis, the rate limit can also be set on running workers directly:

~]# celery -A tasks control rate_limit ce.add 10/m
-> celery@ZGXMacBook-Pro.local: OK
        new rate limit set successfully

Using Celery as an application

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='amqp://rabbitadmin:rabbitadmin@localhost:5672//test',
             backend='redis://localhost:6379/0',
             include=['proj.tasks'])

app.conf.update(
    result_expires=3600,
)

if __name__ == "__main__":
    app.start()

proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

Start the app:

~]# celery -A proj worker -l info

 -------------- celery@ZGXMacBook-Pro.local v4.2.1 (windowlicker)
---- **** ----- 
--- * ***  * -- Darwin-17.7.0-x86_64-i386-64bit 2018-08-06 22:27:39
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         proj:0x10afaf048
- ** ---------- .> transport:   amqp://rabbitadmin:**@localhost:5672//test
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum

[2018-08-06 22:27:40,062: INFO/MainProcess] Connected to amqp://rabbitadmin:**@127.0.0.1:5672//test
[2018-08-06 22:27:40,076: INFO/MainProcess] mingle: searching for neighbors
[2018-08-06 22:27:41,113: INFO/MainProcess] mingle: all alone
[2018-08-06 22:27:41,128: INFO/MainProcess] celery@ZGXMacBook-Pro.local ready.

Starting in the background

~]# celery multi start app1 -A proj -l info
celery multi v4.2.1 (windowlicker)
> Starting nodes...
        > app1@ZGXMacBook-Pro.local: OK
~]# celery multi restart app1 -A proj -l info
~]# celery multi stop app1 -A proj -l info
~]# celery multi stopwait app1 -A proj -l info  # makes sure all running tasks have completed before exiting

Note: celery multi does not store any information about workers, so you must pass the same command-line options when restarting, and the same pidfile and logfile when stopping. It is therefore a good habit to always create a pidfile and a logfile:

~]# celery multi start app2 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n%I.log

The multi command also provides a way to pass different arguments to different workers (here: 10 workers, where workers 1-3 consume the images and video queues, workers 4 and 5 consume the data queue at debug log level, and the rest consume the default queue):

~]# celery multi start 10 -A proj -l info -Q:1-3 images,video -Q:4,5 data -Q default -L:4-5 debug

Calling tasks

from proj import tasks

tasks.add.delay(1,2)
tasks.add.apply_async((1,2))
tasks.add.apply_async((1,2), queue='test', countdown=10) # route to the 'test' queue and wait 10 seconds before executing
tasks.add.apply_async((2,2), link=other_task.s()) # link passes the result of this task as an argument to the signature of the next task
tasks.add.apply_async((2,2), link=tasks.add.s(8))
tasks.add.apply_async((2,2), link=other_task.si()) # immutable signature: the callback ignores the parent's result
tasks.add(1,2)  # local call

res = tasks.add.apply_async((2,2), link=tasks.add.s(10))
res.get()
4
res.children[0].get()
14
list(res.collect())
[(<AsyncResult: 19e1af56-9880-4696-8de4-23724e4521f8>, 4),
 (<AsyncResult: 6718daf1-aaa9-4ce3-9c5a-124d829d4c54>, 14)]

s = tasks.add.s(2,2)
s.link(tasks.mul.s(4))

# Error handling
tasks.add.s(2,2).on_error(log_error.s()).delay()
tasks.add.apply_async((2,2), link_error=log_error.s())
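other_task and log_error above are placeholders. A minimal log_error errback, following the (request, exc, traceback) shape also used in the chord example further below, might look like:

@app.task
def log_error(request, exc, traceback):
    # receives the failing task's request, the exception instance, and the traceback
    print('Task {0} raised: {1!r}'.format(request.id, exc))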

A task can only be in a single state at a time; the typical progression is:

PENDING -> STARTED -> SUCCESS
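The current state can be read from the result handle at any time (a sketch using the proj tasks above; note that STARTED is only reported when the task_track_started setting is enabled):

res = tasks.add.delay(2, 2)
res.state        # e.g. 'PENDING' while queued, 'SUCCESS' once finished
res.successful() # True if the task completed without raising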

Designing workflows

A signature wraps the arguments and execution options of a single task invocation so that it can be passed around to functions, and even serialized and sent across the wire.

from proj import tasks

s1 = tasks.add.signature((2,2), countdown=10)
res = s1.delay()
res.get()
4

~s1 # same as s1.delay().get()
4

# incomplete partial
s2 = tasks.add.s(2) # tasks.add(?, 2)
res = s2.delay(8)
res.get()
10
  • sig.apply_async(args=(), kwargs={}, **options)
  • sig.delay(*args, **kwargs)

Groups

from celery import group
from proj.tasks import add

group(add.s(i,i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

group(add.s(i,i) for i in range(10))
group([proj.tasks.add(0, 0), add(1, 1), add(2, 2), add(3, 3), add(4, 4), add(5, 5), add(6, 6), add(7, 7), add(8, 8), add(9, 9)])

group(add.s(i,i) for i in range(10))()
<GroupResult: 1783520b-0b09-41cb-9fd5-924ba8c2fb66 [0ca7b0e7-5e4f-4494-aaf9-88cc73bc7e34, 58ce55c7-5d60-426b-a638-e9c8fe0232d2, c5c6741c-b4b5-4910-84d8-6e46bb8fa9e8, e26c0a3d-ff2b-41a7-b75e-6a3a9a0f86a5, 5f6f1068-0149-418f-8549-f1242ac52f6e, eaf2c00f-4861-4454-b13f-91063dbe09d4, f51526a6-4f16-44cc-8848-e30028e72f03, e66614fa-da40-4251-b223-e2c0440bb647, 6f042fca-38c7-4c8d-b8f9-db5b000f45b9, 891f9415-6c61-4900-99c0-dee7a72a5ce9]>

# Partial group
g = group(add.s(i) for i in range(10))
g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Chains

from celery import chain
from proj.tasks import add, mul

# (4+4) * 8
chain(add.s(4, 4) | mul.s(8))().get()
# (16+4) * 8
chain(add.s(4) | mul.s(8))(16).get()
# ((4+16)*2+4) * 8
chain(add.s(4,16) | mul.s(2) | (add.s(4) | mul.s(8))).delay().get()
# immutable signatures: each step ignores the result of the previous one
res = (add.si(2,2) | mul.si(5,5) | add.si(1,1)).apply_async()
res.get()
2
res.parent.get()
25
res.parent.parent.get()
4

Chords

from celery import chord
from proj.tasks import add, xsum

chord((add.s(i,i) for i in range(10)), xsum.s())().get()  # once every task in the group has finished, the list of their results is passed to the callback
90

chord((add.s(i,i) for i in range(10)), xsum.si([10]))().get() # immutable signature: the group's results are ignored, the callback only sees its own arguments
10

# Error handling
@app.task
def on_chord_error(request, exc, traceback):
    print('Task {0} raised error: {1}'.format(request.id, exc))

res = (group(add.s(i, i) for i in range(10)) | xsum.s().on_error(on_chord_error.s())).delay()

Graphs

res = chain(add.s(4,4), mul.s(8), mul.s(10)).apply_async()
res.parent.parent.graph
99adc762-a879-43d8-8941-3769678df678(2)
     ef90cf6a-471b-4021-87db-03feed054916(1)
          764d630d-8183-463f-8fac-d7132acca01e(0)
ef90cf6a-471b-4021-87db-03feed054916(1)
     764d630d-8183-463f-8fac-d7132acca01e(0)
764d630d-8183-463f-8fac-d7132acca01e(0)
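The graph can also be written out in DOT format and rendered with graphviz (this mirrors the example in the Celery documentation):

with open('graph.dot', 'w') as fh:
    res.parent.parent.graph.to_dot(fh)

~]# dot -Tpng graph.dot -o graph.png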

Map & Starmap

add.starmap(zip(range(10), range(10))).apply_async(countdown=10).get()
# gives the same result as
group(add.s(i, i) for i in range(10)).apply_async(countdown=10).get()
# or
~add.starmap(zip(range(10), range(10)))

Chunks

add.chunks(zip(range(100), range(100)), 10).apply_async().get()
[[0, 2, 4, 6, 8, 10, 12, 14, 16, 18],
 [20, 22, 24, 26, 28, 30, 32, 34, 36, 38],
 [40, 42, 44, 46, 48, 50, 52, 54, 56, 58],
 [60, 62, 64, 66, 68, 70, 72, 74, 76, 78],
 [80, 82, 84, 86, 88, 90, 92, 94, 96, 98],
 [100, 102, 104, 106, 108, 110, 112, 114, 116, 118],
 [120, 122, 124, 126, 128, 130, 132, 134, 136, 138],
 [140, 142, 144, 146, 148, 150, 152, 154, 156, 158],
 [160, 162, 164, 166, 168, 170, 172, 174, 176, 178],
 [180, 182, 184, 186, 188, 190, 192, 194, 196, 198]]
# or
~add.chunks(zip(range(100), range(100)), 10)
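A chunks signature can also be converted to a group, which then allows, for example, skewing the countdown of each chunk (this follows the pattern in the Celery canvas docs):

g = add.chunks(zip(range(100), range(100)), 10).group()
g.skew(start=1, stop=10)()  # the countdown of each successive chunk increases by one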