实现简单的队列通信
producer:
1 | import pika |
2 | |
3 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
4 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
5 | port=5672, |
6 | virtual_host='/test', |
7 | credentials=credential)) |
8 | channel = connection.channel() |
9 | |
10 | #声明队列 |
11 | channel.queue_declare(queue='hello') |
12 | |
13 | channel.basic_publish(exchange='', |
14 | routing_key='hello', |
15 | body='Hello World!' |
16 | ) |
17 | print('[x] Send "Hello World!"') |
18 | connection.close() |
consumer:
1 | import pika |
2 | |
3 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
4 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
5 | port=5672, |
6 | virtual_host='/test', |
7 | credentials=credential)) |
8 | channel = connection.channel() |
9 | |
10 | # 再次声明队列,确保队列存在 |
11 | channel.queue_declare(queue='hello') |
12 | |
13 | # 回调函数 |
14 | def callback(ch, method ,properties, body): |
15 | print('[x] Receive %s' % body) |
16 | |
17 | channel.basic_consume(callback, |
18 | queue='hello', |
19 | no_ack=True) |
20 | print('[*] wait for message.to exit press CTRL+C') |
21 | channel.start_consuming() |
Work Queues
RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多,可以开启多个consumer,publish发送的消息会round-robin被各个consumer接受。
producer:
1 | import pika |
2 | import sys |
3 | |
4 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
5 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
6 | port=5672, |
7 | virtual_host='/test', |
8 | credentials=credential)) |
9 | channel = connection.channel() |
10 | channel.queue_declare(queue='hello',durable=True) #队列持久化,确保rabbitmq不会丢失队列 |
11 | |
12 | message = ' '.join(sys.argv[1:]) or "Hello World!" |
13 | channel.basic_publish(exchange='', |
14 | routing_key='hello', |
15 | body=message, |
16 | properties=pika.BasicProperties( |
17 | delivery_mode = 2 #make message persistent |
18 | ) |
19 | ) |
20 | print('[x] Send "%r"' % message) |
21 | connection.close() |
consumer:
1 | import pika |
2 | import time |
3 | |
4 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
5 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
6 | port=5672, |
7 | virtual_host='/test', |
8 | credentials=credential)) |
9 | channel = connection.channel() |
10 | |
11 | # 再次声明队列,确保队列存在 |
12 | channel.queue_declare(queue='hello',durable=True) |
13 | |
14 | # 回调函数 |
15 | def callback(ch, method ,properties, body): |
16 | print('[x] Receive %s' % body) |
17 | time.sleep(body.count(b'.')) |
18 | print('[x] Done') |
19 | ch.basic_ack(delivery_tag = method.delivery_tag) # 消费端消费完消息,返回ack标识符 |
20 | |
21 | channel.basic_qos(prefetch_count=1) # 告诉rabbitmq一次给一个consumer不要超过1个消息 |
22 | channel.basic_consume(callback, |
23 | queue='hello') # no_ack默认为False |
24 | print('[*] wait for message.to exit press CTRL+C') |
25 | channel.start_consuming() |
以上代码可以实现当一个consumer未确认收到消息ack,rabbitmq将会选择另一个consumer来消费消息,直到收到ack消息。
发布订阅
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,交换是件很简单的事。在一端从生产者那里收消息,并将它们推送到queue中。Exchange必须非常清楚的知道。他从生产者那里收到的消息,要发给谁? 他是应该被追加到一个具体的queue里,还是发送到多个queue里,或者它应该被丢弃。该规则由Exchange类型定义。
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。(一共有四种类型)
1、fanout: 所有bind到此exchange的queue都可以接收消息 (给所有人发消息)
2、direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息 (给指定的一些queue发消息)
3、topic(话题):所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息 (给订阅话题的人发消息)
4、headers: 通过headers 来决定把消息发给哪些queue (通过消息头,决定发送给哪些队列)
fanout
应用场景:
例如:视频直播
例如:新浪微博
发送端发消息到exchange中,exchange会把消息放入队列中,客户端再到队列中收消息。
publish:
1 | import pika |
2 | import sys |
3 | |
4 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
5 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
6 | port=5672, |
7 | virtual_host='/test', |
8 | credentials=credential)) |
9 | channel = connection.channel() |
10 | channel.exchange_declare(exchange='logs',exchange_type='fanout') #设置fanout交换机 |
11 | |
12 | message = ' '.join(sys.argv[1:]) or "info: Hello World!" |
13 | channel.basic_publish(exchange='logs', |
14 | routing_key='', |
15 | body=message) |
16 | print(" [x] Sent %r" % message) |
17 | connection.close() |
consumer:
1 | import pika |
2 | |
3 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
4 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
5 | port=5672, |
6 | virtual_host='/test', |
7 | credentials=credential)) |
8 | channel = connection.channel() |
9 | |
10 | channel.exchange_declare(exchange='logs',exchange_type='fanout') |
11 | |
12 | result = channel.queue_declare(exclusive=True) #声明排他性队列 |
13 | queue_name = result.method.queue # 生成随机队列名 |
14 | |
15 | channel.queue_bind(exchange='logs',queue=queue_name) #绑定交换机与队列 |
16 | |
17 | print('[*] Waiting for logs. To exit press CTRL+C') |
18 | |
19 | def callback(ch,method,properties,body): |
20 | print('[x] %r' % body) |
21 | |
22 | channel.basic_consume(callback, |
23 | queue=queue_name, |
24 | no_ack=True) |
25 | channel.start_consuming() |
exclusive queue: 排他性队列,创建一个只有自己可见的队列,即不允许其它用户访问
特点:
- 只对首次声明它的连接(Connection)可见
- 会在其连接断开的时候自动删除
direct
rabbitmq direct会根据routing key和binding key来选者发往那个队列,发送者将数据根据关键字发送到消息exchange,exchange根据key来决定将数据发往指定的队列。
producer:
1 | #!/usr/bin/env python |
2 | # -*- coding=utf-8 -*- |
3 | |
4 | import pika |
5 | import sys |
6 | |
7 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
8 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
9 | port=5672, |
10 | virtual_host='/test', |
11 | credentials=credential)) |
12 | channel = connection.channel() |
13 | |
14 | channel.exchange_declare(exchange="direct_logs", |
15 | exchange_type='direct') |
16 | |
17 | serverity = sys.argv[1] if len(sys.argv) > 2 else 'info' |
18 | message = ' '.join(sys.argv[2:]) or 'Hello World!' |
19 | exchange.basic_publish(exchange='direct_logs', |
20 | routing_key=serverity, |
21 | body=message) |
22 | print('[x] send %r:%r' % (serverity,message)) |
23 | connection.close() |
consumer:
1 | import pika |
2 | import sys |
3 | |
4 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
5 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
6 | port=5672, |
7 | virtual_host='/test', |
8 | credentials=credential)) |
9 | channel = connection.channel() |
10 | |
11 | channel.exchange_declare(exchange='direct_logs', |
12 | exchange_type='direct') |
13 | |
14 | result = channel.queue_declare(exclusive=True) |
15 | queue_name = result.method.queue |
16 | |
17 | severities = sys.argv[1:] |
18 | |
19 | if not severities: |
20 | sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0]) |
21 | sys.exit(1) |
22 | |
23 | for severity in severities: |
24 | channel.queue_bind(exchange='direct_logs', |
25 | queue=queue_name, |
26 | routing_key=severity) |
27 | |
28 | print('[*] Waiting for logs. To exit press CTRL+C') |
29 | |
30 | def callback(ch, method, properties, body): |
31 | print(' [x] %r:%r' % (method.routing_key, body)) |
32 | |
33 | channel.basic_consume(callback, |
34 | queue=queue_name, |
35 | no_ack=True) |
36 | |
37 | channel.start_consuming() |
topics
*: 可以代表任意一个词
#: 可以代表0个或任意个词
1 | import pika |
2 | import sys |
3 | |
4 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
5 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
6 | port=5672, |
7 | virtual_host='/test', |
8 | credentials=credential)) |
9 | channel = connection.channel() |
10 | |
11 | channel.exchange_declare(exchange='topic_logs', |
12 | exchange_type='topic') |
13 | |
14 | routing_keys = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info' |
15 | message = ' '.join(sys.argv[2:]) or 'Hello World!' |
16 | |
17 | channel.baic_publish(exchange='topic_logs', |
18 | routing_key=routing_key, |
19 | body=message) |
20 | |
21 | print("[x] sent %r:%r" % (routing_key,message)) |
22 | connection.close() |
1 | import pika |
2 | import sys |
3 | |
4 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
5 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
6 | port=5672, |
7 | virtual_host='/test', |
8 | credentials=credential)) |
9 | channel = connection.channel() |
10 | |
11 | channel.exchange_declare(exchange='topic_logs', |
12 | exchange_type='topic') |
13 | |
14 | result = channel.queue_declare(exclusive=True) |
15 | queue_name = result.method.queue |
16 | |
17 | binding_keys = sys.argv[1:] |
18 | |
19 | if not binding_keys: |
20 | sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0]) |
21 | sys.exit(1) |
22 | |
23 | for binding_key in binding_keys: |
24 | channel.queue_bind(exchange='topic_logs', |
25 | queue=queue_name, |
26 | routing_key=binding_key) |
27 | |
28 | print('[*] Waiting for logs. To exit press CTRL+C') |
29 | |
30 | def callback(ch,method,properties,body): |
31 | print('[x] %r:%r' % (method.routing_key,body)) |
32 | |
33 | channel.baic_consume(callback,queue=queue_name,no_ack=True) |
34 | channel.start_consuming() |
RPC
我们从图中可以看出整个rpc调用其实也是很简单的,首先客户端需要发送具备唯一标示属性的到rpc_queue,server端从rpc queue中接受队列消息,执行内部函数定义,将带有标示属性和结果返回到另一个队列,客户端接手这个队列的消息,检查唯一属性,如果一样,那么这个数据就是远程函数执行的结果。
rpc server
1 | import pika |
2 | import sys |
3 | |
4 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
5 | connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
6 | port=5672, |
7 | virtual_host='/test', |
8 | credentials=credential)) |
9 | channel = connection.channel() |
10 | channel.queue_declare(queue='rpc_queue') |
11 | def fib(n): |
12 | if n == 0: |
13 | return 0 |
14 | elif n == 1: |
15 | return 1 |
16 | else: |
17 | return fib(n-1) + fib(n-2) |
18 | |
19 | def on_request(ch, method, properties, body): |
20 | n = int(body) |
21 | print(" [.] fib(%s)" % n) |
22 | response = fib(n) |
23 | |
24 | ch.basic_publish(exchange='', |
25 | routing_key = properties.reply_to, |
26 | properties = pika.BasicProperties(correlation_id = properties.correlation_id), |
27 | body=str(response)) |
28 | ch.basic_ack(delivery_tag = method.delivery_tag) |
29 | |
30 | channel.basic_qos(prefetch_count=1) |
31 | channel.basic_consume(on_request,queue='rpc_queue') |
32 | |
33 | print(" [x] Awaiting RPC requests") |
34 | channel.start_consuming() |
1 | import pika |
2 | import uuid |
3 | |
4 | class RpcClient(object): |
5 | def __init__(self): |
6 | credential = pika.PlainCredentials('rabbitadmin','rabbitadmin') |
7 | self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
8 | port=5672, |
9 | virtual_host='/test', |
10 | credentials=credential)) |
11 | |
12 | self.channel = self.connection.channel() |
13 | result = self.channel.queue_declare(exclusive=True) |
14 | self.callback_queue = result.method.queue |
15 | |
16 | self.channel.basic_consume(self.on_response,no_ack=True, |
17 | queue=self.callback_queue) |
18 | |
19 | def on_response(self,ch,method,props,body): |
20 | if self.corr_id == props.correlation_id: |
21 | self.response = body |
22 | |
23 | def call(self,n): |
24 | self.response = None |
25 | self.corr_id = str(uuid.uuid4()) |
26 | self.channel.basic_publish(exchange='', |
27 | routing_key='rpc_queue', |
28 | properties=pika.BasicProperties( |
29 | reply_to = self.callback_queue, |
30 | correlation_id = self.corr_id, |
31 | ), |
32 | body=str(n)) |
33 | while self.response is None: |
34 | self.connection.process_data_events() |
35 | return int(self.response) |
36 | |
37 | rpc = RpcClient() |
38 | print('[x] Requesting fib') |
39 | response = rpc.call(40) |
40 | print('[.] get %r' % response) |
扩展: 实现远程命令执行
RPCClient:
1 | import pika |
2 | import uuid |
3 | import sys |
4 | |
5 | class RPCClient(object): |
6 | def __init__(self): |
7 | credentials = pika.PlainCredentials('rabbitadmin', 'rabbitadmin') |
8 | self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
9 | port='5672', |
10 | virtual_host='/test', |
11 | credentials=credentials)) |
12 | self.channel = self.connection.channel() |
13 | result = self.channel.queue_declare(exclusive=True) |
14 | self.queue_name = self.method.queue |
15 | self.channel.basic_consume(self.on_response, queue=self.queue_name,no_ack=True) |
16 | def on_response(self, ch, method, properties, body): |
17 | if properties.correlation_id == self.jid: |
18 | self.response = body.decode() |
19 | def on_request(self, cmd): |
20 | self.jid = str(uuid.uuid4()) |
21 | self.response = None |
22 | self.channel.basic_publish(exchange='', |
23 | routing_key='rpc_command', |
24 | properties=pika.BaicProperties(reply_to=self.queue_name,correlation_id=self.jid), |
25 | body=str(cmd)) |
26 | while self.response is None: |
27 | self.connection.process_data_events() |
28 | return self.response |
29 | |
30 | if __name__ == "__main__": |
31 | client = RPCClient() |
32 | print('[x] Request cmd {}'.format(sys.argv[1])) |
33 | response = client.on_request('{}'.format(sys.argv[1])) |
34 | print('[.] response %s' % response) |
RPCServer:
1 | import pika |
2 | import subprocess |
3 | |
4 | class RPCServer(object): |
5 | def __init__(self): |
6 | credentials = pika.PlainCredentials('rabbitadmin', 'rabbitadmin') |
7 | self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', |
8 | port='5672', |
9 | virtual_host='/test', |
10 | credentials=credentials)) |
11 | self.channel = self.connection.channel() |
12 | self.channel.queue_declare(queue='rpc_command') |
13 | self.channel.basic_qos(prefetch_count=5) |
14 | self.channel.basic_consume(self.on_request,queue='rpc_command') |
15 | |
16 | def execute(self,cmd): |
17 | result = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) |
18 | if not result.stderr.read(): |
19 | return result.stdout.read() |
20 | else: |
21 | return result.stderr.read() |
22 | def on_request(self, ch, method, properties, body): |
23 | print('[.] jid: %s, execute %s' % (properties.correlation_id,body.decode())) |
24 | result = self.execute(body.decode()) |
25 | ch.basic_publish(exchange='', |
26 | routing_key=properties.reply_to, |
27 | properties=pika.BasicProperties(correlation_id=properties.correlation_id), |
28 | body=result) |
29 | ch.basic_ack(delivery_tag=method.delivery_tag) |
30 | def run(self): |
31 | print('[x] Waiting RPC Request:') |
32 | self.channel.start_consuming() |
33 | |
34 | if __name__ == "__main__": |
35 | server = RPCServer() |
36 | server.run() |