ruoyi-vue-pro 开发指南 ruoyi-vue-pro 开发指南
      视频教程
        微服务版 (opens new window)
        作者博客 (opens new window)
        GitHub (opens new window)
        • 萌新必读

          • 简介
          • 交流群
          • 视频教程
          • 功能列表
          • 快速启动(后端项目)
          • 快速启动(前端项目)
          • 接口文档
          • 技术选型
          • 项目结构
          • 代码热加载
          • 一键改包
          • 删除功能
          • 内网穿透
          • 达梦数据库专属
        • 后端手册

          • 新建模块
          • 代码生成【单表】(新增功能)
          • 代码生成【主子表】
          • 代码生成(树表)
          • 功能权限
          • 数据权限
          • 用户体系
          • 三方登录
          • OAuth 2.0(SSO 单点登录)
          • SaaS 多租户【字段隔离】
          • SaaS 多租户【数据库隔离】
          • WebSocket 实时通信
          • 异常处理(错误码)
          • 参数校验、时间传参
          • 分页实现
          • VO 对象转换、数据翻译
          • 文件存储(上传下载)
          • Excel 导入导出
          • 操作日志、访问日志、异常日志
          • MyBatis 数据库
          • MyBatis 联表&分页查询
          • 多数据源(读写分离)、事务
          • Redis 缓存
          • 本地缓存
          • 异步任务
          • 分布式锁
          • 幂等性(防重复提交)
          • 请求限流(RateLimiter)
          • 单元测试
          • 验证码
          • 工具类 Util
          • 配置管理
          • 数据库文档
        • 中间件手册

          • 定时任务
          • 消息队列(内存)
          • 消息队列(Redis)
            • 消息队列(RocketMQ)
            • 消息队列(RabbitMQ)
            • 消息队列(Kafka)
            • 限流熔断
          • 工作流手册

            • 工作流演示
            • 功能开启
            • 工作流(达梦适配)
            • 审批接入(流程表单)
            • 审批接入(业务表单)
            • 流程设计器(BPMN)
            • 流程设计器(钉钉、飞书)
            • 选择审批人、发起人自选
            • 会签、或签、依次审批
            • 流程发起、取消、重新发起
            • 审批通过、不通过、驳回
            • 审批加签、减签
            • 审批转办、委派、抄送
            • 执行监听器、任务监听器
            • 流程表达式
            • 流程审批通知
          • 大屏手册

            • 报表设计器
            • 大屏设计器
          • 支付手册

            • 功能开启
            • 支付宝支付接入
            • 微信公众号支付接入
            • 微信小程序支付接入
            • 支付宝、微信退款接入
          • 会员手册

            • 功能开启
            • 微信公众号登录
            • 微信小程序登录
            • 会员用户、标签、分组
            • 会员等级、积分、签到
          • 商城手册

            • 商城演示
            • 功能开启
            • 商城装修
            • 【商品】商品分类
            • 【商品】商品属性
            • 【商品】商品 SPU 与 SKU
            • 【商品】商品评价
            • 【交易】购物车
            • 【交易】交易订单
            • 【交易】售后退款
            • 【交易】快递发货
            • 【交易】门店自提
            • 【交易】分销返佣
            • 【营销】优惠劵
            • 【营销】拼团活动
            • 【营销】秒杀活动
            • 【营销】砍价活动
            • 【营销】满减送
            • 【营销】限时折扣
            • 【营销】内容管理
            • 【统计】会员、商品、交易统计
          • ERP手册

            • ERP 演示
            • 功能开启
            • 【产品】产品信息、分类、单位
            • 【库存】产品库存、库存明细
            • 【库存】其它入库、其它出库
            • 【库存】库存调拨、库存盘点
            • 【采购】采购订单、入库、退货
            • 【销售】销售订单、出库、退货
            • 【财务】采购付款、销售收款
          • CRM手册

            • CRM 演示
            • 功能开启
            • 【线索】线索管理
            • 【客户】客户管理、公海客户
            • 【商机】商机管理、商机状态
            • 【合同】合同管理、合同提醒
            • 【回款】回款管理、回款计划
            • 【产品】产品管理、产品分类
            • 【通用】数据权限
            • 【通用】跟进记录、待办事项
          • 公众号手册

            • 功能开启
            • 公众号接入
            • 公众号粉丝
            • 公众号标签
            • 公众号消息
            • 自动回复
            • 公众号菜单
            • 公众号素材
            • 公众号图文
            • 公众号统计
          • 系统手册

            • 短信配置
            • 邮件配置
            • 站内信配置
            • 数据脱敏
            • 敏感词
            • 地区 & IP 库
          • 运维手册

            • 开发环境
            • Linux 部署
            • Docker 部署
            • Jenkins 部署
            • HTTPS 证书
            • 服务监控
          • 前端手册 Vue 3.x

            • 开发规范
            • 菜单路由
            • Icon 图标
            • 字典数据
            • 系统组件
            • 通用方法
            • 配置读取
            • CRUD 组件
            • 国际化
            • IDE 调试
            • 代码格式化
          • 前端手册 Vue 2.x

            • 开发规范
            • 菜单路由
            • Icon 图标
            • 字典数据
            • 系统组件
            • 通用方法
            • 配置读取
          • 更新日志

            • 【v2.1.0】开发中
            • 【v2.0.1】2024-03-01
            • 【v2.0.0】2024-01-26
            • 【v1.9.0】2023-12-01
            • 【v1.8.3】2023-10-24
          • 开发指南
          • 中间件手册
          芋道源码
          2022-04-03
          目录
          1. 集群消费
          1.1 使用场景
          1.2 实现源码
          1.3 实战案例
          2. 广播消费
          2.1 使用场景
          2.2 实现源码
          2.3 实战案例

          消息队列(Redis)

          yudao-spring-boot-starter-mq (opens new window) 技术组件,基于 Redis 实现分布式消息队列:

          • 使用 Stream (opens new window) 特性,提供【集群】消费的能力。
          • 使用 Pub/Sub (opens new window) 特性,提供【广播】消费的能力。

          疑问:什么是【广播】消费?什么是【集群】消费?

          参见《阿里云 —— 集群消费和广播消费 》 (opens new window)文档

          # 1. 集群消费

          集群消费,是指消息发送到 Redis 时,有且只会被一个消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:

          集群消费

          友情提示:

          如果你需要使用到【集群】消费,必须使用 Redis 5.0.0 以上版本,因为 Stream 特性是在该版本之后才引入噢!

          # 1.1 使用场景

          集群消费在项目中的使用场景,主要是提供可靠的、可堆积的异步任务的能力。例如说:

          • 短信模块,使用它异步 (opens new window)发送短信。
          • 邮件模块,使用它异步 (opens new window)发送邮件。

          相比 《开发指南 —— 异步任务》 来说,Spring Async 在 JVM 实例重启时,会导致未执行完的任务丢失。而集群消费,因为消息是存储在 Redis 中,所以不会存在该问题。

          # 1.2 实现源码

          集群消费基于 Redis Stream 实现:

          • 实现 AbstractRedisStreamMessage 抽象类,定义【集群】消息。
          • 使用 RedisMQTemplate 的 #send(message) 方法,发送消息。
          • 实现 AbstractRedisStreamMessageListener 接口,消费消息。

          最终使用 YudaoRedisMQAutoConfiguration 配置类,扫描所有的 AbstractRedisStreamMessageListener 监听器,初始化对应的消费者。如下图所示:

          YudaoRedisMQAutoConfiguration

          # 1.3 实战案例

          以【短信发送】举例子,改造使用 Redis 作为消息队列,同时也是讲解集群消费的使用。如下图所示:

          实战案例

          # 1.3.0 引入依赖

          在 yudao-module-system-biz 模块中,引入 yudao-spring-boot-starter-mq 技术组件。如下所示:

          <dependency>
              <groupId>cn.iocoder.boot</groupId>
              <artifactId>yudao-spring-boot-starter-mq</artifactId>
          </dependency>
          

          # 1.3.1 Message 消息

          在 message 包下,修改 SmsSendMessage 类,短信发送消息。代码如下:

          @Data
          public class SmsSendMessage extends AbstractRedisStreamMessage { // 重点:需要继承 AbstractRedisStreamMessage 类
          
              /**
               * 短信日志编号
               */
              @NotNull(message = "短信日志编号不能为空")
              private Long logId;
              /**
               * 手机号
               */
              @NotNull(message = "手机号不能为空")
              private String mobile;
              /**
               * 短信渠道编号
               */
              @NotNull(message = "短信渠道编号不能为空")
              private Long channelId;
              /**
               * 短信 API 的模板编号
               */
              @NotNull(message = "短信 API 的模板编号不能为空")
              private String apiTemplateId;
              /**
               * 短信模板参数
               */
              private List<KeyValue<String, Object>> templateParams;
          
          }
          

          # 1.3.2 SmsProducer 生产者

          在 producer 包下,修改 SmsProducer 类,Sms 短信相关消息的生产者。代码如下:

          @Slf4j
          @Component
          public class SmsProducer {
          
              @Resource
              private RedisMQTemplate redisMQTemplate; // 重点:注入 RedisMQTemplate 对象
          
              /**
               * 发送 {@link SmsSendMessage} 消息
               *
               * @param logId 短信日志编号
               * @param mobile 手机号
               * @param channelId 渠道编号
               * @param apiTemplateId 短信模板编号
               * @param templateParams 短信模板参数
               */
              public void sendSmsSendMessage(Long logId, String mobile,
                                             Long channelId, String apiTemplateId, List<KeyValue<String, Object>> templateParams) {
                  SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile);
                  message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams);
                  redisMQTemplate.send(message); // 重点:使用 RedisMQTemplate 发送消息
              }
          
          }
          

          # 1.3.3 SmsSendConsumer 消费者

          在 consumer 包下,修改 SmsSendConsumer 类,SmsSendMessage 的消费者。代码如下:

          @Component
          @Slf4j
          public class SmsSendConsumer extends AbstractRedisStreamMessageListener<SmsSendMessage> { // 重点:继承 AbstractRedisStreamMessageListener 类,并填写对应的 Message 类
          
              @Resource
              private SmsSendService smsSendService;
          
              @Override // 重点:实现 onMessage 方法
              public void onMessage(SmsSendMessage message) {
                  log.info("[onMessage][消息内容({})]", message);
                  smsSendService.doSendSms(message);
              }
          
          }
          

          # 1.3.4 简单测试

          ① Debug 启动后端项目,可以在 SmsProducer 和 SmsSendConsumer 上面打上断点,稍微调试下。

          ② 打开 SmsTemplateController.http 文件,使用 IDEA httpclient 发起请求,发送短信。如下图所示:

          简单测试

          如果 IDEA 控制台看到 [onMessage][消息内容 日志内容,说明消息的发送和消费成功。

          # 2. 广播消费

          广播消费,是指消息发送到 Redis 时,所有消费者(应用 JVM 实例)收到,然后消费成功。如下图所示:

          集群消费

          # 2.1 使用场景

          例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 Redis 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

          又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 Redis 广播消费。每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

          # 2.2 实现源码

          广播消费基于 Redis Pub/Sub 实现:

          • 实现 AbstractChannelMessage 抽象类,定义【广播】消息。
          • 使用 RedisMQTemplate 的 #send(message) 方法,发送消息。
          • 实现 AbstractRedisChannelMessageListener 接口,消费消息。

          最终使用 YudaoRedisMQAutoConfiguration 配置类,扫描所有的 AbstractRedisChannelMessageListener 监听器,初始化对应的消费者。如下图所示:

          YudaoRedisMQAutoConfiguration

          # 2.3 实战案例

          参见 《开发指南 —— 本地缓存》

          消息队列(内存)
          消息队列(RocketMQ)

          ← 消息队列(内存) 消息队列(RocketMQ)→

          Theme by Vdoing | Copyright © 2019-2024 芋道源码 | MIT License
            ×