Python 使用 RabbitMQ

RabbitMQ 简介

RabbitMQ 是一个开源的消息代理和队列服务器,用于通过轻量级的消息在分布式系统中的节点之间进行通信。它实现了先进的消息队列协议(AMQP)。
RabbitMQ 的核心原则是“生产者永远不会直接将消息发送到队列。相反,生产者始终将消息发送到exchange。”。消费者可以通过订阅或绑定到exchange来接收消息。

RabbitMQ 的主要特点

  • 可靠性:RabbitMQ 提供持久性、传递确认、发布确认和高可用性等各种特性来支持消息的可靠传递。
  • 弹性及可扩展性:RabbitMQ 可以进行分布式部署,支持消息路由、负载均衡和队列长度限制等特性,从而可适应大规模的消息处理需求。
  • 安全性:插件架构使得 RabbitMQ 可以提供 SSL 和 SASL 安全认证,实现加密和访问控制。
  • 多种协议支持:RabbitMQ 不仅支持 AMQP,还可以通过插件支持 STOMP、MQTT 和 HTTP 等其它协议。
  • 多种语言客户端:有 C,.NET,Java,Python 以及 Ruby 等的客户端支持。
  • 使用方便:管理工具或 HTTP API 用于管理和监视 RabbitMQ 服务器。

使用 Docker Compose 部署一个 RabbitMQ 服务

docker-compose. yml 配置如下:

version: '3'
services:
  rabbitmq:
    image: rabbitmq:management
    container_name: rabbitmq
    restart: always
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      RABBITMQ_DEFAULT_VHOST: '/'
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: adminpassword

使用 docker-compose up -d 启动

这时候可以在浏览器输入: http://localhost:15672 进入 RabbitMQ 的管理后台
image.png

实现消息的订阅消费

创建一个RabbitMQ消息消费者

import pika

class RabbitMQReceiver:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials('admin', 'adminpassword')))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='exchange1', exchange_type='topic', durable=True)
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.queue_bind(exchange='exchange1', queue=self.callback_queue, routing_key='test_queue1')

    def callback(self, ch, method, properties, body):
        print(" [x] Received %r" % body)

    def start_consuming(self):
        self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.callback, auto_ack=True)
        print(' [*] Waiting for messages. To exit press CTRL+C')
        self.channel.start_consuming()


if __name__ == '__main__':
    rabbitmq_receiver = RabbitMQReceiver()
    rabbitmq_receiver.start_consuming()

在通过RabbitMQ进行消息传递时,交换机起着至关重要的分发消息的角色。

这个方法有3个参数:

  • exchange='exchange1':这是你要声明的交换机的名称,在这个例子中是'exchange1'。
  • exchange_type='topic':这是交换机的类型,可以是direct, topic, fanout, headers,默认是direct。在这里,我们设置了交换机类型为'topic'。
    - direct类型的交换机会根据消息的routing_key与binding_key进行完全匹配,若两者相等则将消息路由到对应的队列。
    - topic类型的交换机会根据routing_key与binding_key进行模糊匹配,支持两种通配符“”和“#”,其中“”用来匹配一个单词,“#”用来匹配零个或多个单词。
    - fanout类型的交换机会忽略routing_key,将消息路由到所有与它绑定的队列中。
    - headers类型的交换机不依赖routing_key,而是根据发送的消息内容中的headers属性进行匹配。
  • durable=True:这个参数决定了交换机的持久性。如果设置为True,当RabbitMQ服务重启后,该交换机依然存在。如果设置为False,当没有队列或交换机与此交换机绑定就会自动删除。

实现消息的发送

import pika
import json

class RabbitMQSender():
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials('admin', 'adminpassword')))
        self.channel = self.connection.channel()

    def send(self, message):
        self.channel.exchange_declare(exchange='exchange1', exchange_type='topic', durable=True)
        self.channel.queue_declare(queue='test_queue1', durable=True)
        self.channel.queue_bind(exchange='exchange1', queue='test_queue1', routing_key='test_queue')
        self.channel.basic_publish(exchange='exchange1', routing_key='test_queue1', body=message)


if __name__ == '__main__':
    data = {'key': 'value', "a": 1}
    message = json.dumps(data)
    rabbitmq_sender = RabbitMQSender()
    rabbitmq_sender.send(message)
    print(" [x] Sent 'Hello World!'")

持久化

要在RabbitMQ中实现消息的持久化,可以通过以下两步来做:

  1. 队列持久化:在声明队列时,将”durable”参数设置为True。这样可以保证RabbitMQ重启后,队列还会存在。例如:
channel.queue_declare(queue='hello', durable=True)

这表明我们想确保队列“hello”是持久化的。

  1. 消息持久化:在发布消息时,通过将”properties”参数设置为pika.BasicProperties(delivery_mode = 2),可以使消息成为持久的。这里’delivery_mode’为2即代表消息持久化。例如:
channel.basic_publish(exchange='',

                      routing_key='hello',

                      body=message,

                      properties=pika.BasicProperties(delivery_mode = 2))

这表明我们想确保即使RabbitMQ服务重启后,消息仍然不会丢失。

一定要注意,虽然这样设置了,但消息的持久化还是不能100%地保证。因为当RabbitMQ接收到消息并且还没有保存就发生了崩溃,这种情况下,消息还是会丢失的。如果想要做到强一致性,你就需要将 RabbitMQ 的 publisher 可靠性(confirm) 和事务机制 组合使用。


已发布

分类

作者:

标签

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注