文章目录[隐藏]
在实际的项目开发中,消息队列(Message Queue, MQ)的引入可以极大地方便不同语言编写的系统之间的解耦。举个例子,考虑到Python在人工智能领域的优越性,往往会采用Python来编写AI相关的处理。但同时,可能你的部分业务系统是基于Node. js开发的。这时,通过使用MQ,我们可以轻松构建Python服务作为消费者获取并处理消息,而Node. js作为消息发布者。这种异构语言下的消息机制,不仅充分保证了各自语言的优势发挥,同时也实现了系统之间的优雅解耦。
在 Python 使用 RabbitMQ 这篇博客中,我们详细介绍了如何使用RabbitMQ,并提供了使用Docker-compose在开发环境中部署RabbitMQ的教程。请注意,该部署方式仅适用于开发环境;对于生产环境而言,建议采用集群部署或者直接购买云服务。
而在今天的这篇文章中,我们将重点探讨如何在NestJS中使用RabbitMQ。
选择NestJS 的 RabbitMQ 库
官方建议使用,amqplib、amqp-connection-manager
我们采用的是 @golevelup/nestjs-rabbitmq, 它的特点 (具体请看官方 Readme):
- 更强的抽象和封装:@golevelup/nestjs-rabbitmq为RabbitMQ提供了一个更高级别的抽象,这使得开发者可以更容易地进行消息通信,而无需深入了解底层实现。此外,通过封装RabbitMQ的复杂性,它让开发者可以将更多的精力集中在业务逻辑上。
- 更好的集成:@golevelup/nestjs-rabbitmq 能够完美地与Nest. js框架整合在一起,提供了一种声明式的编程模型,开发者可以用装饰器来定义消息处理器,这使得代码更加简洁和易读。
- 更强的鲁棒性:@golevelup/nestjs-rabbitmq 在处理消息失败时提供了重试机制,可以在尝试失败后自动重新尝试,进一步提高了应用的可用性。
- 更好的错误处理:@golevelup/nestjs-rabbitmq 提供了更好的错误处理机制,包括错误日志记录,异常捕获等,使开发人员能更容易地排查和处理错误。
- 支持更先进的特性:@golevelup/nestjs-rabbitmq 支持像RPC这样的先进特性,使得它可以在更复杂的应用场景中轻松应对。
安装
npm install ---save @golevelup/nestjs-rabbitmq
or
yarn add @golevelup/nestjs-rabbitmq
在Nestjs 中使用
创建一个模块和service
nest g module rabbitmq
nest g service rabbitmq --no-spec
rabbitmq 初始化
在 rabbitmq.module.ts
中
import { Module } from '@nestjs/common';
import { RabbitmqService } from './rabbitmq.service';
import { RabbitMQModule } from "@golevelup/nestjs-rabbitmq";
@Module({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
{ name: 'exchange1', type: 'topic' },
],
uri: "amqp://admin:adminpassword@localhost:5672",
connectionInitOptions: { wait: false },
deserializer: (message: Buffer, msg: any) => {
// console.log("message", message.toJSON().data)
// const decodedMessage = JSON.parse(message.toString())
// console.log("decodedMessage", decodedMessage)
return message;
},
serializer: (msg: any) => {
// console.log("msg11", msg)
const encodedMessage = JSON.stringify(msg)
return Buffer.from(encodedMessage);
},
}),
],
providers: [RabbitmqService1, RabbitmqService2],
})
export class RabbitmqModule {}
订阅消费者
在 rabbitmq.service.ts
中
import { Injectable } from '@nestjs/common';
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
@Injectable()
export class RabbitmqService {
@RabbitSubscribe({
exchange: "exchange1",
routingKey: "test_queue1",
queue: 'test_queue1'
})
public async pubSubHandler(content: Buffer, msg: {}) {
// console.log("content", content)
const message = JSON.parse(content.toString());
console.log("RabbitmqService2 received:", message);
}
}
发布消息
import { Injectable } from '@nestjs/common';
import { RabbitSubscribe, AmqpConnection } from '@golevelup/nestjs-rabbitmq';
@Injectable()
export class RabbitmqService {
constructor(private readonly amqpConnection: AmqpConnection) {}
async sendMethod() {
const message = { key: 'value111' };
await this.amqpConnection.publish('exchange1', 'test_queue2', message);
return {}
}
}
最后
因此,结合 Python 使用 RabbitMQ 的知识,你将能够毫无障碍地在NestJS和Python中使用RabbitMQ进行消息传递和通信。
通过阅读并理解这两篇文章,你将能够轻松地理解并掌握如何在NestJS和Python环境下与RabbitMQ进行交互。
发表回复