NestJS 中使用 RabbitMQ

在实际的项目开发中,消息队列(Message Queue, MQ)的引入可以极大地方便不同语言编写的系统之间的解耦。举个例子,考虑到Python在人工智能领域的优越性,往往会采用Python来编写AI相关的处理。但同时,可能你的部分业务系统是基于Node. js开发的。这时,通过使用MQ,我们可以轻松构建Python服务作为消费者获取并处理消息,而Node. js作为消息发布者。这种异构语言下的消息机制,不仅充分保证了各自语言的优势发挥,同时也实现了系统之间的优雅解耦。

在 Python 使用 RabbitMQ 这篇博客中,我们详细介绍了如何使用RabbitMQ,并提供了使用Docker-compose在开发环境中部署RabbitMQ的教程。请注意,该部署方式仅适用于开发环境;对于生产环境而言,建议采用集群部署或者直接购买云服务。
而在今天的这篇文章中,我们将重点探讨如何在NestJS中使用RabbitMQ。

选择NestJS 的 RabbitMQ 库

官方建议使用,amqplib、amqp-connection-manager
我们采用的是 @golevelup/nestjs-rabbitmq, 它的特点 (具体请看官方 Readme):

  • 更强的抽象和封装:@golevelup/nestjs-rabbitmq为RabbitMQ提供了一个更高级别的抽象,这使得开发者可以更容易地进行消息通信,而无需深入了解底层实现。此外,通过封装RabbitMQ的复杂性,它让开发者可以将更多的精力集中在业务逻辑上。
  • 更好的集成:@golevelup/nestjs-rabbitmq 能够完美地与Nest. js框架整合在一起,提供了一种声明式的编程模型,开发者可以用装饰器来定义消息处理器,这使得代码更加简洁和易读。
  • 更强的鲁棒性:@golevelup/nestjs-rabbitmq 在处理消息失败时提供了重试机制,可以在尝试失败后自动重新尝试,进一步提高了应用的可用性。
  • 更好的错误处理:@golevelup/nestjs-rabbitmq 提供了更好的错误处理机制,包括错误日志记录,异常捕获等,使开发人员能更容易地排查和处理错误。
  • 支持更先进的特性:@golevelup/nestjs-rabbitmq 支持像RPC这样的先进特性,使得它可以在更复杂的应用场景中轻松应对。

安装

npm install ---save @golevelup/nestjs-rabbitmq

or

yarn add @golevelup/nestjs-rabbitmq 

在Nestjs 中使用

创建一个模块和service

nest g module rabbitmq 
nest g service rabbitmq --no-spec

rabbitmq 初始化

rabbitmq.module.ts

import { Module } from '@nestjs/common';
import { RabbitmqService } from './rabbitmq.service';
import { RabbitMQModule } from "@golevelup/nestjs-rabbitmq";

@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [
        { name: 'exchange1', type: 'topic' },
      ],
      uri: "amqp://admin:adminpassword@localhost:5672",
      connectionInitOptions: { wait: false },
      deserializer: (message: Buffer, msg: any) => {
        // console.log("message", message.toJSON().data)
        // const decodedMessage = JSON.parse(message.toString())
        // console.log("decodedMessage", decodedMessage)
        return message;
      },
      serializer: (msg: any) => {
        // console.log("msg11", msg)
        const encodedMessage = JSON.stringify(msg)
        return Buffer.from(encodedMessage);
      },

    }),
  ],
  providers: [RabbitmqService1, RabbitmqService2],
})
export class RabbitmqModule {}

订阅消费者

rabbitmq.service.ts

import { Injectable } from '@nestjs/common';
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';

@Injectable()
export class RabbitmqService {

    @RabbitSubscribe({
        exchange: "exchange1",
        routingKey: "test_queue1",
        queue: 'test_queue1'
      })
    public async pubSubHandler(content: Buffer, msg: {}) {
        // console.log("content", content)
        const message = JSON.parse(content.toString());
        console.log("RabbitmqService2 received:", message);
    }
}

发布消息

import { Injectable } from '@nestjs/common';
import { RabbitSubscribe, AmqpConnection } from '@golevelup/nestjs-rabbitmq';

@Injectable()
export class RabbitmqService {
	
	constructor(private readonly amqpConnection: AmqpConnection) {}

    async sendMethod() {
        const message = { key: 'value111' };
        await this.amqpConnection.publish('exchange1', 'test_queue2', message);
        return {}
    }

}

最后

因此,结合 Python 使用 RabbitMQ 的知识,你将能够毫无障碍地在NestJS和Python中使用RabbitMQ进行消息传递和通信。
通过阅读并理解这两篇文章,你将能够轻松地理解并掌握如何在NestJS和Python环境下与RabbitMQ进行交互。


已发布

分类

作者:

标签

评论

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注