以rcmq为例,如何学习一个新的中间件

1.csdn和官网,搜搜简介,初步了解这玩意是干什么的,它的常见应用场景:

比如rcmq是一个 队列模型的高实时高可靠高并发的分布式消息队列,场景应用场景是:解耦,削峰,异步

2.搜搜入门安装教程,先尝试在云服务器部署它

3.了解基础概念和架构:比如rcmq的结构:nameserver,broker,生产者消费者,rcmq的四种消息模型以及其应用场景。

4.实际上手实操一下

5.看进阶应用的教程,常见问题的解决(消息堆积,顺序消息,重复消费,消息不丢失),思考在自己项目中的应用场景

6.深入了解原理,结合源码,总结笔记(其实就是八股了)

官网-初识rcmq

定时/延时消息 | RocketMQ (apache.org)

入门-了解rcmq

RocketMQ保姆级教程 - 掘金 (juejin.cn)

rocketmq详解(全)-CSDN博客

安装使用-上手rcmq

RocketMQ的下载与安装(全网最细保姆级别教学)rocketmq下载安装舒一笑的博客-CSDN博客

RocketMQ保姆级教程 - 掘金 (juejin.cn)

项目应用拓展

八股-深入了解

RocketMQ常见问题总结 | JavaGuide(Java面试 + 学习指南)

rocketmq/docs/cn/FAQ.md at master · apache/rocketmq · GitHub

RocketMQ经典高频面试题大全(附答案)_rocketmq面试题-CSDN博客

RocketMQ消息短暂而又精彩的一生 - 掘金 (juejin.cn)

RocketMQ 如何实现高性能消息读写? - 掘金 (juejin.cn)

image-20231016184022318

应用

RocketMQ与SpringBoot整合进行生产级二次封装 - 掘金 (juejin.cn)

消息队列是什么

含义

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,
​
高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
​
目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等
​
消息队列是一种"先进先出"的数据结构

应用场景

应用解耦

问题描述

系统的耦合性越高,容错性就越低,以电商应用为例,用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常

解耦

使用消息队列解耦,系统的耦合性就会下降了,比如物流系统发生故障,需要几分钟才能修复,在这段时间内,物流系统要处理的数据被缓存到消息队列中,用户的下单操作正常完成。当物流系统恢复后,补充处理存在消息队列中的订单消息即可,终端系统感知不到物流系统发生过几分钟故障

流量削峰

问题描述

应用系统如果遇到系统请求流量的瞬间猛增,有可能将系统压垮,有了消
息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大
大提高系统的稳定性

削峰含义

一般情况,为了保证系统的稳定性,如果系统负载超过阈值,就会阻止用户请求,而如果使用消息队列将请求缓存起来,等待系统处理完毕后通知用户下单完毕,这方法虽然会耗时,但出现系统不能下单的情况

场景描述

秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为了解决这个问题,一般需要

在应用前端加入消息队列。这样做的好处有

1.可以控制活动的人数

2.可以缓解短时间内高流量压垮应用

3.用户请求,服务器接收后,首先写入消息队列,假如消息队列长度超过最大数量,则直接

抛弃用户请求或跳转到错误页面

4.秒杀业务根据消息队列中的请求信息,再做后续处理

数据分发

数据分发含义

通过消息队列可以让数据在多个系统之间更加方便流通。只需要将数据发
送到消息队列,数据使用方直接在消息队列中获取数据即可

A系统产生数据,发送到MQ
BCD哪个系统需要,自己去MQ消费即可
如果某个系统不需要数据,取消对MQ消息的消费即可
新系统要数据,直接从MQ消费即可

PS:数据异构也能通过消息队列实现

异步处理

场景描述

用户注册后,需要发注册邮件和注册短信。传统的做法有两种

1.串行方式   2. 并行方式

串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务

完成后,返回给客户端

并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个

任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

RocketMQ 是什么

RocketMQ 是一个 队列模型 的消息中间件,具有高性能、高可靠、高实时、分布式 的特点。它是一个采用 Java 语言开发的分布式的消息系统

MQ对比

常见的MQ产品宏观对比

image-20231015224549960

RocketMQ 优点:

1.单机吞吐量:十万级 2.可用性:非常高,分布式架构 3.消息可靠性:经过参数优化配置,消息可以做到 0 丢失 4.功能支持:MQ 功能较为完善,还是分布式的,扩展性好 5.支持 10 亿级别的消息堆积,不会因为堆积导致性能下降 6.源码是 Java,方便结合公司自己的业务进行二次开发 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况 7.RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验

RocketMQ 缺点:

1.没有在 MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

2支持的客户端语言不多,目前是Java及c++,其中c++不成熟

为什么选择rocketmq

1 消息可靠性,稳定性,抗住过阿狸双十一,适合电商下单,支付(回调时发消息改变订单状态,取消订单时定时任务回滚)这种堆消息丢失敏感的场景

2.单机吞吐量高,支持大量消息堆积,适合高并发的电商场景进行业务削峰

3.分布式架构,可用性高

rabbitmq二开困难,吞吐量较于其他两个较低,但是实现简单,延时低,吞吐量不高的简单业务场景可以使用。大量消息积压时性能直线下降

kafka一般用于大数据业务处理,日志采集之类的业务,有一些功能没提供。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。

技术选型因素:功能能否覆盖需求,系统稳定性,吞吐量,性能,学习成本

核心概念

image-20240419205505460

RocketMQ 整体架构设计主要分为四大部分,分别是:Producer、Consumer、Broker、NameServer。

为了更贴合实际,图画的都是集群部署,像 Broker 还画了主从。

■ Producer:就是消息生产者,可以集群部署。它会先和 NameServer 集群中的随机一台建立长连接,得知当前要发送的 Topic 存在哪台 Broker Master上,然后再与其建立长连接,支持多种负载平衡模式发送消息,默认通过轮询去每个队列生产数据。

■ Consumer:消息消费者,也可以集群部署。它也会先和 NameServer 集群中的随机一台建立长连接,得知当前要消息的 Topic 存在哪台 Broker Master、Slave上,然后它们建立长连接

支持集群消费和广播消费消息。广播模式下,一条消息会被同一个消费组中的所有消费者消费 ,集群模式下消息只会被一个消费者消费。

■ Broker:主要负责消息的存储、查询消费。Broker 会向集群中的每一台 NameServer 注册自己的路由信息。

支持主从部署,一个 Master 可以对应多个 Slave,Master 支持读写,Slave 只支持读,slave定时从master同步数据(同步/异步刷盘)。 即使master宕机了,slave还是只能被读(提供消费服务)

■ NameServer :

一个 注册中心 ,主要提供两个功能:Broker 管理路由信息管理Broker 会将自己的信息注册到 NameServer 中,消费者和生产者就从 NameServer 中根据要查找的topic查询broker-topic路由表,和查找到的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)

通常也是集群部署,但是各 NameServer 之间不会互相通信(去中心化,没有主节点),每个broker都会跟所有nameserver保持长连接,并且每隔三十秒心跳发送心跳,心跳包含了自身的topic配置信息。

nameserver的作用:管理broker和路由信息,让broker和生产者消费组解耦,生产者和消费组不需要关注broker的添加删除变动,只需要与nameserver交流就行,这样broker就可以比较灵活的进行变动了。

■ Topic: 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。

■Message:消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息

■Tag:可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

————————————————

消息队列存在的问题

消息队列起到解耦、削峰、数据分发的作用,同时也存在着系统可用性降低、系统复杂度提高、一致性问题 这三个方面缺点。

系统可用性降低 : 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响.

系统复杂度提高 : MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。如何保证

消息没有被重复消费、怎么处理消息丢失情况、如何保证消息传递的顺序性。

一致性问题 : A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理完成、D处理失败、如何保证消息数据处理的一致性。

工作流程

说完核心概念,再来说一下核心的工作流程,这里我先画了一张图。

image-20240419205515646

通过这张图就可以很清楚的知道,RocketMQ大致的工作流程:

  • 先启动 NameServer 集群,各 NameServer 之间无任何数据交互,Broker 启动之后会向所有 NameServer 定期(每 30s)发送心跳包,包括:IP、Port、TopicInfo,NameServer 会定期扫描 Broker 存活列表,如果超过 120s 没有心跳则移除此 Broker 相关信息,代表下线。

    这样每个 NameServer 就知道集群所有 Broker 的相关信息

  • Producer在启动之后会跟会NameServer建立长连接,定期从NameServer中获取Broker的信息,当发送消息的时候,会根据消息发送需要的topic去找对应的Broker地址,如果有的话,就向这台Broker发送请求;没有找到的话,就看根据是否允许自动创建topic来决定是否发送消息。

  • Broker在接收到Producer的消息之后,会将消息存起来,持久化,如果有从节点的话,也会主动同步给从节点,实现数据的备份

  • Consumer启动之后也会跟会NameServer建立长连接,定期从NameServer中获取Broker和对应topic的信息,然后根据自己需要订阅的topic信息找到对应的Broker的地址,然后跟Broker建立连接,获取消息,进行消费

消息模型

消息队列有两种模型:队列模型和发布/订阅模型。

队列模型

这是最初的一种消息队列模型,对应着消息队列“发-存-收”的模型。生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,也就是说每条消息只能被一个消费者消费。

image-20240419205521018

发布/订阅模型

如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。解决的方式就是发布/订阅模型。

在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。

发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”

image-20240419205525840

与队列不同的是:一份消息可以被多个消费者消费

rocketmq的消息模型

RocketMQ的消息模型(Message Model)主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息,典型的发布-订阅模式

broker与Topic的关系:

Broker 在实际部署过程中对应一台服务器(RocketMQ服务端),每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker,Broker和Topic是多对多的关系。

Topic和MessageQueue的关系:

Topic是一个逻辑上的概念。图中的MessageQueue,即消息队列,是用于存储消息的物理地址,一个队列只能归属一个topic,一个topic可以有多个队列

每个Topic中的消息地址可以存储于多个MessageQueue 中,MessageQueue又可以分布在不同的Broker上。

消息生成与消息消费

image-20240419205530986

rocketmq的消息生产:

生产者集群生产消息发送给Topic中的某一个队列中

rocketmq的消息消费(集群模式):

以消费者组作为消费单元,一个主题可以有多个消费者组同时订阅,一个消费者组中可以有多个消费者,它们加一起消费了某个主题的全部消息,同时,一个消费者组也能够订阅多个主题。

在消费时,RocketMQ是以队列为基本单位进行消息分配的,一个队列可以被多个消费者组消费,但是一个队列中的消息只能被某个消费者组中的某一个消费者消费(默认集群模式),但是一个消费者可以同时消费多个队列中的消息。消费者和队列是一对多的关系

一般而言,消费者的数量和队列的数量一致时,消费性能最好(能最大利用队列和消费者)。如果消费者小于队列数量,会出现消费者消费多个队列的情况,如果消费者大于队列数量,将会出现空闲的消费者。

Broker中维护了某个消费者组在某个队列上的消费偏移量(offset),如果消费者组中的某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费,消费成功之后offset向前推进,避免offset归零消息重复消费。

为什么一个队列只能被消费组内的一个消费者消费?

为了保证队列内消息的消费顺序,而且我们知道消费组消费消息的实现是offset+1,想要一个队列被多个同消费组消费者消费,想做到组内不重复消费消息,那就只能约定不同的步长,但这样维护起来就很麻烦

一个topic多个队列的好处?

一个主题允许有多个队列的设计的最大好处就是提高并发能力。生产者组的多个生产者可以同时向多个队列投递消息,而消费者组的多个消费者可以同时从多个队列拉取消息消费。

消费位移

image-20240419205536348

在RocketMQ中,offset(消费位移)用来管理每个消费队列的不同消费组的消费进度。每次消费者组消费完会返回一个成功的响应,然后再把维护的offset向前推进,这样就不会出现刚刚消费过的消息再一次被消费了。

对于同一个队列,每个消费组对应该队列的消费者都有一个该队列上的offset

对offset的管理可以分为本地模式和远程模式,本地模式是以文本文件的形式存储在客户端,而远程模式是将数据保存到broker端,对应的数据结构分别为LocalFileOffsetStore和RemoteBrokerOffsetStore。

当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集(不需要有人当接盘侠)。

当消费模式为集群消费时,则使用远程模式管理offset,消息同样会被多个消费者组消费,但不同的是每个消费者只负责消费其中部分队列中的消息,添加或删除消费者,都容易造成消费进度冲突,因此需要集中管理在Broker中。

消息类型

1.普通消息

2.定时消息

3.顺序消息

4.事务消息

普通消息

一般用于微服务解耦,数据集成,异步处理

要求可靠传输,对消息的处理时机处理顺序没有特别要求

image-20240419205541732

生命周期

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端

  • 待消费:发送到服务端,对消费者可见,等待消费者消费

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。

    此时服务端会等待消费者完成消费并提交消费结果,

    如果等待超时,rocketmq服务端会重试

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)

    RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

定时/延时消息

在分布式定时调度触发(定时消息)、任务超时处理(延时消息)等场景,需要实现精准、可靠的定时事件触发。使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。定时消息仅支持在 MessageType 为 Delay 的主题内使用,即定时消息只能发送至类型为定时消息的主题中,发送的消息的类型必须和主题的类型一致。

基于定时消息的超时任务处理具备如下优势:

  • 灵活性高: 无需遵循固定的时间间隔,可以根据具体业务需求随时调整触发时间,实现任意精度的事件触发。(5.x支持)

  • 精度高: 确到毫秒级的定时触发

  • 避免重复触发问题:基于消息通知的方式,无需业务层面去重,消息的幂等性由消息系统来保证。(相比于分布式场景下的quartz,得用分布式锁去重)

  • 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。RocketMQ 的定时消息基于消息通知实现,具有高并发和水平扩展的能力

image-20240419205547936

生命周期

除了多加的定时中状态和原本的待消费状态,其他实现跟普通消息一样

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端

  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。

  • 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:Apache RocketMQ 按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

定时消息的实现逻辑需要先经过定时存储等待触发,定时时间到达后才会被投递给消费者。因此,如果将大量定时消息的定时时间设置为同一时刻,则到达该时刻后会有大量消息同时需要被处理,会造成系统压力过大,导致消息分发延迟,影响定时精度

原理

Broker默认会有一个延迟消息专属的Topic,下面有18个队列,每个延迟级别对应一个队列。如果Broker接收到的是延迟消息,会改写消息的Topic和queueId,将消息暂时统一写入延迟队列中,然后由ScheduleMessageService线程对延迟队进行扫描,将到期需要交付的消息从CommitLog中读出来,然后恢复消息原本的Topic和queueId等属性,重新写回CommitLog,然后Consumer就可以正常消费了。

顺序消息

顺序消息仅支持使用 MessageType 为 FIFO 的主题,即顺序消息只能发送至类型为顺序消息的主题中,发送的消息的类型必须和主题的类型一致。和普通消息发送相比,顺序消息发送必须要设置消息组。(推荐实现 MessageQueueSelector 的方式,见下文)。要保证消息的顺序性需要单一生产者串行发送。

单线程使用 MessageListenerConcurrently 可以顺序消费,多线程环境下使用 MessageListenerOrderly 才能顺序消费

事务消息

基于RocketMQ分布式事务 - 完整示例 - 知乎 (zhihu.com)

解析 RocketMQ 业务消息——“事务消息” - 知乎 (zhihu.com)

概念介绍

  • 事务消息:RocketMQ 提供类似 XA 或 Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致;

  • 半事务消息:暂不能投递的消息,生产者已经成功地将消息发送到了 RocketMQ 服务端,但是 RocketMQ 服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半事务消息;

  • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ 服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该询问过程即消息回查。

生命周期

image-20240419205559640

  • 初始化:半事务消息被生产者构建并完成初始化,待发送到服务端

  • 事务待提交:半事务消息被发送到服务端,和普通消息不同,并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见;

等待超时后会消息回查

  • 消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止;

  • 提交待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见,等待被消费者获取并消费;

  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,RocketMQ 会对消息进行重试处理。具体信息,请参见消息重试;

  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败);RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。

  • 消息删除:当消息存储时长到期或存储空间不足时,RocketMQ 会按照滚动机制清理最早保存的消息数据,将消息从物理文件中删除。

事务消息基本流程

事务消息交互流程如下图所示:

image-20240419205605715

  1. 生产者将消息发送至 RocketMQ 服务端;

  2. RocketMQ 服务端将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息;

  3. 生产者开始执行本地事务逻辑;

  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit 或是 Rollback),服务端收到确认结果后处理逻辑如下:

  • 二次确认结果为 Commit:服务端将半事务消息标记为可投递,并投递给消费者;

  • 二次确认结果为 Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。

在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查;

生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果;(为了消息回查,需要事务日志表)

生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行处理。

MQ的事务消息是解决生产者的单机业务事务与消息发送的一致性。即解决生产者做完核心业务后MQ消息发送不成功,或者MQ消息发送成功后生产者核心业务失败的一致性问题。

要完成整体业务流程的分布式事务还要配合其他机制

实现细节

image-20240419205612628

根据发送事务消息的基本流程的需要,实现分为三个主要流程:接收处理 Half 消息、Commit 或 Rollback 命令处理、事务消息 check。

处理 Half 消息

发送方第一阶段发送 Half 消息到 Broker 后,Broker 处理 Half 消息。Broker 流程参考下图:

image-20240419205616657

具体流程是首先把消息转换 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC,其余消息内容不变,写入 Half 队列。具体实现参考 SendMessageProcessor 的逻辑处理。

Commit 或 Rollback 命令处理

发送方完成本地事务后,继续发送 Commit 或 Rollback 到 Broker。由于当前事务已经完结,Broker 需要删除原有的 Half 消息,由于 RocketMQ 的 appendOnly 特性,Broker通过 OP 消息实现标记删除。Broker 流程参考下图:

image-20240419205621720

  • Commit。Broker 写入 OP 消息,OP 消息的 body 指定 Commit 消息的 queueOffset,标记之前 Half 消息已被删除;同时,Broker 读取原 Half 消息,把 Topic 还原,重新写入 CommitLog,消费者则可以拉取消费;

  • Rollback。Broker 同样写入 OP 消息,流程和 Commit 一样。但后续不会读取和还原 Half 消息。这样消费者就不会消费到该消息。

具体实现在 EndTransactionProcessor 中。

事务消息 check

如果发送端事务时间执行过程,发送 UNKNOWN 命令,或者 Broker/发送端重启发布等原因,流程 2 的标记删除的 OP 消息可能会缺失,因此增加了事务消息 check 流程,该流程是在异步线程定期执行(transactionCheckInterval 默认 30s 间隔),针对这些缺失 OP 消息的 Half 消息进行 check 状态。具体参考下图:

image-20240419205626595

事务消息 check 流程扫描当前的 OP 消息队列,读取已经被标记删除的 Half 消息的 queueOffset。如果发现某个 Half 消息没有 OP 消息对应标记,并且已经超时(transactionTimeOut 默认 6 秒),则读取该 Half 消息重新写入 half 队列,并且发送 check 命令到原发送方检查事务状态;如果没有超时,则会等待后读取 OP 消息队列,获取新的 OP 消息。

另外,为了避免发送方的异常导致长期无法确定事务状态,如果某个 Half 消息的 bornTime 超过最大保留时间(transactionCheckMaxTimeInMs 默认 12 小时),则会自动跳过此消息,不再 check。

具体实现参考:

TransactionalMessageServiceImpl#check 方法

实战

1.创建事务消息型Topic

2.事务消息发送

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。

  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。

image-20240419205645165

3.消费

当事务消息 commit 之后,这条消息其实就是一条投递到用户 Topic 的普通消息而已。所以对于消费者来说,和普通消息的消费没有区别。

注意:

  1. 避免大量未决事务导致超时:在事务提交阶段异常的情况下发起事务回查,保证事务一致性;但生产者应该尽量避免本地事务返回未知结果;大量的事务检查会导致系统性能受损,容易导致事务处理延迟;

  2. 事务消息的 Group ID 不能与其他类型消息的 Group ID 共用:与其他类型的消息不同,事务消息有回查机制,回查时服务端会根据 Group ID 去查询生产者客户端;

  3. 事务超时机制:半事务消息被生产者发送服务端后,如果在指定时间内服务端无法确认提交或者回滚状态,则消息默认会被回滚。

生产者和消费者

不建议单一进程创建大量生产者

rcmq支持同一个生产者向多个主题发送消息。对于生产者的创建和初始化,建议遵循够用即可、最大化复用原则

不建议频繁创建和销毁生产者

RocketMQ 的生产者是可以重复利用的底层资源,类似数据库的连接池。因此不需要在每次发送消息时动态创建生产者,且在发送结束后销毁生产者。这样频繁的创建销毁会在服务端产生大量短连接请求,严重影响系统性能。

生产者

消息生产者向消息队列写入数据

默认消息生产类DefaultMQProducer

消费者分类

消费者可以分为两类

  • DefaultMQPushConsumer:系统控制读取操作

  • DefaultMQPullConsumer:使用者自主控制读取操作 但是这两个的本质都是通过Consumer轮询Broker然后再拉取消息,即pull模式

PushConsumer

高度封装的消费者类型,消费消息仅仅通过消费监听器监听并返回结果。消息的获取、消费状态提交以及消费重试都通过 RocketMQ 的客户端 SDK 完成。

PushConsumer 的消费监听器执行结果分为以下三种情况:

  • 返回消费成功:以 Java SDK 为例,返回ConsumeResult.SUCCESS,表示该消息处理成功,服务端按照消费结果更新消费进度。

  • 返回消费失败:以 Java SDK 为例,返回ConsumeResult.FAILURE,表示该消息处理失败,需要根据消费重试逻辑判断是否进行重试消费。

  • 出现非预期失败:例如抛异常等行为,该结果按照消费失败处理,需要根据消费重试逻辑判断是否进行重试消费

使用 PushConsumer 消费者消费时,不允许使用以下方式处理消息,否则 RocketMQ 无法保证消息的可靠性。

  • 错误方式一:消息还未处理完成,就提前返回消费成功结果。此时如果消息消费失败,RocketMQ 服务端是无法感知的,因此不会进行消费重试。

  • 错误方式二:在消费监听器内将消息再次分发到自定义的其他线程,消费监听器提前返回消费结果。此时如果消息消费失败,RocketMQ 服务端同样无法感知,因此也不会进行消费重试。

  • PushConsumer 严格限制了消息同步处理及每条消息的处理超时时间,适用于以下场景:

    • 消息处理时间可预估:如果不确定消息处理耗时,经常有预期之外的长时间耗时的消息,PushConsumer 的可靠性保证会频繁触发消息重试机制造成大量重复消息。

    • 无异步化、高级定制场景:PushConsumer 限制了消费逻辑的线程模型,由客户端 SDK 内部按最大吞吐量触发消息处理。该模型开发逻辑简单,但是不允许使用异步化和自定义处理流程。

使用DefaultMQPushConsumer

public class consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");//设置group名
        consumer.setNamesrvAddr("192.168.108.128:9876");//设置Nameserver地址
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//设置读取位置,从最小的offset开始
        consumer.subscribe("TopicTest2", "*");//生命接受的Topic名以及过滤tags
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            System.out.println(Thread.currentThread().getName() + " Receive Message: " + list);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        System.out.println("consumer start...");
        consumer.start();
    }
}

在程序中创建好了consumer对象之后,需要对Group、MessageModel、NameServer、Topic进行设置:

1.Group主要将多个Consumer组织到一起,提高并发能力 2,RocketMQ支持两种MessageModel消息模式:Clustering和BroadCasting。Clustering模式下,在同一个group中的消费者只会消费消息的一部分,即group内所有消费者的消费消息的总和是Topic内容整体。Broadcasting即广播模式,消息会同时发布到group内的每一个consumer。 3.NameServer需要配置ip和端口号,多个NameServer用分号(;)隔开 4.Topic是消息队列的标识,表明消费者需要消费哪些消息,并且可以通过消息的tags来进行过滤,如:consumer.subscribe(“TopicTest2”, “tag1||tag2||tag3”);此处使用 * 表示消费topic中的所有消息

SimpleConsumer

SimpleConsumer 是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。

一个来自官网的例子:

// 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。 
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "YourTopic";
FilterExpression filterExpression = new FilterExpression("YourFilterTag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
        // 设置消费者分组。
        .setConsumerGroup("YourConsumerGroup")
        // 设置接入点。
        .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("YourEndpoint").build())
        // 设置预绑定的订阅关系。
        .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
        // 设置从服务端接受消息的最大等待时间
        .setAwaitDuration(Duration.ofSeconds(1))
        .build();
try {
    // SimpleConsumer 需要主动获取消息,并处理。
    List<MessageView> messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
    messageViewList.forEach(messageView -> {
        System.out.println(messageView);
        // 消费处理完成后,需要主动调用 ACK 提交消费结果。
        try {
            simpleConsumer.ack(messageView);
        } catch (ClientException e) {
            logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
        }
    });
} catch (ClientException e) {
    // 如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
    logger.error("Failed to receive message", e);
}

SimpleConsumer 适用于以下场景:

  • 消息处理时长不可控:如果消息处理时长无法预估,经常有长时间耗时的消息处理情况。建议使用 SimpleConsumer 消费类型,可以在消费时自定义消息的预估处理时长,若实际业务中预估的消息处理时长不符合预期,也可以通过接口提前修改。

  • 需要异步化、批量消费等高级定制场景:SimpleConsumer 在 SDK 内部没有复杂的线程封装,完全由业务逻辑自由定制,可以实现异步分发、批量消费等高级定制场景。

  • 需要自定义消费速率:SimpleConsumer 是由业务逻辑主动调用接口获取消息,因此可以自由调整获取消息的频率,自定义控制消费速率。

PullConsumer

使用DefaultMQPullConsumer

public class PullConsumer {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
 
    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
         consumer.start();
 		//根据Topic名获取队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");
        for (MessageQueue mq : mqs) {
            //获取消息的offset
            long Offset = Consumer.fetchConsumeOffset(mq, true);
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult =
                        consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    //根据不同的效应进行处理
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
 
        consumer.shutdown();
    }
 
    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null)
            return offset;
 
        return 0;
    }
 
    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSE_TABLE.put(mq, offset);
    }
 }

使用Pull模式拉取消息额外做了三件事

获取MessageQueue并遍历。因为一个Topic可能包含多个MessageQueue,因此需要对MessageQueue进行遍历才能获取; 维护Offsetstore。从MessageQueue拉去消息时,需要传入Offset值,随着MessageQueue的增长,需要消费者拉取broker存储的Offset; 根据不同响应状态进行处理。拉去请求会返回FOUND(获取消息)、NO_MATCHED_MSG(没有合适消息)、NO_NEW_MSG(没有新消息)、OFFSET_ILLEGAL(不合法的Offset)

消费方式

消费方式也分为两类

1.push(推)

2.pull(拉)

push

push,顾名思义,就是推的意思。就是当MQ收到生产者产生的消息的时候,会主动将消息推送到消费者进行消费,这种模式就叫push,也就是MQ将消息推给到消费者的意思。

优点:响应快,消息一来就push给消费者

缺点:消息量比较大时,消费者压力大,因为消费者无法控制MQ推消息的速度,有可能忙不过来

pull

push是MQ主动给消费者推消息,那么pull呢?刚好跟push相反,就是消费者主动去MQ中拉取消息。

优点:消费者压力小,可以自己决定何时去拉取消息

缺点:实时性较低,而且需要开发人员主动维护消息消费进度(因为消息消费是消费者主动发起的)

rocketMQ的消费方式

RocketMQ对于push模式做到了实时和压力的平衡,这主要是因为RocketMQ的push模式其实算是一个“伪push”模式,真正底层的实现还是基于pull。

既然底层是pull,那么RokcetMQ在实现消费者的逻辑的时候,就可以很容易实现控制压力的效果

但是如何通过pull实现push的时实的优点呢? 答:长轮询机制

轮询与长轮询

轮询

轮询是指不管服务端数据有无更新,客户端每隔定长时间请求拉取一次数据,可能有更新数据返回,也可能什么都没有。

轮询pull的问题

  • 对于消息而言,会一直产生,这就要求消费者不停地间隔一定时间去拉取消息,即使没有消息也需要去请求,就会造成大量无用的请求,浪费大量耗费服务器内存和宽带资源

  • 低实时性

长轮询

长轮询也是客户端请求服务端,如果服务端有数据,那么就立马返回,客户端再次请求;当服务端不存在数据的时候,服务端并不会给客户端响应,而是将请求给hold住,当服务端有数据的时候才会给客户端响应,返回数据。

优点

  • 解决轮询带来的频繁无用请求问题

  • 一旦新的数据到了,那么消费者能立马就可以获取到新的数据,保证了实时性

push模式消费的实现

消费流程

image-20240419205655536

  • ①消费者有一个后台线程,会去处理拉取消息

  • ②先去判断有没有过多消息没有消费处理,如果有的话,那么就间隔一定时间再次从①开始执行拉取消息的逻辑

  • ③消费者没有过多消息没有消费,那么就会直接向MQ发送拉取消息的请求,有消息就返回,没有消息就hold住请求,等有新的消息到的时候才返回

  • ④消费者获取到消息之后,会去找用户自定义的消息处理逻辑的实现(MessageListener的实现)去消费消息,同时会再次拉取消息,继续从①开始执行逻辑

Broker将Consumer发送过来的拉取请求保持住,然后Consumer循环查看队列中是否有消息,如果没有消息则等待5秒,然后再去查看队列是否有消息,如果有消息则拉取,如果没有消息,就继续等待。当超过设置的BrokerSuspendMaxTimeMillis时(默认时15s,在默认情况下Consumer会等待3次),返回空结果。此时消息的控制权在Consumer上,大大减少了Consumer客户端的消息处理压力

源码分析

1、消费者拉取消息控制压力

当消费者准备去拉消息的时候,会先去判断当前消费者消费的压力再决定是否去拉取消息。

RocketMQ提供了两种判断消费压力逻辑,一种是基于还未消费的消息的数量的大小,还有一种是基于还未消费的消息所占内存的大小。

控制压力源码

  • 判断还未消息的数量,数量太多就等会再执行重新执行拉取消息的逻辑

  • 判断还未消息的大小,如果还未消息的消息占用的内存过大,就等会再执行重新执行拉取消息的逻辑

总的一句话就是,当消费者消费的压力过大时,就不会去拉取消息,而是等待一定的时间再去执行拉取消息的逻辑,如果压力还是很大,就还继续等,如此循环,直到消费者的消费压力小于阈值的时候,才会真正的发送请求到MQ中拉取消息。

2、MQ将请求hold住

当服务端未找到消息时,就将请求进行挂起,存起来

请求hold住源码

拉取不到消息时,会调用PullRequestHoldService的suspendPullRequest方法讲请求存储起来。PullRequestHoldService是用来存储拉取请求的类。

PullRequestHoldService

suspendPullRequest方法会将请求分类,放到ManyPullRequest里,然后用一个ConcurrentHashMap进行存储

3、MQ收到消息响应给消费者

NotifyMessageArrivingListener

前面提到,消费者发起拉取消息请求发现没消息,请求被hold住等待MQ消息准备好(可见)

当生产者发送的消息达到MQ的时候,MQ会回调NotifyMessageArrivingListener的arriving方法,之后就会调用PullRequestHoldService的notifyMessageArriving方法,MQ会重新处理拉取消息的逻辑,此时就能找到最新来的那条消息,从而将最新的消息通过网络返回给消费者。

notifyMessageArriving和返回消息逻辑

最后

所以从以上的分析可以看出,RocketMQ对于push的消费方式的实现是基于长轮询机制来实现的,同时平衡了时实和压力,这其实就很nice了。

最后我想说一句,其实不论是pull还是push,又或是轮询和长轮询,其实都是一种理论或者说是一种思想,不单单是MQ的东西,就比如在Nacos中,也使用了push和长轮询机制。但是这些理论在不同产品的具体实现,实现方式可能不太一样,但都是大同小异,所以当你懂了这些思想,再看其它框架的源码,其实就很容易了。

消费者分组和生产者分组

# 生产者分组

RocketMQ 服务端 5.x 版本开始,生产者是匿名的,无需管理生产者分组(ProducerGroup);对于历史版本服务端 3.x 和 4.x 版本,已经使用的生产者分组可以废弃无需再设置,且不会对当前业务产生影响。

# 消费者分组

消费者分组是多个消费行为一致的消费者的负载均衡分组。消费者分组不是具体实体而是一个逻辑资源。通过消费者分组实现消费性能的水平扩展以及高可用容灾。

消费者分组中的订阅关系、投递顺序性、消费重试策略是一致的。

  • 订阅关系:Apache RocketMQ 以消费者分组的粒度管理订阅关系,实现订阅关系的管理和追溯。

  • 投递顺序性:Apache RocketMQ 的服务端将消息投递给消费者消费时,支持顺序投递和并发投递,投递方式在消费者分组中统一配置。

  • 消费重试策略: 消费者消费消息失败时的重试策略,包括重试次数、死信队列设置等。

RocketMQ 服务端 5.x 版本:上述消费者的消费行为从关联的消费者分组中统一获取,因此,同一分组内所有消费者的消费行为必然是一致的,客户端无需关注

RocketMQ 如何保证高性能读写

RocketMQ 如何实现高性能消息读写? - 掘金 (juejin.cn)

【精选】操作系统 | 内存文件映射 —— 文件到内存的映射-CSDN博客

RocketMQ源码解析-零拷贝 - 掘金 (juejin.cn)

为什么能实现高性能IO

0. Reactor 网络模型

RocketMQ 的 Broker 端基于 Netty 实现了主从 Reactor 模型

1.顺序读写

CommitLog存放消息是顺序写入,随机读

每个ConsumeQueue是顺序写,顺序读

(虽然是commitLog随机读,但整体还是从旧到新有序读,只要随机的那块区域还在Page Cache的热点范围内,还是可以充分利用Page Cache。)

RocketMQ 充分利用了 Page Cache,它 CommitLog 和 ConsumeQueue 在整体上看都是顺序读写。这样,能够利用上Pache cache预读的数据,不会产生大量缺页(Page Fault)中断而再次读取磁盘,减少磁盘IO次数。

同时顺序读写也保证了磁盘的读写性能(省去了大部分的寻址时间)

2.MMap,

rcmq和kafka不同,用的是mmap没用sendfile,多了一次数据拷贝和切换消耗

mmap: 把磁盘文件直接映射到用户空间里的虚拟内存,这样就省去了从内核缓冲区复制到用户空间的过程

也就是建立文件物理磁盘地址和用户内存空间的映射(并没有把文件载入,只是映射,读写时再载入),可以减少数据从内核到用户空降的拷贝过程

3.Page Cache

消息写入CommitLog文件的时候,并不是直接写入磁盘文件的,而是先进入OS的PageCache内存缓存中,然后再由OS的后台线程选一个时间,异步化的将OS PageCache内存缓冲中的数据刷入底层的磁盘文件。

在采用磁盘文件顺序写+OS PageCache写入+OS异步刷盘的策略,基本上可以让消息写入CommitLog的性能接近直接写入内存,所以正是如此,才可以让Broker高吞吐的处理每秒大量的消息写入。

image-20240901103219090

什么是DMA

DMA(Direct Memory Access) 直接内存访问

在 I/O 设备和内存进行数据传输的时候,数据传输的工作全部交由 DMA 控制器负责,而 CPU 这时候就可以抽身去处理别的事务。

PS:磁盘算外存,是输入输出设备(IO设备)

使用DMA之前

image-20240419205709241从上面的图可以看到,CPU 要参与整个数据读取、搬运的整个生命周期,而且在磁盘读取数据的过程中,CPU 是阻塞的,不能做其他任何的事情。

使用DMA之后

image-20240419205714181

在磁盘和 CPU 之间增加一个 DMA,负责磁盘数据搬运的工作

释放了 CPU 在磁盘数据搬运的阻塞等待,这时 CPU 可以解放双手做其他的事情了!

什么是零拷贝

MMAP

内存映射,就是将磁盘文件映射到用户空间的一段内存区域中,用户进程可以直接操作这段虚拟地址进行文件的读写等操作,系统都会自动会回写到对应的文件磁盘上。

调用 mmap 进行内存映射时,操作系统其实只是建立虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中,只有当真正读写这段数据的时候,在 MMU 地址映射表中找不到逻辑指针对应的物理地址的时候,会发生缺页中断,将文件加载到内存。

image-20240419205718924

mmap 与 read、write 都是 unix/linux 下的指令函数,但是与之相比 mmap 通过内存直接操作磁盘文件,减少了用户空间与内核空间之间的一次数据拷贝

那么问题又来了,解决了用户态与内核态之间的数据传输,但是在内核缓冲到socket缓冲区之间还存在数据拷贝。

解决方法:系统调用函数 sendfile(),它可以替代前面的 read()write() 这两个系统调用,这样就可以减少一次系统调用,也就减少了 2 次上下文切换的开销

在网卡支持 SG-DMA 技术的情况下,网卡的 SG-DMA 控制器就可以直接将内核缓存中的数据拷贝到网卡的缓冲区里,此过程不需要将数据从操作系统内核缓冲区拷贝到 socket 缓冲区中,这样就减少了一次数据拷贝。

显而易见,零拷贝技术与传统文件传输的方式相比,那是提升了一个质的飞跃呀。原本需要 4 次上下文的切换、4 次数据拷贝,现在 2 次上下文的切换和数据拷贝

RocketMQ的刷盘机制

上面我讲了那么多的 RocketMQ 的架构和设计原理,你有没有好奇

Topic 中的 队列是以什么样的形式存在的?

队列中的消息又是如何进行存储持久化的呢?

我在上文中提到的 同步刷盘异步刷盘 又是什么呢?它们会给持久化带来什么样的影响呢?

下面我将给你们一一解释。

# 同步刷盘和异步刷盘

image-20240419205726290

如上图所示,在同步刷盘中需要等待一个刷盘成功的 ACK ,同步刷盘对 MQ 消息可靠性来说是一种不错的保障,但是 性能上会有较大影响 ,一般地适用于金融等特定业务场景。

而异步刷盘往往是开启一个线程去异步地执行刷盘操作。消息刷盘采用后台异步线程提交的方式进行, 降低了读写延迟 ,提高了 MQ 的性能和吞吐量,一般适用于如发验证码等对于消息保证要求不太高的业务场景。

一般地,异步刷盘只有在 Broker 意外宕机的时候会丢失部分数据,你可以设置 Broker 的参数 FlushDiskType 来调整你的刷盘策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。

PS:默认都是先写入内存中的磁盘缓冲区Page Cache,然后再同步/异步刷到磁盘里

# 同步复制和异步复制

上面的同步刷盘和异步刷盘是在单个结点层面的,而同步复制和异步复制主要是指的 Borker 主从模式下,主节点返回消息给客户端的时候是否需要同步从节点。

  • 同步复制:也叫 “同步双写”,也就是说,只有消息同步双写到主从节点上时才返回写入成功

  • 异步复制:消息写入主节点之后就直接返回写入成功

随之的性能也会下降,所以需要程序员根据特定业务场景去选择适应的主从复制方案。

那么,异步复制会不会也像异步刷盘那样影响消息的可靠性呢?

答案是不会的,因为两者就是不同的概念,对于消息可靠性是通过不同的刷盘策略保证的,而像异步同步复制策略仅仅是影响到了 可用性 。为什么呢?其主要原因RocketMQ 是不支持自动主从切换的,当主节点挂掉之后,生产者就不能再给这个主节点生产消息了

比如这个时候采用异步复制的方式,在主节点还未发送完需要同步的消息的时候主节点挂掉了,这个时候从节点就少了一部分消息。但是此时生产者无法再给主节点生产消息了,消费者可以自动切换到从节点进行消费(仅仅是消费),所以在主节点挂掉的时间只会产生主从结点短暂的消息不一致的情况,降低了可用性,而当主节点重启之后,从节点那部分未来得及复制的消息还会继续复制。

在单主从架构中,如果一个主节点挂掉了,那么也就意味着整个系统不能再生产了。那么这个可用性的问题能否解决呢?一个主从不行那就多个主从的呗,别忘了在我们最初的架构图中,每个 Topic 是分布在不同 Broker 中的。


image-20240419205733569

但是这种复制方式同样也会带来一个问题,那就是无法保证 严格顺序 。在上文中我们提到了如何保证的消息顺序性是通过将一个语义的消息发送在同一个队列中,使用 Topic 下的队列来保证顺序性的。如果此时我们主节点 A 负责的是订单 A 的一系列语义消息,然后它挂了,这样其他节点是无法代替主节点 A 的,如果我们任意节点都可以存入任何消息,那就没有顺序性可言了。

而在 RocketMQ 中采用了 Dledger 解决这个问题。他要求在写入消息的时候,要求至少消息复制到半数以上的节点之后,才给客⼾端返回写⼊成功,并且它是⽀持通过选举来动态切换主节点的。这里我就不展开说明了,读者可以自己去了解。

也不是说 Dledger 是个完美的方案,至少在 Dledger 选举过程中是无法提供服务的,而且他必须要使用三个节点或以上,如果多数节点同时挂掉他也是无法保证可用性的,而且要求消息复制半数以上节点的效率和直接异步复制还是有一定的差距的。

存储机制

还记得上面我们一开始的三个问题吗?到这里第三个问题已经解决了。

但是,在 Topic 中的 队列是以什么样的形式存在的?队列中的消息又是如何进行存储持久化的呢? 还未解决,其实这里涉及到了 RocketMQ 是如何设计它的存储结构了。我首先想大家介绍 RocketMQ 消息存储架构中的三大角色——CommitLogConsumeQueueIndexFile

  1. CommitLog消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

  2. CommitLog消息主体以及元数据的存储主体,存储 Producer 端写入的消息主体内容,消息内容不是定长的。单个文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量,比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

  3. IndexFile:索引文件的应用场景其实比较局限,是为了提供按照 Message Key 查询消息的能力。索引文件可以通过 Message Key,查询到消息在 CommitLog 中的物理偏移量,进而从 CommitLog 中查询消息。

    逻辑结构类似 HashMap,HashMap 以 Key-Value 形式存储数据,那么索引文件的存储格式也是 Key-Value

    Key:Message Key。索引文件的 Key 其实是 Message Key 经过 hash 得到的一个 Integer, ​ Value:physical offset。索引文件的 Value 主要是消息在 CommitLog 中的绝对物理偏移量。

总结来说,整个消息存储的结构,最主要的就是 CommitLoqConsumeQueue 。而 ConsumeQueue 你可以大概理解为 Topic 中的队列。

image-20240419205738668

RocketMQ 采用的是 混合型的存储结构 ,即为 Broker 单个实例下所有的队列共用一个日志数据文件来存储消息。有意思的是在同样高并发的 Kafka 中会为每个 Topic 分配一个存储文件。这就有点类似于我们有一大堆书需要装上书架,RockeMQ 是不分书的种类直接成批的塞上去的,而 Kafka 是将书本放入指定的分类区域的。

RocketMQ 为什么要这么做呢?原因是 提高数据的写入效率 ,不分 Topic 意味着我们有更大的几率获取 成批 的消息进行数据写入,但也会带来一个麻烦就是读取消息的时候需要遍历整个大文件,这是非常耗时的。

所以,在 RocketMQ 中又使用了 ConsumeQueue 作为每个队列的索引文件来 提升读取消息的效率。我们可以直接根据队列的消息序号,计算出索引的全局位置(索引序号*索引固定⻓度 20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。

讲到这里,你可能对 RockeMQ 的存储架构还有些模糊,没事,我们结合着图来理解一下。

image-20240419205743600

如果上面没看懂的读者一定要认真看下面的流程分析!

首先,在最上面的那一块就是我刚刚讲的你现在可以直接 ConsumerQueue 理解为 Queue

在图中最左边说明了红色方块代表被写入的消息,虚线方块代表等待被写入的。左边的生产者发送消息会指定 TopicQueueId 和具体消息内容,而在 Broker 中管你是哪门子消息,他直接 全部顺序存储到了 CommitLog。而根据生产者指定的 TopicQueueId 将这条消息本身在 CommitLog 的偏移(offset),消息本身大小,和 tag 的 hash 值存入对应的 ConsumeQueue 索引文件中。而在每个队列中都保存了 ConsumeOffset 即每个消费者组的消费位置(我在架构那里提到了,忘了的同学可以回去看一下),而消费者拉取消息进行消费的时候只需要根据 ConsumeOffset 获取下一个未被消费的消息就行了。

上述就是我对于整个消息存储架构的大概理解(这里不涉及到一些细节讨论,比如稀疏索引等等问题),希望对你有帮助。

因为有一个知识点因为写嗨了忘讲了,想想在哪里加也不好,所以我留给大家去思考 🤔🤔 一下吧。

为什么 CommitLog 文件要设计成固定大小的长度呢?提醒:内存映射机制

消息清理

由于消息是存磁盘的,但是磁盘空间是有限的,所以对于磁盘上的消息是需要清理的。

4.6版本凌晨4点默认会删除48小时不再使用的CommitLog文件

  • 检查这个文件最后访问时间

  • 判断是否大于过期时间

  • 指定时间删除,默认凌晨4点

常见问题解决

消息重复消费

影响消息正常发送和消费的重要原因是网络的不确定性。

引起重复消费的原因

1.生产者

生产者发送消息到broker持久化,broker返回应答给生成者时网络波动或者生产者宕机导致应答发送失败,生产者意识到消息发送失败进行重试

2.消费者

正常情况下在consumer真正消费完消息后应该发送ack,ack通知broker该消息已正常消费

当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer

解决方案

一般都在消费者里做幂等性方案

幂等有两种实现1.业务幂等 2.消息去重

业务幂等指本来多次调用和一次调用的结果都是一样的,这样就不需要再去重了

1.数据库消息记录表唯一索引

2.redis setnx+唯一ID去重

发送时把唯一ID放进消息里,消费者消费时到redis里查是否存在该ID

消息顺序消费

如何让RocketMQ保证消息的顺序消费

首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的

局部顺序消息

只需要保证某一组消息被顺序消费

那么就对消息分组,保证每组消息被发送到一个队列里

追问:怎么保证消息发到同一个queue? Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断i % 2 == 0,那就都放到queue1里,否则放到queue2里。

for (int i = 0; i < 5; i++) {
    Message message = new Message("orderTopic", ("hello!" + i).getBytes());
    producer.send(
        // 要发的那条消息
        message,
        // queue 选择器 ,向 topic中的哪个queue去写消息
        new MessageQueueSelector() {
            // 手动 选择一个queue
            @Override
            public MessageQueue select(
                // 当前topic 里面包含的所有queue
                List<MessageQueue> mqs,
                // 具体要发的那条消息
                Message msg,
                // 对应到 send() 里的 args,也就是2000前面的那个0
                Object arg) {
                // 向固定的一个queue里写消息,比如这里就是向第一个queue里写消息
                if (Integer.parseInt(arg.toString()) % 2 == 0) {
                    return mqs.get(0);
                } else {
                    return mqs.get(1);
                }
            }
        },
        // 自定义参数:0
        // 2000代表2000毫秒超时时间
        i, 2000);
}

image-20240419205749666

全局顺序消息

要保证全局顺序消息, 需要先把 Topic 的读写队列数设置为 一,然后Producer Consumer 的并发设置,也要是一。简单来说,为了保证整个 Topic全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 ,这时候就完全牺牲RocketMQ的高并发、高吞吐的特性了。

image-20240419205758879

消息不丢失

如何保证消息不丢失

首先在如下三个部分都可能会出现丢失消息的情况:

Producer端 Broker端 Consumer端

Producer端

在生产阶段,主要通过请求确认机制,来保证消息的可靠传递

  • 1、同步发送的时候,要注意处理响应结果和异常。如果返回响应OK,表示消息成功发送到了Broker,如果响应失败,或者发生其它异常,都应该重试。 我们可以等待返回结果,根据返回结果决定是否重试

  • 2、异步发送的时候,应该在回调方法里检查,如果发送失败或者异常,都应该进行重试。我们可以在回调方法里处理

  • oneway不等待结果返回,会丢失消息,在不需要确保消息发送且需要高性能时才使用

  • 3.通过事务消息的回查机制保证消息发送不丢失

生产者发送消息失败怎么办?

rcmq客户端默认有重试机制,发送普通消息时默认重试

如果要发送有序消息,rcmq默认不重试,这种情况可以捕获异常重试/执行自定义逻辑

通过sendresult中的sendstatus判断是否发送成功

通过自定义实现SendCallbackonSuccess()方法和onException()方法

rocketmq发送消息的三种方式_rocketmq syncsend-CSDN博客

Broker端 1.修改刷盘策略为同步刷盘,防止异步刷盘过程中broker宕机丢失消息(此时消息只在内存中没刷盘,断电就丢失)。默认情况下是异步刷盘的 flushDiskType = SYNC_FLUSH

2.集群部署,主从模式,消息在slave有备份,保证高可用。 Consumer端

我们能做到的:

1.完全处理完消费业务逻辑后再进行手动ack确认

因为消息队列维护了消费的位置,逻辑执行失败了,没有确认,再去队列拉取消息,就还是之前的一条。

rocketmq帮忙保证的

消费重试机制

消费重试

消费重试是什么?

消费者在消费某条消息失败并返回消费失败状态给broker后,broker服务端会把消息转移到重试队列里,根据重试策略重新向客户端投递该消息超过一次定数后若还未消费成功,则该消息将不再继续重试,直接被发送到死信队列中

什么时候认为消费失败需要重试

客户端返回消费失败 ReconsumeLater,抛出非预期异常,或消息处理超时(包括在 PushConsumer 中排队超时),

只要服务端服务端一定时间内没收到消费成的响应,就会重新投递消息给消费者。

重试频次

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

####

消息状态

package org.apache.rocketmq.client.consumer.listener;
public enum ConsumeConcurrentlyStatus {
  /**
  * Success consumption
  */
  CONSUME_SUCCESS,
  /**
  * Failure consumption,later try to consume
  */
  RECONSUME_LATER;
}

可以看到,消息的状态分为成功或者失败需要重试

重试场景

1.异常重试

2.超时重试

异常重试

消息正常到了消费者,但是消费者处理逻辑失败,抛出异常

在消息监听器实现中手写返回逻辑

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {

                for(MessageExt msg :msgs){
                    System.out.println("msg="+msg.getMsgId());
                    System.out.println("date="+new Date());
                    System.out.println("ReconsumeTimes="+msg.getReconsumeTimes());
                    System.out.println();
                }
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                //  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

由于重试的实现broker已经实现了,客户端只需要不处理成功就不返回CommitSuccess导致offset移位就行

超时重试

当consumer业务逻辑处理时间过长,没返回消费状态给broker。 broker认为超时就会重试

(跟异常重试其实是一样的,broker超时没收到成功信息就会一直重试)

自定义消息重试次数

消费者可以通过getReconsumeTimes 获取消息的重试次数

可以自定义设置超过3次直接返回Commit Success

 if(msgs.get(0).getReconsumeTimes() >= 3){
          // 重试3次后,不再进行重试
          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }

重试队列和死信队列

重试队列

消费端一直不回传消费成功结果,MQ认为消息没有收到,Consumer下一次拉取,Broker依然会发送该消息,

所以,任何异常都要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,这样MQ会将消息放到重试队列;(返回了LATER才会把消息放到重试队列)

重试队列名称为:%RETRY%+consumergroup

如果异常一直不返回LATER,会一直重试吗?

死信队列

什么是死信队列

消费重试达到最大重试次数后依然消费失败,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中;这个队列就是死信队列

死信队列的特征

1、死信队列中的消息不会再被消费者正常消费,即DLQ对于消费者是不可见的

2、死信存储有效期与正常消息相同,均为 3 天(commitlog文件的过期时间),3 天后会被自动删除

3、死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup,即每个消费者组都有一个死信队列

4、如果⼀个消费者组未产生死信消息,则不会为其创建相应的死信队列

死信队列的处理

1、实际上,当⼀条消息进入死信队列,就意味着系统中某些地方出现了问题,从而导致消费者无法正常消费该消息,比如代码中原本就存在Bug。

2、因此,对于死信消息,通常需要开发人员进行特殊处理;关键的步骤是要排查可疑因素,解决代码中可能存在的Bug,然后再将原来的死信消息再次进行投递消费。

消息堆积

下游消费系统如果宕机了,导致几百万条消息在消息中间件里积压,此时怎么处理?

你们线上是否遇到过消息积压的生产故障?如果没遇到过,你考虑一下如何应对?

1.首先要找到是什么原因导致的消息堆积,是Producer太多了,Consumer太少了导致的还是说其他情况,总之先定位问题。

2.然后看下消息消费速度是否正常,正常的话,可以通过上线更多consumer临时解决消息堆积问题

解决方法:提高消费能力

  • 消费者扩容:如果当前Topic的Message Queue的数量大于消费者数量,就可以对消费者进行扩容,增加消费者,来提高消费能力,尽快把积压的消息消费玩。

  • 消息迁移Queue扩容:如果当前Topic的Message Queue的数量小于或者等于消费者数量,这种情况,再扩容消费者就没什么用,就得考虑扩容Message Queue。可以新建一个临时的Topic,临时的Topic多设置一些Message Queue,然后先用一些消费者把消费的数据丢到临时的Topic,因为不用业务处理,只是转发一下消息,还是很快的。接下来用扩容的消费者去消费新的Topic里的数据,消费完了之后,恢复原状。

!image-20240419205806829

追问:如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?

采用消息迁移queue扩容

准备一个临时的topic

queue的数量是堆积的几倍

queue分布到多Broker中

上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去

上线N台Consumer同时消费临时Topic中的数据

改bug

恢复原来的Consumer,继续消费之前的Topic

追问:堆积时间过长消息超时了? RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。

追问:堆积的消息会不会进死信队列? 不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),16次(默认16次)才会进入死信队列(%DLQ%+ConsumerGroup)

消息过滤

有两种方案:

  • 一种是在 Broker 端按照 Consumer 的去重逻辑进行过滤,这样做的好处是避免了无用的消息传输到 Consumer 端,缺点是加重了 Broker 的负担,实现起来相对复杂。

  • 另一种是在 Consumer 端过滤,比如按照消息设置的 tag 去重,这样的好处是实现起来简单,缺点是有大量无用的消息到达了 Consumer 端只能丢弃不处理。

    一般采用Cosumer端过滤,如果希望提高吞吐量,可以采用Broker过滤。

    对消息的过滤有三种方式:

image-20240419205812338

根据Tag过滤:这是最常见的一种,用起来高效简单

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

SQL 表达式过滤:SQL表达式过滤更加灵活

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); // 只有订阅的消息有这个属性a, a >=0 and a <= 3 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
   @Override    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   }
});
consumer.start();

Filter Server 方式:最灵活,也是最复杂的一种方式,允许用户自定义函数进行过滤

应用

安装与启动

RocketMQ的下载与安装(全网最细保姆级别教学)rocketmq下载安装舒一笑的博客-CSDN博客

RocketMQ保姆级教程 - 掘金 (juejin.cn)

安装rcmq

环境要求:

  • Linux64位系统

  • JDK1.8(64位)

安装


$ wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.1/rocketmq-all-5.1.1-bin-release.zip

$ unzip rocketmq-all-5.1.1-bin-release.zip

目录介绍

  • bin:启动脚本,包括shell脚本和CMD脚本

  • conf:实例配置文件 ,包括broker配置文件、logback配置文件等

  • lib:依赖jar包,包括Netty、commons-lang、FastJSON等

启动rcmq

0.开放服务器端口

我们在安装rocketmq后,要开放的端口一般有4个:9876,10911,10912,10909

RocketMQ服务中各端口号说明_rocketmq 端口-CSDN博客

1.配置修改

修改jvm参数

在启动NameServer,broker之前,修改一下启动时的jvm参数,因为默认的参数都比较大,为了避免内存不够,建议修改小,否则无法启动

修改runbroker.sh runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx25	6m"

将启动jvm内存参数调小

修改conf/broker.conf

这里需要改一下Broker配置文件,需要指定NameServer的地址,因为需要Broker需要往NameServer注册

在文件末尾追加namesrv地址

namesrvAddr = localhost:9876

因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。

文件末尾继续追加brokerIp,IP值是当前部署broker的服务器外网IP

brokerIP1 = 192.168.200.143
brokerIP2 = 192.168.200.143

因为Broker向NameServer进行注册的时候,带过去的ip如果不指定就会自动获取,但是自动获取的有个坑,就是有可能客户端无法访问到这个自动获取的ip,所以我建议手动指定客户端可以访问到的服务器ip。image-20240419205822214

2.启动NameServer

sh ./mqnamesrv

cd /root/rocketmq/rocketmq-all/bin

nohup sh `./mqnamesrv` &

在bin目录下执行

jps查看当前已启动的java进程

image-20231015121410221

出现了namesrvstartup即为成功

3.启动Broker

进入bin目录执行

nohup sh ./mqbroker -c ../conf/broker.conf -n localhost:9876 autoCreateTopicEnable=true &

jps查看当前已启动的java进程

image-20231015121410221

出现brokerstartup即为成功

nohup sh ./mqbroker -c ../conf/broker.conf -n 154.8.204.64:9876 autoCreateTopicEnable=true &

4.搭建可视化控制台

linux - nohup 命令 &后一按回车就exit_nohup回车后就退出-CSDN博客

RocketMQ保姆级教程 - 掘金 (juejin.cn)

cd /root/rocketmq/rcmq-console

nohup `/root/java/jdk1.8.0_151/bin/java -jar -jar -server -Xms256m -Xmx256m -Drocketmq.config.namesrvAddr=localhost:9876 -Dserver.port=8088 ./rcmq-console.jar` & 

image-20231015123807447

5.创建主题

Apache RocketMQ 5.0版本下创建主题操作,推荐使用mqadmin工具,需要注意的是,对于消息类型需要通过属性参数添加。示例如下:

/bin/mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY

虽然我们可以在发消息时创建主题,但是最好手动来创建

springboot整合

原生使用

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.1</version>
</dependency>

生产者

public class Producer {
    public static void main(String[] args) throws Exception {
        //创建一个生产者,指定生产者组为StarGeo
        DefaultMQProducer producer = new DefaultMQProducer("StarGeo");

        // 指定NameServer的地址
        producer.setNamesrvAddr("154.8.204.64:9876");
        // 第一次发送可能会超时,我设置的比较大
        producer.setSendMsgTimeout(1000000);

        // 启动生产者
        producer.start();

        // 创建一条消息
        // topic为HomuraAkime
        // 消息内容为homura daisuki
        // tags 为 homura
        Message msg = new Message("HomuraAkime", "homura", "homura daisuki ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送消息并得到消息的发送结果,然后打印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }

}

构建一个消息生产者实例,然后指定生产者组

指定NameServer的地址:服务器的ip:9876,因为需要从NameServer拉取Broker的信息

producer.start() 启动生产者

构建一个消息,指定这个消息往目标topic发送

producer.send(msg):发送消息,打印结果

消费者

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 通过push模式消费消息,指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("StarGeoConsumer");

        // 指定NameServer的地址
        consumer.setNamesrvAddr("154.8.204.64:9876");

        // 订阅这个topic下的所有的消息
        consumer.subscribe("HomuraAkime", "*");

        // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

}
  • 创建一个消费者实例对象,指定消费者组为

  • 指定NameServer的地址:服务器的ip:9876

  • 订阅xxx 这个topic的所有信息

  • consumer.registerMessageListener ,这个很重要,是注册一个监听器,这个监听器是当有消息的时候就会回调这个监听器,处理消息,所以需要用户实现这个接口,然后处理消息。(异步的还是同步阻塞的?)

  • 启动消费者

启动之后,消费者就会消费刚才生产者发送的消息

集成SpringBoot

常见问题 · apache/rocketmq-spring Wiki (github.com)

官方文档

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

缺点:相比于官方最新版本相对滞后,有些特性不支持

教程较少,用起来不是特别舒服

yml配置

rocketmq:
  producer:
    group: homura
  name-server: 154.8.204.64:9876

创建消费者

SpringBoot底下只需要实现RocketMQListener接口,然后加上@RocketMQMessageListener注解即可

@Component
@RocketMQMessageListener(consumerGroup = "madoka", topic = "love")
public class MadokaConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String msg) {
        System.out.println( msg);
    }
}

@RocketMQMessageListener需要指定消费者属于哪个消费者组,消费哪个topic,NameServer的地址已经通过yml配置文件配置类获取

消费者消费消息时会执行onMessage里的自定义逻辑

@RocketMQMessageListener 注解处理

img

从这可以看出,会为每一个加了@RocketMQMessageListener注解的对象创建一个DefaultMQPushConsumer,所以最终也是通过DefaultMQPushConsumer消费消息的。

至于监听器,是在这

img

遍历每条消息,然后调用handleMessage,最终会调用实现了RocketMQListener的对象处理消息。

生产者

image-20231015220333544

先注入RocketMQTemplate

image-20231216090759072

直接发送消息,绑定对应的topic和消息内容

NameServer的地址和生产者组名已经通过yml配置文件配置类获取

Java Class

1.DefaultMQProducer

2.DefaultMQPushConsumer

3.Message

DefaultMQProducer

DefaultMQProducer类是应用发送消息使用的基类,封装一些通用的方法方便开发者在更多场景中使用。属于线程安全类,在配置并启动后可在多个线程间共享此对象。 其可以通过无参构造方法快速创建一个生产者,通过getter/setter方法,调整发送者的参数。主要负责消息的发送,支持同步/异步/oneway的发送方式,这些发送方式均支持批量发送。 image-20231015225232462

DefaultMQPushConsumer

DefaultMQPushConsumer类是rocketmq客户端消费者的实现,从名字上已经可以看出其消息获取方式为broker往消费端推送数据,其内部实现了流控,消费位置上报等等。DefaultMQPushConsumer是Push消费模式下的默认配置。

image-20231015225350160

Message

MessageExt

事务消息实战

基于RocketMQ分布式事务 - 完整示例 - 知乎 (zhihu.com)

SpringCloud 集成RocketMQ实现分布式事务 - 掘金 (juejin.cn)

常见问题 · apache/rocketmq-spring Wiki (github.com)

针对不同的分布式场景业界常见的解决方案有2PC、TCC、可靠消息最终一致性、最大努力通知这几种。

rocketmq属于可靠消息最终一致性,但是借鉴了2PC的思想并且实现了自己的补偿机制(回查)

rocketmq把消息的发送分为准备和提交两个阶段,本地事务执行成功后才提交消息(2PC)

普通的分布式事务实现

1.定义半消息的发送者

image-20231025170242430

如图,通过rocketMQTemplate.sendMessageInTransaction发送半消息,用MessageBuider.withPayload构建messaging消息

在方法内,org.springframework.messaging消息(spring封装的)会被转成rocketmq的消息

image-20231025170352566

通过TransactionSendResult获取发送事务消息的结果,在本地事务执行完返回给broker状态信息(UNKNOWN/COMMIT/ROLLBACK)时,返回给消息发送者,消息发送者处理返回消息,可以返回(业务中不需要的情况下也可以不等待返回值直接返回)

image-20231025171010668

image-20231025170828018

image-20231025170624752

2.定义生产者本地事务监听器

image-20231025171122912

如图,要带上@RocketMQTransactionListener,实现RocketMQLocalTransactionListener,复写本地事务执行方法和回查方法

根据事务执行情况返回COMMIT/ROLLBACK/UNKNOWN

3.在接口内调用生产者发送半消息

image-20231025171408591

如图,这里调用了注入的生产者中的方法,传递topic,business,businessid,事务唯一ID,要发送的对象

PS:我们这里的业务规范是,分布式事务的实现采取统一实现,部分特殊的另外再写

统一分布式事务实现规范都调用统一的defaultlistener和produce中的不同方法来处理,在生产者事务监听器中,执行本地事务时根据不同的business来选择执行不同的本地事务方法实现

我们有统一的三个业务字段,用来写日志表,business,businessid,事务唯一ID,事务唯一ID通过UUID或redis实现

4.消费者接受消息开始消费

这步和普通消息的实现一样,没啥特殊的

遇到的问题

1.

image-20231025172051077

2.

rocketmq-spring-boot-starter 2.1.0 事务消息 txProducerGroup 移除解读-CSDN博客

[Spring Boot集成RocketMq如何发送多个事务消息? - 掘金 (juejin.cn)](https://juejin.cn/post/7133245434201374734)

kay1010/spring-boot-rocketmq: spring-boot基于starter整合rocketmq (github.com)

问题描述:

2.1.0后rocketmq事物消息删除了txProducerGroup参数。 项目中只能有一个@RocketMQTransactionListener, 不能出现多个,

也就是说,我们只能有一个事务监听器了!!!

那么如果我们有很多分布式事务的场景需要发送半消息,并执行不同的本地事务该怎么办?

解决方案

1.在同一个RocketMQLocalTransactionListener根据参数判断然后去执行不同的本地事务

2.优雅的解决:使用@ExtRocketMQTemplateConfiguration拓展template

这里说下方法2的实现

如果想要多个事务监听器,设置不同的rocketMQTemplate即可

我们可以通过使用@ExtRocketMQTemplateConfiguration拓展template

我们需要:

0.用@ExtRocketMQTemplateConfiguration拓展一个新的template

image-20231025175136398

1.在半消息的发送者中加入新template发消息的实现

image-20231025175217603

如图,改用新temlate发消息

2.定义生产者本地事务监听器

image-20231025175314836

如图,定义一个新的事务监听器,然后指定 rocketMQTemplateBeanName

3.在接口内调用生产者发送半消息

image-20231025175356582

如图,调用在步骤1写的新方法

自定义消费者重试解决策略

代替死信队列,因为我不会用程序监测死信

image-20231030160313598

MessageExt

是rocketmq的原生消息类

可以用它来拿到消息重试次数等信息

它的body是byte数组,转成string就是我们正常要接受的string消息

string消息可以是一个对象转成的json字符串

我们可以监测重试次数,如果出现异常的话,就执行回滚逻辑,或者通知技术人员处理

通知技术人员处理可以是:websocket/短信通知

我们项目选择短信通知,项目的技术维护人员信息+联系方式是持久化在项目里的,遇到问题时会查表,向所有技术人员发送短信通知报错信息

事务消息解决的问题

问题:

目前看来事务消息的目的就是保证创建订单和消息发送的一致性

为什么不能先创建订单 在发送消息消息失败 或者发送状态是不是SEND_OK 就回滚事务

这样不是也是一致的嘛?

回答:

事务消息可以防止下列数据不一致场景,而等待消息发送状态看是否回滚无法做到

1.本地先预减完缓存,消息发送超时了,本地接收到send result判断消息发送失败 选择回滚,然而超时的消息还会被消费者消费,造成了不一致(一个回滚了,一个却执行了)

2.本地库存预减成功了,然后系统挂了,这个时候还没来得及发送消息(此时本地事务已执行,而消费者事务不执行,数据不一致),所以必须要在执行事务前发送一条half消息

延时消息实战

使用RocketMQTemplate发送各种消息 - 掘金 (juejin.cn)

springboot+rocketmq(4):实现延时消息rocketmqtemplate发送延迟消息12程序猿的博客-CSDN博客

1.构建生产者工具类

image-20231023203741690

timeout指定的是消息发送超时间

delaylevel来指定消息延时时长

rcmq 5.x才开始支持精确延时,目前还是阶梯指定的

2.在业务代码使用生产者工具类发消息

image-20231023203920685

比如刚创建完订单时

3.构建消费者,自定义消费逻辑

image-20231023204041894

这里是简单的逻辑,实战时还需要考虑redis保证幂等性,根据订单状态判断是否需要回滚库存,根据本地事务方法执行结构,返回成功或失败信息

PS:如果是原生的话,需要在消费者监听器的consumeMessage方法中手动返回消费是否成功的消息,broker据此判断是否重试。

但是如果使用spring版本的,我们只能实现RocketMQListener<MessageExt>接口的onMessage方法,而此方法是void型的,那么消费失败时如何重试呢?

rocketmq的github上的issue中找到了答案,原来这里默认就是消费者处理时抛出异常时就会自动重试

image-20231023210529293

image-20231023210620377

如图,我们抛出异常,消费重试了3次

RocketMq-Spring 请求应答语义支持_rocketmq返回值-CSDN博客

RocketMQReplyListener

用来返回消息处理结果给生产者

生产者这样接收

String replyString = rocketMQTemplate.sendAndReceive("stringRequestTopic", "request string", String.class);

批量消息发送

批量消息发送 | RocketMQ (apache.org)

好像用不着,有那样需求的直接传list呗?