RabbitMQ 简介
RabbitMQ 是一个开源的消息代理和队列服务器,用于通过轻量级的消息在分布式系统中的节点之间进行通信。它实现了先进的消息队列协议(AMQP)。
RabbitMQ 的核心原则是“生产者永远不会直接将消息发送到队列。相反,生产者始终将消息发送到exchange。”。消费者可以通过订阅或绑定到exchange来接收消息。
RabbitMQ 的主要特点
- 可靠性:RabbitMQ 提供持久性、传递确认、发布确认和高可用性等各种特性来支持消息的可靠传递。
- 弹性及可扩展性:RabbitMQ 可以进行分布式部署,支持消息路由、负载均衡和队列长度限制等特性,从而可适应大规模的消息处理需求。
- 安全性:插件架构使得 RabbitMQ 可以提供 SSL 和 SASL 安全认证,实现加密和访问控制。
- 多种协议支持:RabbitMQ 不仅支持 AMQP,还可以通过插件支持 STOMP、MQTT 和 HTTP 等其它协议。
- 多种语言客户端:有 C,.NET,Java,Python 以及 Ruby 等的客户端支持。
- 使用方便:管理工具或 HTTP API 用于管理和监视 RabbitMQ 服务器。
使用 Docker Compose 部署一个 RabbitMQ 服务
docker-compose. yml 配置如下:
version: '3'
services:
rabbitmq:
image: rabbitmq:management
container_name: rabbitmq
restart: always
ports:
- 5672:5672
- 15672:15672
environment:
RABBITMQ_DEFAULT_VHOST: '/'
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: adminpassword
使用 docker-compose up -d
启动
这时候可以在浏览器输入: http://localhost:15672
进入 RabbitMQ 的管理后台
实现消息的订阅消费
创建一个RabbitMQ消息消费者
import pika
class RabbitMQReceiver:
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials('admin', 'adminpassword')))
self.channel = self.connection.channel()
self.channel.exchange_declare(exchange='exchange1', exchange_type='topic', durable=True)
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.queue_bind(exchange='exchange1', queue=self.callback_queue, routing_key='test_queue1')
def callback(self, ch, method, properties, body):
print(" [x] Received %r" % body)
def start_consuming(self):
self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
self.channel.start_consuming()
if __name__ == '__main__':
rabbitmq_receiver = RabbitMQReceiver()
rabbitmq_receiver.start_consuming()
在通过RabbitMQ进行消息传递时,交换机起着至关重要的分发消息的角色。
这个方法有3个参数:
- exchange='exchange1':这是你要声明的交换机的名称,在这个例子中是'exchange1'。
- exchange_type='topic':这是交换机的类型,可以是direct, topic, fanout, headers,默认是direct。在这里,我们设置了交换机类型为'topic'。
- direct类型的交换机会根据消息的routing_key与binding_key进行完全匹配,若两者相等则将消息路由到对应的队列。
- topic类型的交换机会根据routing_key与binding_key进行模糊匹配,支持两种通配符“”和“#”,其中“”用来匹配一个单词,“#”用来匹配零个或多个单词。
- fanout类型的交换机会忽略routing_key,将消息路由到所有与它绑定的队列中。
- headers类型的交换机不依赖routing_key,而是根据发送的消息内容中的headers属性进行匹配。 - durable=True:这个参数决定了交换机的持久性。如果设置为True,当RabbitMQ服务重启后,该交换机依然存在。如果设置为False,当没有队列或交换机与此交换机绑定就会自动删除。
实现消息的发送
import pika
import json
class RabbitMQSender():
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials('admin', 'adminpassword')))
self.channel = self.connection.channel()
def send(self, message):
self.channel.exchange_declare(exchange='exchange1', exchange_type='topic', durable=True)
self.channel.queue_declare(queue='test_queue1', durable=True)
self.channel.queue_bind(exchange='exchange1', queue='test_queue1', routing_key='test_queue')
self.channel.basic_publish(exchange='exchange1', routing_key='test_queue1', body=message)
if __name__ == '__main__':
data = {'key': 'value', "a": 1}
message = json.dumps(data)
rabbitmq_sender = RabbitMQSender()
rabbitmq_sender.send(message)
print(" [x] Sent 'Hello World!'")
持久化
要在RabbitMQ中实现消息的持久化,可以通过以下两步来做:
- 队列持久化:在声明队列时,将”durable”参数设置为True。这样可以保证RabbitMQ重启后,队列还会存在。例如:
channel.queue_declare(queue='hello', durable=True)
这表明我们想确保队列“hello”是持久化的。
- 消息持久化:在发布消息时,通过将”properties”参数设置为pika.BasicProperties(delivery_mode = 2),可以使消息成为持久的。这里’delivery_mode’为2即代表消息持久化。例如:
channel.basic_publish(exchange='',
routing_key='hello',
body=message,
properties=pika.BasicProperties(delivery_mode = 2))
这表明我们想确保即使RabbitMQ服务重启后,消息仍然不会丢失。
一定要注意,虽然这样设置了,但消息的持久化还是不能100%地保证。因为当RabbitMQ接收到消息并且还没有保存就发生了崩溃,这种情况下,消息还是会丢失的。如果想要做到强一致性,你就需要将 RabbitMQ 的 publisher 可靠性(confirm) 和事务机制 组合使用。
发表回复