Jusene's Blog

RabbitMQ in Practice with Python

2018/05/24

Implementing simple queue communication

producer:

import pika

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue='hello')

# The default exchange ('') routes to the queue named by routing_key
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print('[x] Sent "Hello World!"')
connection.close()

consumer:

import pika

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()

# Declare the queue again to make sure it exists
channel.queue_declare(queue='hello')

# Callback invoked for every delivered message
def callback(ch, method, properties, body):
    print('[x] Received %s' % body)

# pika 0.x call signature; pika 1.0+ uses
# basic_consume(queue, on_message_callback, auto_ack=...)
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

Work Queues

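By default RabbitMQ hands the messages sent by the producer (P) to the consumers (C) one after another, much like load balancing. You can start several consumers, and the published messages are delivered to them round-robin.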
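The round-robin dispatch described above can be illustrated without a broker. The following is only a minimal sketch, not RabbitMQ code: `dispatch_round_robin` and the consumer names `c1`/`c2` are hypothetical, and a plain dict stands in for the consumers.

```python
from itertools import cycle


def dispatch_round_robin(messages, consumers):
    """Assign each message to the next consumer in turn (hypothetical helper)."""
    assignment = {name: [] for name in consumers}
    turn = cycle(consumers)  # c1, c2, c1, c2, ...
    for message in messages:
        assignment[next(turn)].append(message)
    return assignment


if __name__ == "__main__":
    # Five messages spread over two consumers
    print(dispatch_round_robin(["m1", "m2", "m3", "m4", "m5"], ["c1", "c2"]))
```

With two consumers, the odd-numbered messages end up on `c1` and the even-numbered ones on `c2`, regardless of how long each message takes to process. That is why the consumer below also sets a prefetch limit.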

producer:

import pika
import sys

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()
# Durable queue: the queue itself survives a RabbitMQ restart.
# If 'hello' already exists as a non-durable queue, this declaration fails;
# delete the old queue first or use a different name.
channel.queue_declare(queue='hello', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2  # make the message persistent
                      ))
print('[x] Sent %r' % message)
connection.close()

consumer:

import pika
import time

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()

# Declare the queue again to make sure it exists
channel.queue_declare(queue='hello', durable=True)

# Callback: simulate one second of work per '.' in the message body
def callback(ch, method, properties, body):
    print('[x] Received %s' % body)
    time.sleep(body.count(b'.'))
    print('[x] Done')
    # Acknowledge the message once the work is finished
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Tell RabbitMQ not to give a consumer more than one unacknowledged message at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='hello')  # no_ack defaults to False, so acks are manual
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

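With the code above, if a consumer goes away without acknowledging a message (its channel or connection closes before the ack is sent), RabbitMQ requeues the message and delivers it to another consumer, and keeps doing so until it receives an ack.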
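The acknowledgement bookkeeping can be pictured with a small in-memory model. This is a sketch under stated assumptions, not how the broker is implemented: `ToyQueue` and its methods are hypothetical names, and a lost consumer is signalled by an explicit call instead of a closed connection.

```python
from collections import deque


class ToyQueue:
    """In-memory stand-in for a queue with manual acknowledgements (hypothetical)."""

    def __init__(self, messages):
        self.ready = deque(messages)   # messages waiting for delivery
        self.unacked = {}              # delivery_tag -> (consumer, message)
        self._next_tag = 1

    def deliver(self, consumer):
        """Hand the next ready message to a consumer and track it as unacked."""
        message = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = (consumer, message)
        return tag, message

    def ack(self, tag):
        """The consumer finished the work, so the message can be forgotten."""
        del self.unacked[tag]

    def consumer_lost(self, consumer):
        """Requeue, in original order, everything the lost consumer had not acked."""
        for tag, (owner, message) in sorted(self.unacked.items(), reverse=True):
            if owner == consumer:
                del self.unacked[tag]
                self.ready.appendleft(message)


if __name__ == "__main__":
    q = ToyQueue(["task-1", "task-2"])
    tag, msg = q.deliver("c1")
    q.consumer_lost("c1")            # c1 died before acking task-1
    tag2, msg2 = q.deliver("c2")     # task-1 goes to c2 instead
    q.ack(tag2)
    print(msg, msg2)
```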

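Publish/Subscribe

The earlier examples were essentially one-to-one: a message could only be sent to one named queue. Sometimes you want a message to reach every queue, like a broadcast, and that is what an exchange is for. An exchange is a simple thing: on one side it receives messages from producers, and on the other it pushes them to queues. The exchange has to know exactly what to do with each message it receives: append it to one particular queue, send it to several queues, or discard it. These rules are defined by the exchange type.

An exchange is declared with a type, which decides which queues qualify to receive a message. There are four types:

1. fanout: every queue bound to the exchange receives the message (send to everyone)
2. direct: the queues whose binding key exactly matches the message's routing key receive the message (send to specific queues)
3. topic: the queues whose binding key (which may be a pattern) matches the message's routing key receive the message (send to whoever subscribed to the topic)
4. headers: the message headers decide which queues the message is sent to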
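The difference between the fanout and direct rules in the list above can be sketched as a toy routing function. It is a simplified model, not broker code: `route`, the queue names, and the `(queue, binding_key)` tuple layout are assumptions made for illustration.

```python
def route(exchange_type, bindings, routing_key):
    """Return the queues that receive a message (toy model, not broker code).

    bindings is a list of (queue_name, binding_key) pairs.
    """
    if exchange_type == "fanout":
        # Fanout ignores the routing key: every bound queue gets a copy.
        matched = [queue for queue, _ in bindings]
    elif exchange_type == "direct":
        # Direct needs an exact match between routing key and binding key.
        matched = [queue for queue, key in bindings if key == routing_key]
    else:
        raise ValueError("unsupported exchange type: %s" % exchange_type)
    # A queue bound several times still receives the message only once.
    return sorted(set(matched))


if __name__ == "__main__":
    bindings = [("q_all", "info"), ("q_all", "error"), ("q_err", "error")]
    print(route("fanout", bindings, "anything"))
    print(route("direct", bindings, "error"))
    print(route("direct", bindings, "info"))
```

In this model a direct message with a key that nobody bound (for example `debug`) matches no queue and is simply dropped.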

fanout

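Use cases:
e.g. live video streaming
e.g. Sina Weibo

The sender publishes messages to the exchange, the exchange places them into the bound queues, and the clients then receive them from those queues.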

publish:

import pika
import sys

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()
# Declare a fanout exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# A fanout exchange ignores the routing key, so it is left empty
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

consumer:

import pika

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

# Declare an exclusive queue and let the server pick its name
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue  # server-generated random queue name

# Bind the queue to the exchange
channel.queue_bind(exchange='logs', queue=queue_name)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print('[x] %r' % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

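exclusive queue: a queue that only its creator can see; other connections are not allowed to access it.
Characteristics:

  1. Visible only to the connection that first declared it
  2. Deleted automatically when that connection closes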

direct

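With a direct exchange, RabbitMQ uses the routing key and the binding keys to choose which queue a message goes to: the sender publishes the message to the exchange with a key, and the exchange delivers it to the queues bound with that key.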

producer:

#!/usr/bin/env python
# -*- coding=utf-8 -*-

import pika
import sys

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()

channel.exchange_declare(exchange="direct_logs",
                         exchange_type='direct')

# First argument is the severity (used as routing key), the rest is the message
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print('[x] Sent %r:%r' % (severity, message))
connection.close()

consumer:

import pika
import sys

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]

if not severities:
    sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0])
    sys.exit(1)

# One binding per severity the consumer is interested in
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(' [x] %r:%r' % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

topics

*: matches exactly one word
#: matches zero or more words
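The two wildcards operate on dot-separated words. A minimal sketch of the matching rule follows; `topic_matches` is a hypothetical helper written for illustration and is not part of pika or RabbitMQ.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key under the topic rules."""
    pattern = binding_key.split(".") if binding_key else []
    words = routing_key.split(".") if routing_key else []

    def match(p, w):
        # p indexes into pattern, w indexes into words
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' either absorbs nothing, or absorbs one word and stays active
            return match(p + 1, w) or (w < len(words) and match(p, w + 1))
        if w == len(words):
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for binding, routing in [("*.info", "anonymous.info"),
                             ("*.info", "a.b.info"),
                             ("kern.#", "kern"),
                             ("kern.#", "kern.critical.disk")]:
        print(binding, routing, topic_matches(binding, routing))
```

Here `*.info` accepts `anonymous.info` but rejects `a.b.info`, because `*` stands for exactly one word, while `kern.#` accepts both `kern` and `kern.critical.disk`.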

producer:

import pika
import sys

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

# First argument is the routing key (e.g. kern.critical), the rest is the message
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)

print("[x] Sent %r:%r" % (routing_key, message))
connection.close()

consumer:

import pika
import sys

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
    sys.exit(1)

# One binding per pattern, e.g. "kern.*" or "#"
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print('[*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))

channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()

RPC

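As the figure shows, the whole RPC call is quite simple. The client first sends a request carrying a unique identifier to rpc_queue. The server takes the request from rpc_queue, runs the function it defines, and publishes the result together with the same identifier to another queue. The client receives the messages on that queue and checks the identifier; if it matches, the message is the result of the remote function call.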
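The request/reply flow and the identifier check can be traced with plain Python queues before a broker is involved. This is only a sketch under assumptions: the `queues` dict, the message dicts, and the three helper functions are hypothetical stand-ins for the broker, the message properties, and the client and server code.

```python
import uuid
from collections import deque

# Two in-memory queues standing in for the broker (hypothetical)
queues = {"rpc_queue": deque(), "reply_queue": deque()}


def client_send(n):
    """Publish a request tagged with a fresh correlation id and return the id."""
    corr_id = str(uuid.uuid4())
    queues["rpc_queue"].append(
        {"body": str(n), "reply_to": "reply_queue", "correlation_id": corr_id})
    return corr_id


def server_handle_one(func):
    """Take one request, run func on it, and reply with the same correlation id."""
    request = queues["rpc_queue"].popleft()
    result = func(int(request["body"]))
    queues[request["reply_to"]].append(
        {"body": str(result), "correlation_id": request["correlation_id"]})


def client_receive(corr_id):
    """Return the reply whose id matches, skipping replies that do not match."""
    while queues["reply_queue"]:
        reply = queues["reply_queue"].popleft()
        if reply["correlation_id"] == corr_id:
            return int(reply["body"])
    return None


if __name__ == "__main__":
    # A stale reply from some earlier call is already sitting in the queue
    queues["reply_queue"].append({"body": "999", "correlation_id": "stale"})
    corr_id = client_send(7)
    server_handle_one(lambda n: n * n)
    print(client_receive(corr_id))
```

The stale reply is skipped because its identifier does not match, which is the same check `on_response` performs in the client code of this section.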

rpc server

import pika

credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='127.0.0.1',
    port=5672,
    virtual_host='/test',
    credentials=credential))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

# The function exposed over RPC
def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, properties, body):
    n = int(body)
    print(" [.] fib(%s)" % n)
    response = fib(n)

    # Reply to the queue named by the client, echoing its correlation id
    ch.basic_publish(exchange='',
                     routing_key=properties.reply_to,
                     properties=pika.BasicProperties(correlation_id=properties.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

rpc client

import pika
import uuid

class RpcClient(object):
    def __init__(self):
        credential = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='127.0.0.1',
            port=5672,
            virtual_host='/test',
            credentials=credential))

        self.channel = self.connection.channel()
        # Exclusive, server-named queue that receives the replies
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        # Accept only the reply that belongs to the pending request
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        # Block until on_response stores the matching reply
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

rpc = RpcClient()
print('[x] Requesting fib')
response = rpc.call(40)
print('[.] get %r' % response)

Extension: remote command execution

RPCClient:

import pika
import uuid
import sys

class RPCClient(object):
    def __init__(self):
        credentials = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='127.0.0.1',
            port=5672,
            virtual_host='/test',
            credentials=credentials))
        self.channel = self.connection.channel()
        # Exclusive, server-named queue that receives the replies
        result = self.channel.queue_declare(exclusive=True)
        self.queue_name = result.method.queue
        self.channel.basic_consume(self.on_response, queue=self.queue_name, no_ack=True)

    def on_response(self, ch, method, properties, body):
        # Accept only the reply that carries the id of the pending request
        if properties.correlation_id == self.jid:
            self.response = body.decode()

    def on_request(self, cmd):
        self.jid = str(uuid.uuid4())
        self.response = None
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_command',
                                   properties=pika.BasicProperties(reply_to=self.queue_name,
                                                                   correlation_id=self.jid),
                                   body=str(cmd))
        # Block until on_response stores the matching reply
        while self.response is None:
            self.connection.process_data_events()
        return self.response

if __name__ == "__main__":
    client = RPCClient()
    print('[x] Request cmd {}'.format(sys.argv[1]))
    response = client.on_request('{}'.format(sys.argv[1]))
    print('[.] response %s' % response)

RPCServer:

import pika
import subprocess

class RPCServer(object):
    def __init__(self):
        credentials = pika.PlainCredentials('rabbitadmin', 'rabbitadmin')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='127.0.0.1',
            port=5672,
            virtual_host='/test',
            credentials=credentials))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='rpc_command')
        self.channel.basic_qos(prefetch_count=5)
        self.channel.basic_consume(self.on_request, queue='rpc_command')

    def execute(self, cmd):
        process = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() reads both pipes to the end and waits for the process;
        # a pipe can only be read once, so keep both results
        stdout, stderr = process.communicate()
        # Return stderr if the command wrote to it, otherwise stdout
        return stderr if stderr else stdout

    def on_request(self, ch, method, properties, body):
        print('[.] jid: %s, execute %s' % (properties.correlation_id, body.decode()))
        result = self.execute(body.decode())
        # Reply to the queue named by the client, echoing its correlation id
        ch.basic_publish(exchange='',
                         routing_key=properties.reply_to,
                         properties=pika.BasicProperties(correlation_id=properties.correlation_id),
                         body=result)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def run(self):
        print('[x] Waiting RPC Request:')
        self.channel.start_consuming()

if __name__ == "__main__":
    server = RPCServer()
    server.run()
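The command-execution step of the server can also be written with `subprocess.run`, which collects both output streams in a single call. The following is a minimal sketch, assuming Python 3.7 or later (for `capture_output`); `run_command` and its `timeout` parameter are hypothetical additions that do not appear in the server above. Since `shell=True` executes whatever string arrives on the queue, this pattern only suits a trusted environment.

```python
import subprocess


def run_command(cmd, timeout=10):
    """Run a shell command and return stderr if it is non-empty, else stdout (bytes)."""
    completed = subprocess.run(cmd, shell=True, capture_output=True, timeout=timeout)
    # Same rule as the server: prefer stderr when the command wrote to it
    return completed.stderr or completed.stdout


if __name__ == "__main__":
    print(run_command("echo hello"))
```

The timeout keeps a hanging command from blocking the consumer indefinitely; when it expires, `subprocess.run` raises `subprocess.TimeoutExpired`, which the caller would need to handle.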