Kafka
Kafka基本原理详解(超详细!)_kafka工作原理-CSDN博客
看完这篇Kafka,你也许就会了Kafka_kafka心的步伐csdn-CSDN博客
kafka专题:kafka的消息丢失、重复消费、消息积压等线上问题汇总及优化_java kafk数据积压导致其他队列消息丢失-CSDN博客
一看就会的kafka多线程顺序消费【内附Demo哦】_kafka顺序消费-CSDN博客
Kafka是什么,以及如何使用SpringBoot对接Kafka_现在对接一个三方硬件,厂商将数据发送到kafka的,我需要搭建一个监听服务来处理数-CSDN博客
是什么
一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式发布-订阅消息系统
它的最大的特性就是实时,大数据量,高吞吐量,高性能,分布式,可拓展
特性
同时为发布和订阅提供高吞吐量。kafka每秒可以生产约25万消息(50MB)、每秒处理55万消息(110MB),轻松百万级吞吐量
消息持久化到磁盘,有冗余备份机制(replication)保证高可用
分布式系统,易于向外扩展。所有的producer、broker和consumer均为分布式存在。可直接在线扩展集群机器
消息被处理的状态由consumer端维护,而不是由server端维护
scala语言开发,运行在JVM上
多Partition分布式存储,负载均衡,提高吞吐量,同时方便随时加机器拓展
基础架构
Kafka像其他Mq一样,也有自己的基础架构,主要存在生产者Producer、Kafka集群Broker、消费者Consumer、注册消息Zookeeper.
核心概念
Producer:生产者
Message:每一条发送的消息主体。(offset,消息大小,消息体)
Consumer:消费者,即消息的消费方,是消息的出口。
Consumer Group:同一个分区的数据只能被某消费者组中的某一个消费者消费。
Topic:消息的主题,消息的逻辑分类
Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!(消息的物理存储分类)
Replication:每一个分区都有多个副本用于保证高可用。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本
Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。注册中心
Leader:每个分区多个副本的主角色,生产者发送数据的对象,以及消费者消费数据的对象都是Leader。
Follower:每个分区多个副本的从角色,实时的从Leader中同步数据,保持和Leader数据的同步,Leader发生故障的时候,某个Follower会成为新的Leader
kafka为什么高性能
顺序读写
kafka 的消息是不断追加到文件中的,这个特性使 kafka 可以充分利用磁盘的顺序读写性能,顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写。
零拷贝,减少数据的拷贝 减少了应用程序与操作系统上下文切换
批量发送
Kafka 允许进行批量发送消息,先将消息缓存在内存中,然后一次请求批量发送出去,比如可以指定缓存的消息达到某个量的时候就发出去,或者缓存了固定的时间后就发送出去,如100条消息就发送,或者每5秒发送一次,这种策略将大大减少服务端的I/O次数。
稀疏索引加二分查找,快速读取要消费的数据
长轮询
Push与Pull
作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
且由于kafka broker会持久化数据到磁盘,broker没有内存压力。因此,consumer非常适合采用pull的方式消费数据
工作流程
Producer push发送消息 写入broker consumer pull拉数据消费
发送数据
follwer和leader
produce写数据永远是写入leader,然后对应分区的follower会主动向leader pull同步消息数据。
消息路由
分区是为了负载均衡,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?
partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
如果既没指定partition,又没有设置key,则会轮询选出一个partition。
broker存储数据
顺序写:
producer采用push模式将数据发布到broker,每条消息追加append 写(顺序写,性能高) 到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的
文件存储
partition结构
Kafka文件存储也是通过本地落盘的方式存储的,主要是通过相应的log与index等文件保存具体的消息文件。
由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引的机制,将每个partition分为多个segment。
Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
查找效率为什么高?
数据分段
kafka解决查询效率的手段之一是将数据分段 segment
为数据建立索引
为了进一步提高查找效率,kafka为每个分段后的数据文件建立了索引文件,.index
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引
索引包含两个部分(均为4字节的数字),分别为相对应的offset和position
索引优化
稀疏存储,每隔一定字节的数据建立一条索引
Kafka中采用了稀疏索引的方式读取索引,kafka每当写入了4k大小的日志(.log),就往index里写入一个记录索引。其中会采用二分查找。
二分查找:先二分找到对应的segment文件(segment文件本身代表一个offset,起始值就是),然后到segment里的.index文件里继续二分
position Position:物理位置(磁盘上面哪个地方)
存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
基于时间,默认配置是168小时(7天)。
基于大小,默认配置是1073741824。
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能
消费数据
与producer相同的是,消费者在拉取消息的时候也是找leader去拉取
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id
这里partition和consumer的对应关系和 queue和consumer一样
生产者
发送重试
为保证producer发送的数据能够可靠的发送到指定的topic中,topic的每个partition收到producer发送的数据后,都需要向producer发送ackacknowledgement,如果producer收到ack就会进行下一轮的发送,否则重新发送数据。
注:这张图有误导性,生产者发送消息都会发给leader,follower去pull leader同步数据
ISR(同步副本集)
猜想
如果leader收到数据,所有的follower开始同步数据,但有一个follower因为某种故障,迟迟不能够与leader进行同步,那么leader就要一直等待下去,直到它同步完成,才可以发送ack,此时需要如何解决这个问题呢?
解决
leader中维护了一个动态的ISR(in-sync replica set),即与leader保持同步的follower集合,当ISR中的follower完成数据的同步之后,给leader发送ack,如果follower长时间没有向leader同步数据,则该follower将从ISR中被踢出,该之间阈值由replica.lag.time.max.ms参数设定
生产者ack机制
对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没有必要等到ISR中所有的follower全部接受成功。
Kafka为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡选择不同的配置。
ack参数配置
0:producer不等待broker的ack,这一操作提供了最低的延迟,broker接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将丢失数据。(只是leader落盘)
-1(all):producer等待broker的ack,partition的leader和ISR的follower全部落盘成功才返回ack
生产者什么时候收到ack返回:0:直接返回,异步 1:等leader落盘后返回 -1:等所有follower同步落盘成功后返回
幂等性
在0.11版本的Kafka之前,只能保证数据不丢失,在下游对数据的重复进行去重操作,多余多个下游应用的情况,则分别进行全局去重,对性能有很大影响。
0.11版本的kafka,引入了一项重大特性:幂等性,幂等性指代Producer不论向Server发送了多少次重复数据,Server端都只会持久化一条数据
启用幂等性,即在Producer的参数中设置enable.idempotence=true即可,Kafka的幂等性实现实际是将之前的去重操作放在了数据上游来做,开启幂等性的Producer在初始化的时候会被分配一个PID,发往同一个Partition的消息会附带Sequence Number,而Broker端会对<PID,Partition,SeqNumber>做缓存,当具有相同主键的消息的时候,Broker只会持久化一条。
但PID在重启之后会发生变化,同时不同的Partition也具有不同的主键,所以幂等性无法保证跨分区跨会话的Exactly Once。
自定义分区
kafka中的自定义分区器使用详解_kafka 自定义partition-CSDN博客
1、没有设置key我们的消息就会被轮训的发送到不同的分区。
2、设置了key : 使用kafka自带的分区器,会根据key计算出来一个hash值,这个hash值会对应某一个分区。如果key相同的,那么hash值必然相同,key相同的值,必然是会被发送到同一个分区
生产者在生产数据的时候,可以为每条消息人为地指定key,这样消息被发送到broker时,会根据分区规则选择消息将被存储到哪一个分区中。如果分区规则设置合理,那么所有的消息将会被均匀/线性的分布到不同的分区中,这样就实现了负载均衡和水平扩展。
但是有些比较特殊的时候,我们就需要自定义分区
可以通过 实现 org.apache.kafka.clients.producer.Partitioner 接口,这个实现类可以根据自己的业务规则进行自定义制定分区,如根据hash算法指定分区的分布规则。
/**
* @author:xsluo
* @date:2020/7/10
* @aim:自定义分区器
*/
public class MyPartitioner implements Partitioner {
/**
* 自定义kafka分区主要解决用户分区数据倾斜问题 提高并发效率(假设 3 分区)
* @param topic 消息队列名
* @param key 用户传入key
* @param keyBytes key字节数组
* @param value 用户传入value
* @param valueBytes value字节数组
* @param cluster 当前kafka节点数
* @return 如果3个分区,返回 0 1 2
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取topic的partitions信息
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
int partitionsNum = partitionInfos.size();
//为特定的key自定义分区规则
if (key.toString().equals("key666") || key.toString().equals("key888")) {
//分配到最后一个分区
return partitionsNum - 1;
}
//其它的key采用默认分区规则
return key.toString().hashCode() % (partitionsNum - 1);
// return key.toString().hashCode() % (partitionsNum);
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}
消费者
消费方式
consumer采用pull拉的方式来从broker中读取数据。
push推的模式很难适应消费速率不同的消费者,因为消息发送率是由broker决定的,它的目标是尽可能以最快的速度传递消息,但是这样容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull方式则可以让consumer根据自己的消费处理能力以适当的速度消费消息。
pull模式不足在于如果Kafka中没有数据,消费者可能会陷入循环之中 (因为消费者类似监听状态获取数据消费的),一直返回空数据,针对这一点,Kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可供消费,consumer会等待一段时间之后再返回,时长为timeout(kafka消费模型是长轮询)
Offset
由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复以后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到了那个offset,以便故障恢复后继续消费。
consumer记录消费到的位置offset,消费完后会将该offset提交到kafka中,kafka只保存已提交的offset
offset表示这条消息在他的partition中的偏移量,唯一标识一条消息
消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中。
消费者还可以将自己的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper。
推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发。
auto.offset.reset:
作用:当消费者在消费过程中无法在本地找到offset,所选择的消费策略 默认值为:latest
三者均有共同定义: 对于同一个消费者组,若已有提交的offset,则从提交的offset开始接着消费
earliest
如果这个topic有历史消息,现在新启动了一个消费者组,消费者会从头开始消费历史信息
latest
如果这个topic有历史消息,现在新启动了一个消费者组,消费者会从连接上broker后接受的第一个消息开始消费,不会消费历史信息
none
对于同一个消费者组,若没有提交过offset,会抛异常
enable.auto.commit
该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量
故障后数据一致性的恢复
LEO(Log End Offset):每个副本最后的一个offset
HW(High Watermark):高水位,指代消费者能见到的最大的offset,ISR队列中最小的LEO。
follower故障和leader故障
follower故障:follower发生故障后会被临时提出ISR,等待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步,等待该follower的LEO大于等于该partition的HW,即follower追上leader之后,就可以重新加入ISR了。
leader故障:leader发生故障之后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据的一致性,其余的follower会先将各自的log文件高于HW的部分截掉,然后从新的leader中同步数据。
这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复
假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。
这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。
当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。
Rebalance 分区再均衡
这也是kafka里面非常重要的一个概念
首先 Rebalance 是一个操作 在以下情况下会触发Rebalance 操作:
组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了)
订阅主题数发生变更,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
订阅主题的分区数发生变更(broker宕机/分区增加)
kafka怎么监听是否需要重平衡?
通过心跳机制:Kafka 的心跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer 才会发送心跳。
需要重平衡的原因:谁消费哪几个分区是消费者组消费者们一起协调指定的
什么是coordinator?
每个consumer group 都会选择一个broker作为自己的coordinator,他是负责监控整个消费组里的各个分区的心跳,以及判断是否宕机,管理位移offset,和开启rebalance的
coordinator存储了partition和消费者的对应关系,还有订阅的topic列表
offset是提交到特定的主题,存储到coordinator broker里
如何选择coordinator机器
首先对group id 进行hash,接着对__consumer_offsets
的分区数量进行取模,默认分区数量是50
重平衡流程
Rebalance过程
rebalance的前提是coordinator已经确定了。
总体而言,rebalance分为2步:Join和Sync
1 Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。
2 Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。
避免重平衡
要说完全避免重平衡,是不可能,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以我们需要保证尽力避免消费者故障。
而其他几种触发重平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主动控制。
一般会约定一个时间,超时即判定对方挂了。而在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少。
还有一个参数,heartbeat.interval.ms,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。
此外,还有最后一个参数,max.poll.interval.ms,消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。
三个参数,
session.timout.ms控制心跳超时时间,
heartbeat.interval.ms控制心跳发送频率,
max.poll.interval.ms控制poll的间隔。
这里有三种rebalance的策略:range、round-robin、sticky
为什么要避免重平衡
重平衡过程中,消费者无法从 kafka
消费消息,这对 kafka
的 TPS
影响极大,而如果 kafka
集群内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间 kafka
基本处于不可用状态。所以在线上环境中,应该尽量避免重平衡发生。
常见问题
kafka和rocketmq的区别
mq选型:rocketMq和kafka对比,kafka由于重平衡机制可能相比于rcmq不够可靠,而且kafka不支持定时消息和事务消息。但是相对应的,由于producer每次是缓存批量发消息,所以比rcmq吞吐量要好一些
并发消费
Listener concurency=2
消息丢失
kafka在生产端发送消息 和 消费端消费消息 时都可能会丢失一些消息
①:生产者消息丢失
生产者在发送消息时,会有一个ack机制,当ack=0 或者 ack=1时,都可能会丢消息。如下所示:
acks=0
表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
acks=1
至少要等待leader已经成功将数据写入本地log返回ack,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
acks=-1或all
这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。
②:消费端消息丢失
消费端丢消息最主要体现在消费端offset的自动提交,如果开启了自动提交,万一消费到数据还没处理完,此时你consumer直接宕机了,未处理完的数据 丢失了,下次也消费不到了,因为offset已经提交完毕,下次会从新offset处开始消费新消息。
解决办法是采用消费端的手动提交
//手动提交offset
/**
* 注意如果要使用手动提交offset,需要以下三点
* ①:配置文件配置手动提交方式
* ②:加上参数Acknowledgment ack
* ③:方法中使用ack.acknowledge();手动提交
*/
ack.acknowledge();
消息重复消费
消息的重复消费在生产端和消费端都可能发生,下面一一讲解:
生产端消息重复发送
发送消息如果配置了重试机制,比如由于网络波动,生产者未得到broker收到了消息的响应,就会触发重试机制,3秒后再次发送此消息。broker之前已经收到过这个消息,但生产者由于触发了重试机制,就导致了消息的重复发送。那么broker就会在磁盘缓存多条同样的消息,消费端从broker拉取消息时,就会造成重复消费。
解决:
注意:kafka新版本 在broker保证了接受消息的幂等性(解决了生产者消息重复发送问题),只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
消费端消息重复消费
对于消费端消息的重复消费问题,如果消费端拉取了一部分数据,消费完毕后,准备执行手动提交(或自动提交)时,消费者挂掉了!此时offset还未提交呢,那么当服务重启时,还是会拉取相同的一批数据重复处理!造成消息重复消费
解决:
无论是生产者还是消费者的重复消息,一般都会在消费端卡死,做幂等性处理。
幂等性可以用redis的setnx分布式锁+业务逻辑去重 来实现。
顺序消息
kafka想要保证消息顺序,是需要牺牲一定性能的,方法就是一个消费者,消费一个分区,可以保证消费的顺序性。但也仅限于消费端消费消息的有序性,无法保证生产者发送消息有序。
比如:如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。发送端消息发送已经乱序,到了消费端消费时,自然无法保证顺序!
如果一定要保证生产-消费全链路消息有序,发送端需要同步发送,ack回调不能设置为0。且只能有一个分区,一个消费者进行消费,但这样明显有悖于kafka的高性能理论!
问题:如何在多个分区中保证消息顺序和消息处理效率呢?
首先使用多个分区,消息可以被发送端发送至多个分区,保证消息发送的效率。然后在消费端在拉消息时使用ConutdownLunch来记录一组有序消息的个数。如果达到个数,说明已拉取到完整的一组有序消息。然后在消费端根据消息序号进行排序,消费端将排好序的消息发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。即可最大程度上既保证顺序又保证效率!
消息积压
1.线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。
解决方案:此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。如图所示:
2.由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
解决方案:此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。这个死信队列,kafka并没有提供,需要整合第三方插件!
Zkp
kafak的Zookeeper协调控制
管理broker与consumer的动态加入与离开
触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的订阅负载均衡
维护消费关系及每个partition的消费信心
zookeeper上的细节:
每个broker启动会在zookeeper上注册一个临时的broker registry,包含broker的 ip地址 和 端口号 所存储的topics和partitions信息
每个consumer启动后会在zookeeper上注册一个临时额consumer registry,包含consumer所属的consumer group以及订阅的topics
每个consumer group会关联一个临时的 owner registry 和一个持久的 offset registry。对于订阅的每个 partition 包含一个 owner registry ,内容为订阅这个 partition 的 consumer id ,同事包含一个 offset registry,内容为上一次订阅的offset
Kafka中zookeeper的作用
Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线、所有topic的分区副本分配和leader的选举等工作。Controller的工作管理是依赖于zookeeper的。
Partition的Leader的选举过程
ps:
1.和rocketmq的多主从集群架构不同的是,partition虽然对标queue,也能够多主部署,replication虽然对标的rcmq的从,但是rcmq的多主从,从是无法切换成主的(rcmq可以使用Dledger集群 来达到主从切换的目的 https://blog.csdn.net/qq_27740127/article/details/133686238),kafka的follower是可以变成leader的
2.Controller:Kafka节点里面的一个主节点,借助zookeeper。
Controller作为Kafka集群中的核心组件,它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。 Controller与Zookeeper进行交互,获取与更新集群中的元数据信息。其他broker并不直接与zookeeper进行通信,而是与 Controller 进行通信并同步Controller中的元数据信息。 Kafka集群中每个节点都可以充当Controller节点,但集群中同时只能有一个Controller节点。
3.消息发送多次失败怎么办: 备用链路,将它写入redis或文件系统,或者丢弃
项目拉下来找不到依赖怎么办:
https://mvnrepository.com/artifact/com.alibaba/fastjson
去中心仓库找好使的
【Spring】@ConfigurationProperties
https://zhuanlan.zhihu.com/p/515704401
应用
Kafka 安装部署及使用(单节点/集群)_kafka 一个节点能用么-CSDN博客
Kafka强依赖ZK,如果想要使用Kafka,就必须安装ZK,Kafka中的消费偏置信息、kafka集群、topic信息会被存储在ZK中。有人可能会说我在使用Kafka的时候就没有安装ZK,那是因为Kafka内置了一个ZK,一般我们不使用它。
Kafka 单节点部署
docker学习
数据卷介绍
Docker将运用与运行的环境打包形成容器运行, Docker容器产生的数据,如果不通过docker commit生成新的镜像,使得数据做为镜像的一部分保存下来, 那么当容器删除后,数据自然也就没有了。 为了能保存数据在Docker中我们使用卷。|
卷就是目录或文件,存在于一个或多个容器中,由Docker挂载到容器,但卷不属于联合文件系统(Union FileSystem),因此能够绕过联合文件系统提供一些用于持续存储或共享数据的特性:。
卷的设计目的就是数据的持久化,完全独立于容器的生存周期,因此Docker不会在容器删除时删除其挂载的数据卷。
数据卷的特点:
数据卷可在容器之间共享或重用数据
卷中的更改可以直接生效
数据卷中的更改不会包含在镜像的更新中
数据卷的生命周期一直持续到没有容器使用它为止
什么是docker的link机制
同一个宿主机上的多个docker容器之间如果想进行通信,可以通过使用容器的ip地址来通信,也可以通过宿主机的ip加上容器暴露出的端口号来通信,前者会导致ip地址的硬编码,不方便迁移,并且容器重启后ip地址会改变,除非使用固定的ip,后者的通信方式比较单一,只能依靠监听在暴露出的端口的进程来进行有限的通信。通过docker的link机制可以通过一个name来和另一个容器通信,link机制方便了容器去发现其它的容器并且可以安全的传递一些连接信息给其它的容器。
Dockerfile和docker-compose
简而言之, Dockerfile 记录单个镜像的构建过程, docker-compse.yml 记录一个项目(project, 一般是多个镜像)的构建过程。
你说有些教程用了 dockerfile+docker-compose, 是因为 docker-compose.yml 本身没有镜像构建的信息,如果镜像是从 docker registry 拉取下来的,那么 Dockerfile 就不需要;如果镜像是需要 build 的,那就需要提供 Dockerfile.
docker-compose是编排容器的。例如,你有一个php镜像,一个mysql镜像,一个nginx镜像。如果没有docker-compose,那么每次启动的时候,你需要敲各个容器的启动参数,环境变量,容器命名,指定不同容器的链接参数等等一系列的操作,相当繁琐。而用了docker-composer之后,你就可以把这些命令一次性写在docker-composer.yml文件中,以后每次启动这一整个环境(含3个容器)的时候,你只要敲一个docker-composer up命令就ok了。
dockerfile的作用是从无到有的构建镜像。它包含安装运行所需的环境、程序代码等。这个创建过程就是使用 dockerfile 来完成的。Dockerfile - 为 docker build 命令准备的,用于建立一个独立的 image ,在 docker-compose 里也可以用来实时 build
docker-compose.yml - 为 docker-compose 准备的脚本,可以同时管理多个 container ,包括他们之间的关系、用官方 image 还是自己 build 、各种网络端口定义、储存空间定义等
done:docker安装部署
1.创建一个网络
不然不同容器间网络隔离无法相互访问
可以通过容器间共享网络network的方式,也可以通过link
2.安装zkp
Kafka依赖zookeeper所以先安装zookeeper
-p:设置映射端口(默认2181)
-d:后台启动
docker run -d --name zookeeper-server \
--network app-tier \
-p 2181:2181 \
-e ALLOW_ANONYMOUS_LOGIN=yes \
bitnami/zookeeper:latest
查看zookeeper容器日志(可省略)
docker logs -f zookeeper
3.安装kafka
安装并运行Kafka,
–name:容器名称
-p:设置映射端口(默认9092 )
-d:后台启动
ALLOW_PLAINTEXT_LISTENER任何人可以访问
KAFKA_CFG_ZOOKEEPER_CONNECT链接的zookeeper
KAFKA_CFG_ADVERTISED_LISTENERS当前主机IP或地址(重点:如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误)
docker run -d --name kafka-server0 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=43.139.143.108:2181 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://43.139.143.108:9092 \
bitnami/kafka:latest
docker run -d --name kafka-server1 \
-p 9093:9092 \
-e KAFKA_BROKER_ID=1 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=43.139.143.108:2181 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://182.92.164.82:9092 \
bitnami/kafka:latest
docker run -d --name kafka-server2 \
-p 9094:9095 \
-e KAFKA_BROKER_ID=2 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=43.139.143.108:2181 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://182.92.164.82:9095\
bitnami/kafka:latest
Ps:集群部署,第一台服务器部署了一个kafka broker0 和一个zkp还有一个gui,第二个部署了2个broker 1,2
docker logs -f kafka-server
4.GUI
docker run --name kafka-manager -d \
--network app-tier \
-p 9000:9000 \
-e ZK_HOSTS="43.139.143.108:2181" \
sheepkiller/kafka-manager
我们使用/kafka-manager,可以用他创建topic,broker
如图,先创建集群,绑定 zkp
创建完成后会显示绑定该zkp的broker
让我们创建一个topic,分区数和副本数不能超过broker的数量
可以看到新创建的topic,分区0的leader是broker0,分区1的leader是broker1
springboot整合
SpringBoot——集成Kafka详解_springboot kafka-CSDN博客
配置方式有两种:config或者yml
sprinboot链接多个kafka
springboot配置多kafka_springboot配置多个kafka-CSDN博客
kafka事务
Kafka使用事务的两种方式
配置Kafka事务管理器并使用@Transactional注解(使用注解方式开启事务还是比较方便的,不过首先需要我们配置KafkaTransactionManager,这个类就是Kafka提供给我们的事务管理类....Spring-Kafka(五)—— 使用Kafka事务的两种方式 略过)
使用KafkaTemplate的executeInTransaction方法
1.配置yml
producer: transaction-id-prefix: tx_ 事务id前缀
acks: -1 #ack必须设置为all
2.使用KafkaTemplate的executeInTransaction
//声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
@Override
public Object doInOperations(KafkaOperations<String, Object> operations) {
operations.send("abc", message + " test executeInTransaction");
throw new RuntimeException("fail");
}
});
生产者
带回调的消息发送
public void sendCallbackOneMessage(@PathVariable("message") String message) {
System.out.println(message);
kafkaTemplate.send("abc",1,"key1", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败2:"+throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
消费者
基础版本,
ConsumerRecord一般都是<String,Object> ,String是key,Object是发送的消息内容
KafkaTemplate<String, Object>也是一样
@KafkaListener(topics = "abc", groupId = "${spring.kafka.consumer.group-id}")
public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
System.out.println(record.value()+"-------");
System.out.println(record);
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic.test3 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
SpringBoot——集成Kafka详解_springboot kafka-CSDN博客
批量消费
自动提交,批量消费
yml需要设置自动提交,以及每次最多消费条数,设置监听器并发数,一般等于分区数就行,可以并发提高消费速度。 type设置为batch
consumer:
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: true
max-poll-records: 50 #每次最多消费多少条消息
listener:
concurrency: 2 # 设置并发数
type: batch #设置批量消费
//批量消费
@KafkaListener(topics = {"abc"}, groupId = "${spring.kafka.consumer.group-id}")
public void onBatchMessage(List<String> messages) {
System.out.println(">>> 批量消费一次,recoreds.size()=" + messages.size());
}
@GetMapping("/kafka/callbackOne/{message}")
public void sendCallbackOneMessage(@PathVariable("message") String message) {
System.out.println(message);
for (int i=0;i<3;i++){
kafkaTemplate.send("abc",1,"key1", message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败2:"+throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功2:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
}
ps:批量消费是只有生产者推消息速度很快时才能批量消费