kafka基础知识
1、kafka概念
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
2、kafka特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
3、kafka应用场景
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
- 事件源
4、kafka的基础架构
- Producer:Producer即生产者,消息的产生者,是消息的入口。
- Broker:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如图中的broker-0、broker-1等……
- Topic:消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
- Partition:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
- Replication: 每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
- Message:每一条发送的消息主体。
- Consumer:消费者,即消息的消费方,是消息的出口。
- ConsumerGroup:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!
- Zookeeper:kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。
5、工作流程
1)、发送数据
我们看上一点中的架构图,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候永远的找leader,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:
发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证同一分区内的数据是有序的!写入示意图如下:
上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
- 方便扩展:因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
- 提高并发:以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。
熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
如果既没指定partition,又没有设置key,则会轮询选出一个partition。
保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为0、1、all。
0 代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
1 代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
all 代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。
最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。
2)、保存数据
Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
(1)Partition 结构
前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。
(2)Message结构
上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
- offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
- 消息大小:消息大小占用4byte,用于描述消息的大小。
- 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。
(3)存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
- 基于时间,默认配置是168小时(7天)。
- 基于大小,默认配置是1073741824。
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!
3)、消费数据
消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是发布订阅模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是找leader去拉取。
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费者组的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!我们看下图:
图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议消费者组的consumer的数量与partition的数量一致!
在保存数据的小节里面,我们聊到了partition划分为多组segment,每个segment又包含.log、.index、.timeindex文件,存放的每条message包含offset、消息大小、消息体……我们多次提到segment和offset,查找消息的时候是怎么利用segment+offset配合查找的呢?假如现在需要查找一个offset为368801的message是什么样的过程呢?我们先看看下面的图:
- 先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
- 打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的相对offset为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
- 根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。
这套机制是建立在offset为有序的基础上,利用segment+有序offset+稀疏索引+二分查找+顺序查找等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的offset已经直接维护在kafk集群的__consumer_offsets这个topic中!
6、重要的设计思想
Consumergroup:各个consumer可以组成一个组,每个消息只能被组中的一个consumer消费,如果一个消息可以被多个consumer消费的话,那么这些consumer必须在不同的组。
消息状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着如果consumer处理不好的话,broker上的一个消息可能会被消费多次。
消息持久化:Kafka中会把消息持久化到本地文件系统中,并且保持极高的效率。
消息有效期:Kafka会长久保留其中的消息,以便consumer可以多次消费,当然其中很多细节是可配置的。
批量发送:Kafka支持以消息集合为单位进行批量发送,以提高push效率。
push-and-pull: Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点。
负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
同步异步:Producer采用异步push方式,极大提高Kafka系统的吞吐率(可以通过参数控制是采用同步还是异步方式)。
分区机制partition:Kafka的broker端支持消息分区,Producer可以决定把消息发到哪个分区,在一个分区中消息的顺序就是Producer发送消息的顺序,一个主题中可以有多个分区,具体分区的数量是可配置的。分区的意义很重大,后面的内容会逐渐体现。
离线数据装载:Kafka由于对可拓展的数据持久化的支持,它也非常适合向Hadoop或者数据仓库中进行数据装载。
插件支持:现在不少活跃的社区已经开发出不少插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。
7、通讯模式
1、点对点模式
点对点模式通常是基于拉取或者轮询的消息传送模型,这个模型的特点是发送到队列的消息被一个且只有一个消费者进行处理。生产者将消息放入消息队列后,由消费者主动的去拉取消息进行消费。点对点模型的的优点是消费者拉取消息的频率可以由自己控制。但是消息队列是否有消息需要消费,在消费者端无法感知,所以在消费者端需要额外的线程去监控。
2、发布订阅模式
发布订阅模式是一个基于消息送的消息传送模型,改模型可以有多种不同的订阅者。生产者将消息放入消息队列后,队列会将消息推送给订阅过该类消息的消费者(类似微信公众号)。由于是消费者被动接收推送,所以无需感知消息队列是否有待消费的消息!但是consumer1、consumer2、consumer3由于机器性能不一样,所以处理消息的能力也会不一样,但消息队列却无法感知消费者消费的速度!所以推送的速度成了发布订阅模模式的一个问题!假设三个消费者处理速度分别是8M/s、5M/s、2M/s,如果队列推送的速度为5M/s,则consumer3无法承受!如果队列推送的速度为2M/s,则consumer1、consumer2会出现资源的极大浪费!
8、Topics和日志
让我们首先深入了解下Kafka的核心概念:提供一串流式的记录— topic 。
Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
对于每一个topic, Kafka集群都会维持一个分区日志
每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。
Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题.
事实上,在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从”现在”开始消费。
这些细节说明Kafka 消费者是非常廉价的—消费者的增加和减少,对集群或者其他消费者没有多大的影响。比如,你可以使用命令行工具,对一些topic内容执行 tail操作,并不会影响已存在的消费者消费数据。
日志中的 partition(分区)有以下几个用途。第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。第二,可以作为并行的单元集
核心原理
1、消费者组
消费组指的是多个消费者(consumer)组成起来的一个组,它们共同消费 topic 的所有消息,并且一个 topic 的一个 partition 只能被一个 consumer 消费。
Kafka 为消费者组定义了 5 种状态,它们分别是:Empty、Dead、PreparingRebalance、CompletingRebalance 和 Stable。
了解了这些状态的含义之后,我们来看一张图片,它展示了状态机的各个状态流转。
一个消费者组最开始是 Empty 状态,当重平衡过程开启后,它会被置于 PreparingRebalance 状态等待成员加入,之后变更到 CompletingRebalance 状态等待分配方案,最后流转到 Stable 状态完成重平衡。
当有新成员加入或已有成员退出时,消费者组的状态从 Stable 直接跳到 PreparingRebalance 状态,此时,所有现存成员就必须重新申请加入组。当所有成员都退出组后,消费者组状态变更为 Empty。Kafka 定期自动删除过期位移的条件就是,组要处于 Empty 状态。因此,如果你的消费者组停掉了很长时间(超过 7 天),那么 Kafka 很可能就把该组的位移数据删除了。我相信,你在 Kafka 的日志中一定经常看到下面这个输出:
1 | Removed ✘✘✘ expired offsets in ✘✘✘ milliseconds. |
这就是 Kafka 在尝试定期删除过期位移。现在你知道了,只有 Empty 状态下的组,才会执行过期位移删除的操作。
2、rebalance
什么是rebalance?
我们都知道 kafka 主要可以分为三大块:生产者、kafka broker、消费者。
而 kafka 怎么均匀地分配某个 topic 下的所有 partition 到各个消费者,从而使得消息的消费速度达到最快,这就是平衡(balance)。而 rebalance(重平衡)其实就是重新进行 partition 的分配,从而使得 partition 的分配重新达到平衡状态。
rebalance的流程
重平衡的完整流程需要消费者端和协调者组件共同参与才能完成。我们先从消费者的视角来审视一下重平衡的流程。
在消费者端,重平衡分为两个步骤:分别是加入组和等待领导消费者(Leader Consumer)分配方案。这两个步骤分别对应两类特定的请求:JoinGroup 请求和 SyncGroup 请求。
JoinGroup请求
当组内成员加入组时,它会向协调者发送 JoinGroup 请求。在该请求中,每个成员都要将自己订阅的主题上报,这样协调者就能收集到所有成员的订阅信息。一旦收集了全部成员的 JoinGroup 请求后,协调者会从这些成员中选择一个担任这个消费者组的领导者。
通常情况下,第一个发送 JoinGroup 请求的成员自动成为领导者。你一定要注意区分这里的领导者和之前我们介绍的领导者副本,它们不是一个概念。这里的领导者是具体的消费者实例,它既不是副本,也不是协调者。这里的领导者指的是消费组(consumer group)的领导者,消费组领导者的任务是收集所有成员的订阅信息,然后根据这些信息,制定具体的分区消费分配方案。
选出领导者之后,协调者会把消费者组订阅信息封装进 JoinGroup 请求的响应体中,然后发给领导者,由领导者统一做出分配方案后,进入到下一步:发送 SyncGroup 请求。
SyncGroup请求
在这一步中,领导者向协调者发送 SyncGroup 请求,将刚刚做出的分配方案发给协调者。值得注意的是,其他成员也会向协调者发送 SyncGroup 请求,只不过请求体中并没有实际的内容。这一步的主要目的是让协调者接收分配方案,然后统一以 SyncGroup 响应的方式分发给所有成员,这样组内所有成员就都知道自己该消费哪些分区了。
接下来,我用一张图来形象地说明一下 JoinGroup 请求的处理过程。
就像前面说的,JoinGroup 请求的主要作用是将组成员订阅信息发送给领导者消费者,待领导者制定好分配方案后,重平衡流程进入到 SyncGroup 请求阶段。
下面这张图描述的是 SyncGroup 请求的处理流程。
SyncGroup 请求的主要目的,就是让协调者把领导者制定的分配方案下发给各个组内成员。当所有成员都成功接收到分配方案后,消费者组进入到 Stable 状态,即开始正常的消费工作。
什么时候会发生rebalance?
前面我们已经说到,rebalance 其实就是对 partition 进行重新分配。那么什么时候会发生 rebalance 呢?其实在以下三种情况下,会触发 rebalance:
- 订阅 Topic 的分区数发生变化。
- 订阅的 Topic 个数发生变化。
- 消费组内成员个数发生变化。例如有新的 consumer 实例加入该消费组或者离开组。
订阅Topic的分区数发生变化
简单地说,就是之前 topic 有 10 个分区,现在变成了 20 个,那么多出来的 10 个分区的数据就没人消费了。那么此时就需要进行重平衡,将新增的 10 个分区分给消费组内的消费者进行消费。所以在这个情况下,会发生重平衡。
订阅的Topic个数发生变化
简单地说,一个 consumer group 如果之前只订阅了 A topic,那么其组内的 consumer 知会消费 A topic 的消息。而如果现在新增订阅了 B topic,那么 kafka 就需要把 B topic 的 partition 分配给组内的 consumer 进行消费。这个分配的过程,其实也是一个 rebalance 的过程。
消费组内成员个数发生变化
我们都知道 kafka 中是以消费组(consumer group)的方式进行消费的,消费组内的消费者共同消费一个 topic 下的消息。而当消费组内成员个数发生变化,例如某个 consumer 离开,或者新 consumer 加入,都会导致消费组内成员个数发生变化,从而导致重平衡。
相比起之前的两个情况,这种情况在实际情况中更加常见。因为订阅分区数、以及订阅 topic 数都是我们主动改变才会发生,而组内消费组成员个数发生变化,则是更加随机的。
下面我们一起分析一下「消费组内成员个数发生变化」的几种情况:
- 新成员加入
- 组成员主动离开
- 组成员崩溃
新成员加入
新成员入组是指组处于 Stable 状态后,有新成员加入。如果是全新启动一个消费者组,Kafka 是有一些自己的小优化的,流程上会有些许的不同。我们这里讨论的是,组稳定了之后有新成员加入的情形。
当协调者收到新的 JoinGroup 请求后,它会通过心跳请求响应的方式通知组内现有的所有成员,强制它们开启新一轮的重平衡。具体的过程和之前的客户端重平衡流程是一样的。现在,我用一张时序图来说明协调者一端是如何处理新成员入组的。
组成员主动离开
何谓主动离组?就是指消费者实例所在线程或进程调用 close() 方法主动通知协调者它要退出。这个场景就涉及到了第三类请求:LeaveGroup 请求。协调者收到 LeaveGroup 请求后,依然会以心跳响应的方式通知其他成员,因此我就不再赘述了,还是直接用一张图来说明。
组成员崩溃
崩溃离组是指消费者实例出现严重故障,突然宕机导致的离组。它和主动离组是有区别的,因为后者是主动发起的离组,协调者能马上感知并处理。但崩溃离组是被动的,协调者通常需要等待一段时间才能感知到,这段时间一般是由消费者端参数 session.timeout.ms 控制的。
也就是说,Kafka 一般不会超过 session.timeout.ms 就能感知到这个崩溃。当然,后面处理崩溃离组的流程与之前是一样的,我们来看看下面这张图。
疑惑
在许多文章中,它们会加多了一个 rebalance 场景,即:「重平衡时协调者对组内成员提交位移的处理」。其实这个要说是 rebalance 场景,有点牵强。我们先来了解下这个场景究竟是什么情况。
正常情况下,每个组内成员都会定期汇报位移给协调者。当重平衡开启时,协调者会给予成员一段缓冲时间,要求每个成员必须在这段时间内快速地上报自己的位移信息,然后再开启正常的 JoinGroup/SyncGroup 请求发送。还是老办法,我们使用一张图来说明。
所以这种场景是指 rebalance 发生之时,留有时间给消费者提交 offset,并不是引起 rebalance 的触发原因(并不是因为提交 offset 引发 rebalance)。因此在我这篇文章里,我并没有将其作为 rebalance 的一种场景。
rebalance问题处理思路
前面我们讲过 rebalance 一般会有 3 种情况,分别是:
- 新成员加入
- 组成员主动离开
- 组成员崩溃
对于「新成员加入」、「组成员主动离开」都是我们主动触发的,能比较好地控制。但是「组成员崩溃」则是我们预料不到的,遇到问题的时候也比较不好排查。但对于「组成员崩溃」也是有一些通用的排查思路的,下面我们就来聊聊「rebalance问题的处理思路」。
要学会处理 rebalance 问题,我们需要先搞清楚 kafaka 消费者配置的四个参数:
- session.timeout.ms 设置了超时时间
- heartbeat.interval.ms 心跳时间间隔
- max.poll.interval.ms 每次消费的处理时间
- max.poll.records 每次消费的消息数
session.timeout.ms 表示 consumer 向 broker 发送心跳的超时时间。例如 session.timeout.ms = 180000 表示在最长 180 秒内 broker 没收到 consumer 的心跳,那么 broker 就认为该 consumer 死亡了,会启动 rebalance。
heartbeat.interval.ms 表示 consumer 每次向 broker 发送心跳的时间间隔。heartbeat.interval.ms = 60000 表示 consumer 每 60 秒向 broker 发送一次心跳。一般来说,session.timeout.ms 的值是 heartbeat.interval.ms 值的 3 倍以上。
max.poll.interval.ms 表示 consumer 每两次 poll 消息的时间间隔。简单地说,其实就是 consumer 每次消费消息的时长。如果消息处理的逻辑很重,那么时长就要相应延长。否则如果时间到了 consumer 还么消费完,broker 会默认认为 consumer 死了,发起 rebalance。
max.poll.records 表示每次消费的时候,获取多少条消息。获取的消息条数越多,需要处理的时间越长。所以每次拉取的消息数不能太多,需要保证在 max.poll.interval.ms 设置的时间内能消费完,否则会发生 rebalance。
简单来说,会导致崩溃的几个点是:
- 消费者心跳超时,导致 rebalance。
- 消费者处理时间过长,导致 rebalance。
消费者心跳超时
我们知道消费者是通过心跳和协调者保持通讯的,如果协调者收不到心跳,那么协调者会认为这个消费者死亡了,从而发起 rebalance。
而 kafka 的消费者参数设置中,跟心跳相关的两个参数为:
- session.timeout.ms 设置了超时时间
- heartbeat.interval.ms 心跳时间间隔
这时候需要调整 session.timeout.ms 和 heartbeat.interval.ms 参数,使得消费者与协调者能保持心跳。一般来说,超时时间应该是心跳间隔的 3 倍时间。即 session.timeout.ms 如果设置为 180 秒,那么 heartbeat.interval.ms 最多设置为 60 秒。
为什么要这么设置超时时间应该是心跳间隔的 3 倍时间?因为这样的话,在一个超时周期内就可以有多次心跳,避免网络问题导致偶发失败。
消费者处理时间过长
如果消费者处理时间过长,那么同样会导致协调者认为该 consumer 死亡了,从而发起重平衡。
而 kafka 的消费者参数设置中,跟消费处理的两个参数为:
- max.poll.interval.ms 每次消费的处理时间
- max.poll.records 每次消费的消息数
对于这种情况,一般来说就是增加消费者处理的时间(即提高 max.poll.interval.ms 的值),减少每次处理的消息数(即减少 max.poll.records 的值)。
除此之外,超时时间参数(session.timeout.ms)与 消费者每次处理的时间(max.poll.interval.ms)也是有关联的。max.poll.interval.ms 时间不能超过 session.timeout.ms 时间。 因为在 kafka 消费者的实现中,其是单线程去消费消息和执行心跳的,如果线程卡在处理消息,那么这时候即使到时间要心跳了,还是没有线程可以去执行心跳操作。很多同学在处理问题的时候,明明设置了很长的 session.timeout.ms 时间,但最终还是心跳超时了,就是因为没有处理好这两个参数的关联。
对于 rebalance 类问题,简单总结就是:处理好心跳超时问题和消费处理超时问题。
- 对于心跳超时问题。一般是调高心跳超时时间(session.timeout.ms),调整超时时间(session.timeout.ms)和心跳间隔时间(heartbeat.interval.ms)的比例。阿里云官方文档建议超时时间(session.timeout.ms)设置成 25s,最长不超过 30s。那么心跳间隔时间(heartbeat.interval.ms)就不超过 10s。
- 对于消费处理超时问题。一般是增加消费者处理的时间(max.poll.interval.ms),减少每次处理的消息数(max.poll.records)。阿里云官方文档建议 max.poll.records 参数要远小于当前消费组的消费能力(records < 单个线程每秒消费的条数 x 消费线程的个数 x session.timeout的秒数)。
3、分区状态机
- NonExistentPartition :分区在将要被创建之前的初始状态是这个,表示不存在
- NewPartition: 表示正在创建新的分区, 是一个中间状态, 这个时候只是在Controller的内存中存了状态信息
- OnlinePartition: 在线状态, 正常的分区就应该是这种状态,只有在线的分区才能够提供服务
- OfflinePartition: 下线状态, 分区可能因为Broker宕机或者删除Topic等原因流转到这个状态, 下线了就不能提供服务了
- NonExistentPartition: 分区不存在的状态, 当Topic删除完成成功之后, 就会流转到这个状态, 当还处在删除中的时候,还是停留在下线状态。
4、Leader选举流程和选举策略
1、执行选举流程
判断分区状态是否变更为OnlinePartition,其他状态退出流程
判断上一个状态是否为NewPartition
- 如果是NewPartition则初始化分区,然后将第一个活动副本指定为领导,将所有活动副本分区指定为ISR,将leader和ISR写入该分区,向每个实时副本发送LeaderAndIsr请求,向每个实时代理发送UpdateMetadata请求
如果是OnlinePartition、OffinePartition
开始选举流程,从zk获取节点数据,节点为:
/broker/topics/{topic名称}/partitions/{分区号}/state
主要获取当前分区Leader
和ISR
和controller_epoch
1
2[zk: localhost:2181(CONNECTED) 1] get /brokers/topics/test/partitions/0/state
{"controller_epoch":2,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}过滤异常数据,读取zk失败,zk数据不存在,拿到的数据controller_epoch > 当前controller的epoch
根据策略选出Leader和ISR信息
如果没有选出则退出流程
重新写入zk信息:
/broker/topics/{topic名称}/partitions/{分区号}/state
更新controller内存中的Leader和ISR信息
向相关Broker发起
LeaderAndIsrRequst
,结束流程
2、leader选举策略和场景
2.1、OfflinePartitionLeaderElectionStrategy
遍历分区的AR, 找到第一个满足以下条件的副本:
- 副本在线
- 在ISR中。
如果找不到满足条件的副本,那么再根据 传入的参数allowUnclean判断
- allowUnclean=true:AR顺序中所有在线副本中的第一个副本。
- allowUnclean=false: 需要去查询配置
unclean.leader.election.enable
的值。
若=true ,则跟上面 1一样 。
若=false,直接返回None,没有找到合适的Leader。
1 | 触发场景: |
源码位置:
Election#leaderForOffline
1 | case OfflinePartitionLeaderElectionStrategy(allowUnclean) => |
2.1.1、触发场景:Controller 重新加载
Controller 当选的时候会启动 分区状态机
partitionStateMachine
, 启动的时候会重新加载所有分区的状态到内存中, 并触发 对处于 NewPartition 或 OfflinePartition 状态的所有分区尝试变更为 OnlinePartition 状态的状态。把新创建的分区和离线的分区触发一下选举流程。
触发源码入口:
KafkaController#onControllerFailover
1 | partitionStateMachine.startup() |
2.1.2、触发场景:脚本执行脏选举
当执行
kafka-leader-election.sh
的时候并且模式选择的是UNCLEAN
. 则会触发这个模式。 这里注意一下,入参allowUnclean
= (electionTrigger == AdminClientTriggered) 意思是: 当触发的场景是AdminClientTriggered的时候, 则allowUnclean=true
,表示 不关心配置参数unclean.leader.election.enable
是什么, 如果没有找到符合条件的Leader, 则就去非ISR 列表找Leader。 刚好我能脚本执行的时候 触发器就是 AdminClientTriggered 其他触发器有:AutoTriggered : 定时自动触发。
ZkTriggered:Controller切换的时候触发的(zk节点/controller 的变更便是Controller角色的切换)
AdminClientTriggered:客户端主动触发。
2.1.3、触发场景:Controller 监听到有Broker启动了
同上。
触发源码入口:
KafkaController#processBrokerChange#onBrokerStartup
1 | partitionStateMachine.triggerOnlinePartitionStateChange() |
2.1.4、触发场景:Controller 监听 LeaderAndIsrResponseReceived请求
同上。
当Controller向对应的Broker发起 LeaderAndIsrRequest 请求的时候.
有一个回调函数callback, 这个回调函数会向Controller发起一个事件为 LeaderAndIsrResponseReceived 请求。
具体源码在:
ControllerChannelManager#sendLeaderAndIsrRequest
Controller收到这个事件的请求之后,根据返回的 leaderAndIsrResponse 数据
会判断一下有没有新增加的离线副本(一般都是由于磁盘访问有问题)
如果有新的离线副本,则需要将这个离线副本标记为Offline状态
源码入口:
KafkaController#onReplicasBecomeOffline
1 | partitionStateMachine.triggerOnlinePartitionStateChange() |
2.1.5、触发场景:Controller 监听 UncleanLeaderElectionEnable请求
当我们在修改动态配置的时候, 将动态配置:
unclean.leader.election.enable
设置为 true 的时候
会触发向Controller发起UncleanLeaderElectionEnable的请求,这个时候则需要触发一下。触发请求同上。
触发源码入口:
KafkaController#processTopicUncleanLeaderElectionEnable
1 | partitionStateMachine.triggerOnlinePartitionStateChange(topic) |
上面的触发调用的代码就是下面的接口
对处于 NewPartition 或 OfflinePartition 状态的所有分区尝试变更为 OnlinePartition 的状态。 状态的流程触发了Leader选举。
1 | /** |
- 获取所有 OfflinePartition 、NewPartition 的分区状态
- 尝试将 所有 NewPartition or OfflinePartition 状态的分区全部转别成 OnlinePartition状态, 但是如果对应的Topic正在删除中,则会被排除掉
- 分区状态机进行状态流转 使用 OfflinePartitionLeaderElectionStrategy 选举策略(
allowUnclean=true
表示如果从isr中没有选出leader,则允许从非isr列表中选举leader ,allowUnclean=false
表示如果从isr中没有选出leader, 则需要去读取配置文件的配置unclean.leader.election.enable
来决定是否允许从非ISR列表中选举Leader。 )
2.2、ReassignPartitionLeaderElectionStrategy
分区副本重分配选举策略: 当执行分区副本重分配的时候, 原来的Leader可能有变更, 则需要触发一下 Leader选举。
- 只有当之前的Leader副本在经过重分配之后不存在了。 例如: [2,0] ==> [1,0] 。 原来2是Leader副本,经过重分配之后变成了 [1,0]。2已经不复存在了,所以需要重新选举Leader。
- 当原来的分区Leader副本 因为某些异常,下线了。需要重新选举Leader
1 | 触发场景: |
分区副本重分配发生的Leader选举.
Election#leaderForReassign
1 | private def leaderForReassign(partition: TopicPartition, |
总结:从当前的副本分配列表中,获取副本在线&&副本在ISR中的 第一个副本,遍历的顺序是当前副本的分配方式(AR),跟ISR的顺序没有什么关系。
2.2.1、触发场景:分区副本重分配
并不是每次执行分区副本重分配都会触发这个Leader选举策略, 下面两种情况才会触发
- 只有当之前的Leader副本在经过重分配之后不存在了。例如: [2,0] ==> [1,0] 。 原来2是Leader副本,经过重分配之后变成了 [1,0]。2已经不复存在了,所以需要重新选举Leader。
- 当原来的分区Leader副本 因为某些异常,下线了。需要重新选举Leader
对应的判断条件代码如下:
KafkaController#moveReassignedPartitionLeaderIfRequired
1 | private def moveReassignedPartitionLeaderIfRequired(topicPartition: TopicPartition, |
2.3、PreferredReplicaPartitionLeaderElectionStrategy
优先副本选举策略, 必须满足三个条件:
是第一个副本&&副本在线&&副本在ISR列表中。
满足上面三个条件才会当选leader,不满足则不会做变更。
1 | 触发场景: |
1 | def leaderForPreferredReplica(controllerContext: ControllerContext, |
- 从内存中获取TopicPartition的分配方式
- 过滤不在线的副本
- 找到第一个副本判断一下是否在线&&在ISR列表中。如果满足,则选他为leader,如果不满足,也不会再找其他副本了。
- 返回leaderAndIsr信息, 这里的ISR是没有做修改的。
2.3.1、触发场景:自动定时执行优先副本选举任务
Controller 启动的时候,会启动一个定时任务 。每隔一段时间就去执行 优先副本选举任务。
与之相关配置:
1 | # 如果为true表示会创建定时任务去执行 优先副本选举,为false则不会创建 |
2.3.2、触发场景: Controller 重新加载的时候
在这个触发之前还有执行
partitionStateMachine.startup()
相当于是先把 OfflinePartition、NewPartition状态的分区执行了OfflinePartitionLeaderElectionStrategy 策略。
然后又执行了
PreferredReplicaPartitionLeaderElectionStrategy策略 这里是从zk节点/admin/preferred_replica_election
读取数据, 来进行判断是否有需要执行Leader选举的分区
它是在执行kafka-preferred-replica-election
命令的时候会创建这个zk节点
但是这个已经被标记为废弃了,并且在3.0的时候直接移除了。
源码位置:
KafkaController#onControllerFailover
1 | // 从zk节点/admin/preferred_replica_election找到哪些符合条件需要执行优先副本选举的分区 |
2.3.3、触发场景:执行优先副本选举脚本的时候
执行脚本
kafka-leader-election.sh
并且选择的模式是PREFERRED
(优先副本选举) 则会选择 PreferredReplicaPartitionLeaderElectionStrategy 策略选举
2.4、ControlledShutdownPartitionLeaderElectionStrategy
受控关机选举策略 :
当Broker关机的过程中,会向Controller发起一个请求, 让它重新发起一次选举, 把在所有正在关机(也就是发起请求的那个Broker,或其它同时正在关机的Broker) 的Broker里面的副本给剔除掉。
根据算法算出leader:找到第一个满足条件的副本:
副本在线 && 副本在ISR中 && 副本所在的Broker不在正在关闭的Broker集合中 。构造新的ISR列表: 在之前的isr列表中将 正在被关闭的Broker里面的副本 给剔除掉
1 | 触发场景: |
Election#leaderForControlledShutdown
1 | /** |
2.4.1、触发场景:Broker关机的时候
当Broker关闭的时候, 会向Controller发一起一个
ControlledShutdownRequest
请求, Controller收到这个请求会针对性的做一些善后事件。比如说 执行Leader重选举 等等之类的。
源码位置:KafkaServer#controlledShutdown
Controller收到请求的源码位置:KafkaController#doControlledShutdown
与之相关的配置有:
1 | controlled.shutdown.enable : 是否启用受控关闭操作 |
3、其他场景
新创建的Topic Leader选举策略
创建新的Topic的时候,并没有发生Leader选举的操作, 而是默认从分区对应的所有在线副本中选择第一个为leader, 然后isr就为 所有在线副本,再组装一下当前的controller_epoch信息,写入到zk节点
/brokers/topics/{Topic名称}/partitions/{分区号}/state
中。
最后发起 LeaderAndIsrRequest 请求,通知 leader 的变更。
详细看看源码:
PartitionStateMachine#doHandleStateChanges
分区状态从 NewPartition
流转到OnlinePartition
1 | /** |
从当前的Controller 内存中获取所有入参的分区对应的副本信息
过滤那些已经下线的副本( Broker宕机、网络异常、磁盘脱机、等等都有可能造成副本下线) 。
每个分区对应的所有在线副本信息 为 ISR 信息,然后取ISR的第一个副本为leader分区。当然特别注意一下, 这个时候获取的isr信息的顺序就是 分区创建时候分配好的AR顺序, 获取第一个在线的。(因为在其他情况下 ISR的顺序跟AR的顺序并不一致)
组装 上面的 isr、leader、controller_epoch 等信息 写入到zk节点 /brokers/topics/{Topic名称}/partitions/{分区号}/state 例如下面所示
1
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1,2]}
然后向其他相关Broker 发起 LeaderAndIsrRequest 请求,通知他们Leader和Isr信息已经变更了,去做一下想要的处理。比如去新的leader发起Fetcher请求同步数据。
5、kafka在Zookeeper的存储结构
当我们kafka启动运行以后,就会在zookeeper上初始化kafka相关数据,主要包括六大类:
- consumers
- admin
- config
- controller
- brokers
- controller_epoch
5.1、brokers节点结构说明
5.1.1 topic信息结构
/brokers/topics/[topic] :
存储某个topic的partitions所有分配信息:
1 | Schema: |
5.1.2 partitions信息
/brokers/topics/[topic]/partitions/[0…N] 其中[0..N]表示partition索引号 /brokers/topics/[topic]/partitions/[partitionId]/state
1 | Schema: |
5.1.3 broker信息
/brokers/ids/[0…N]
每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),此节点为临时znode(EPHEMERAL)
1 | Schema: |
5.2、Controller_epoch
/controller_epoch -> int (epoch)
此值为一个数字,kafka集群中第一个broker第一次启动时为1,以后只要集群中center controller(中央控制器)所在broker变更或挂掉,就会重新选举新的center controller,每次center controller变更controller_epoch值就会 + 1;
5.3、Controller信息
/controller -> int (broker id of the controller)
存储center controller(中央控制器)所在kafka broker的信息。
1 | Schema: |
这个的意思就说明,当前的Controller所在的Broker机器是哪台,变更时间是多少等。
5.4、Consumer信息
/consumers/[groupId]/ids/[consumerIdString]
每个consumer都有一个唯一的ID(consumerId可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息。
1 | Schema: |
5.4.1 Consumer offset信息
/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)
用来跟踪每个consumer目前所消费的partition中最大的offset。此znode为持久节点,可以看出offset跟group_id有关,以表明当消费者组(consumer group)中一个消费者失效,重新触发balance,其他consumer可以继续消费。
5.5、admin信息
1 | [zk: localhost:2181(CONNECTED) 49] ls -R /admin |
5.6、config信息
1 | [zk: localhost:2181(CONNECTED) 50] ls -R /config |
6、零拷贝
Kafka之所以那么快,其中一个很大的原因就是零拷贝(Zero-copy)
技术,零拷贝不是kafka的专利,而是操作系统的升级,又比如Netty,也用到了零拷贝。
Java 类库通过 java.nio.channels.FileChannel 中的 transferTo() 方法来在 Linux 和 UNIX 系统上支持零拷贝。可以使用 transferTo() 方法直接将字节从它被调用的通道上传输到另外一个可写字节通道上,数据无需流经应用程序。
传统IO
kafka的数据是要落入磁盘的,那么必然牵扯到磁盘的IO,传统磁盘IO又叫缓存IO,效率是很低的,那么为什么效率低呢?我们先来粗略讲讲操作系统的知识。
用户空间以及内核空间的概念:
我们知道现在操作系统都是采用虚拟存储器。那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操心系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核,保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间(Kernel space),一部分为用户空间(User space)。针对Linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。每个进程可以通过系统调用进入内核,因此,Linux内核由系统内的所有进程共享。于是,从具体进程的角度来看,每个进程可以拥有4G字节的虚拟空间。
传统的文件读写或者网络传输,通常需要将数据从内核态转换为用户态。应用程序读取用户态内存数据,写入文件 / Socket之前,需要从用户态转换为内核态之后才可以写入文件或者网卡当中。我们可以称之为read/write模式,此模式的步骤为:
- 首先,调用read时,磁盘文件拷贝到了内核态;
- 之后,CPU控制将内核态数据copy到用户态下;
- 调用write时,先将用户态下的内容copy到内核态下的socket的buffer中;
- 最后将内核态下的socket buffer的数据copy到网卡设备中传送;
DMA
DMA(Direct Memory Access,直接存储器访问) 是所有现代电脑的重要特色,它允许不同速度的硬件装置来沟通,而不需要依赖于 CPU 的大量中断负载 。通俗来讲,就是DMA 传输将数据从一个地址空间复制到另外一个地址空间,当CPU 初始化这个传输动作,传输动作本身是由 DMA 控制器来实行和完成,也就是两个硬件之间完成的,而没有CPU的参与,那么CPU就可以释放出来做别的事情,这样极大地提高了效率。我们常见的硬件设备如网卡、磁盘设备、显卡、声卡之类的都支持DMA。
所以上面所说的read/write模式大概如图所示:
传统IO有两个很大的缺点导致很慢:
- 我们可以清楚的看到共产生了4次copy,从磁盘文件到Kernal的相互读写是支持DMA copy的,但即使是这样,从Kernal到User没有硬件的支持所以不支持DMA,还有两次CPU copy。
- Kafka只是把文件存放到磁盘之后通过网络发出去,中间并不需要修改什么数据,那read和write的两次CPU copy的操作完全是多余的。
零拷贝
mmap
mmap是零拷贝的一种,主要就是去掉read write这两次CPU copy以提升性能,调用mmap()来代替read调用:
1 | buf = mmap(diskfd, len); |
此模式步骤为:
- 用户程序调用 mmap(),磁盘上的数据会通过 DMA被拷贝的内核缓冲区;
- 接着操作系统会把这段内核缓冲区与用户程序共享,这样就不需要把内核缓冲区的内容往用户空间拷贝;
- 用户程序再调用 write(),操作系统直接将内核缓冲区的内容拷贝到 socket缓冲区中;
- 最后, socket缓冲区再把数据发到网卡去。
这显然是一个伟大的进步,把上下文的切换次数从4次减少到2次,同时也把数据copy的次数从4次降低到了3次。
sendfile
Linux2.1内核开始引入了sendfile函数,用于将文件通过socket传送。开始时跟mmap没什么区别,但是Linux2.4做出了重大优化,将零拷贝推到顶峰。
优化后的处理过程如下:
- 将文件拷贝到kernel buffer中;
- 向socket buffer中追加当前要发生的数据在kernel buffer中的位置和偏移量;
- 根据socket buffer中的位置和偏移量直接将kernel buffer的数据copy到网卡设备中;
如图:
经过上述过程,数据只经过了2次copy就从磁盘传送出去了。这个才是真正的Zero-Copy(这里的零拷贝是针对kernel来讲的,数据在kernel模式下是Zero-Copy)。
正是Linux2.4的内核做了改进,Java中的TransferTo()实现了Zero-Copy。
mmap 和 sendfile总结
1、都是Linux内核提供、实现零拷贝的API;
2、sendfile 是将读到内核空间的数据,转到socket buffer,进行网络发送;
3、mmap将磁盘文件映射到内存,支持读和写,对内存的操作会反映在磁盘文件上。
RocketMQ 在消费消息时,使用了 mmap。kafka 使用了 sendFile。
测试
在Windows10上测试:
测试结果仅供参考 ,并不是平均数,所以可能偏差较大。
常见面试题
1、kafka的消费者是pull(拉)还是push(推)模式,这种模式有什么好处?
Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从broker 拉取消息。
优点:pull模式消费者自主决定是否批量从broker拉取数据,而push模式在无法知道消费者消费能力情况下,不易控制推送速度,太快可能造成消费者奔溃,太慢又可能造成浪费。
缺点:如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到到达。为了避免这点,Kafka 有个参数可以让 consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)。
2、kafka维护消息状态的跟踪方法
Kafka中的Topic 被分成了若干分区,每个分区在同一时间只被一个 consumer 消费。然后再通过offset进行消息位置标记,通过位置偏移来跟踪消费状态。相比其他一些消息队列使用“一个消息被分发到consumer 后 broker 就马上进行标记或者等待 customer 的通知后进行标记”的优点是,避免了通信消息发送后,可能出现的程序奔溃而出现消息丢失或者重复消费的情况。同时也无需维护消息的状态,不用加锁,提高了吞吐量。
3、zookeeper对于kafka的作用是什么?
Zookeeper 主要用于在集群中不同节点之间进行通信,在 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取,除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。
4、kafka判断一个节点还活着的有那两个条件?
(1)节点必须维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
5、讲一讲 kafka 的 ack 的三种机制
request.required.acks 有三个值 0 1 -1(all),具体如下:
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。
-1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的ack,这样数据不会丢失。
6、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消费?
Kafka 中发送 1 条消息的时候,可以指定(topic, partition, key) 3 个参数,partiton 和 key 是可选的。
Kafka 分布式的单位是 partition,同一个 partition 用一个 write ahead log 组织,所以可以保证FIFO 的顺序。不同 partition 之间不能保证顺序。因此你可以指定 partition,将相应的消息发往同 1个 partition,并且在消费端,Kafka 保证1 个 partition 只能被1 个 consumer 消费,就可以实现这些消息的顺序消费。
另外,你也可以指定 key(比如 order id),具有同 1 个 key 的所有消息,会发往同 1 个partition,那这样也实现了消息的顺序消息。
7、kafka 如何不消费重复数据?比如扣款,我们不能重复的扣。
这个问题换种问法,就是kafka如何保证消息的幂等性。对于消息队列来说,出现重复消息的概率还是挺大的,不能完全依赖消息队列,而是应该在业务层进行数据的一致性幂等校验。
比如你处理的数据要写库(mysql,redis等),你先根据主键查一下,如果这数据都有了,你就别插入了,进行一些消息登记或者update等其他操作。另外,数据库层面也可以设置唯一健,确保数据不要重复插入等 。一般这里要求生产者在发送消息的时候,携带全局的唯一id。
8、讲一下kafka集群的组成?
kafka的集群图如下:
Broker(代理)
Kafka集群通常由多个代理组成以保持负载平衡。 Kafka代理是无状态的,所以他们使用ZooKeeper来维护它们的集群状态。 一个Kafka代理实例可以每秒处理数十万次读取和写入,每个Broker可以处理TB的消息,而没有性能影响。 Kafka经纪人领导选举可以由ZooKeeper完成。
ZooKeeper
ZooKeeper用于管理和协调Kafka代理。 ZooKeeper服务主要用于通知生产者和消费者Kafka系统中存在任何新代理或Kafka系统中代理失败。 根据Zookeeper接收到关于代理的存在或失败的通知,然后生产者和消费者采取决定并开始与某些其他代理协调他们的任务。
Producers(生产者)
生产者将数据推送给经纪人。 当新代理启动时,所有生产者搜索它并自动向该新代理发送消息。 Kafka生产者不等待来自代理的确认,并且发送消息的速度与代理可以处理的一样快。
Consumers(消费者)
因为Kafka代理是无状态的,这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息。 如果消费者确认特定的消息偏移,则意味着消费者已经消费了所有先前的消息。 消费者向代理发出异步拉取请求,以具有准备好消耗的字节缓冲区。 消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。 消费者偏移值由ZooKeeper通知。
9、kafka是什么?
Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统,最初由LinkedIn公司开发,使用Scala语言编写,目前是Apache的开源项目。
broker: Kafka服务器,负责消息存储和转发
topic:消息类别,Kafka按照topic来分类消息
partition: topic的分区,一个topic可以包含多个partition, topic 消息保存在各个partition上
offset:消息在日志中的位置,可以理解是消息在partition上的偏移量,也是代表该消息的唯一序号
Producer:消息生产者
Consumer:消息消费者
Consumer Group:消费者分组,每个Consumer必须属于一个group
Zookeeper:保存着集群 broker、 topic、 partition等meta 数据;另外,还负责broker故障发现, partition leader选举,负载均衡等功能
10、partition的数据文件(offffset,MessageSize,data)
partition中的每条Message包含了以下三个属性: offset,MessageSize,data,其中offset表示Message在这个partition中的偏移量,offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message,可以认为offset是partition中Message的 id; MessageSize表示消息内容data的大小;data为Message的具体内容。
11、kafka如何实现数据的高效读取?(顺序读写、分段命令、二分查找)
Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为index。 index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。
12、 Kafka 消费者端的 Rebalance 操作什么时候发生?
同一个 consumer 消费者组 group.id 中,新增了消费者进来,会执行 Rebalance 操作
消费者离开当期所属的 consumer group组。比如宕机
分区数量发生变化时(即 topic 的分区数量发生变化时)
消费者主动取消订阅
Rebalance的过程如下:
第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。
第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。
所以对于Rebalance来说,Coordinator起着至关重要的作用
13、Kafka 中的ISR(InSyncRepli)、OSR(OutSyncRepli)、AR(AllRepli)代表什么?
kafka中与leader副本保持一定同步程度的副本(包括leader)组成ISR。与leader滞后太多的副本组成OSR。分区中所有的副本通称为AR。
ISR : 速率和leader相差低于10秒的follower的集合
OSR : 速率和leader相差大于10秒的follower
AR : 全部分区的follower
14、Kafka 中的HW、LEO等分别代表什么?
HW:高水位,指消费者只能拉取到这个offset之前的数据
LEO:日志末端位移,标识当前日志文件中下一条待写入的消息的offset,大小等于当前日志文件最后一条消息的offset+1.
高水位的作用:
- 定义消息可见性,即用来标识分区下的哪些消息是可以被消费者消费者;
- 用来帮助kafka完成副本同步;
- leader副本的高水位就是分区的高水位;
日志末端位移的作用:
- 副本下一条消息的位移值;
- 副本总消息条数就是末端位移值,例如15代表总共有15条消息,下一条消息的位移是15;
- 同一个副本,高水位值不会大于末端位移值;
HW和LEO更新机制
Leader副本会保存所有Followe副本的HW、LEO。但是不会更新远程副本的HW,也就是图中标记为灰色的部分。
分析源码可以知道,Leader副本所在的broker上只有重置更新远程副本的LEO,没有远程副本的HW。
Leader 副本
处理生产者请求的逻辑如下:
- 写入消息到本地磁盘
- 更新分区高水位值
- 获取 Leader 副本所在 Broker 端保存的所有远程副本 LEO 值(LEO-1,LEO-2,……,LEO-n)
- 获取 Leader 副本高水位值:currentHW
- 更新 currentHW = max{currentHW, min(LEO-1, LEO-2, ……,LEO-n)}
处理 Follower 副本拉取消息的逻辑如下:
- 读取磁盘(或页缓存)中的消息数据
- 使用 Follower 副本发送请求中的位移值更新远程副本 LEO 值
- 更新分区高水位值(具体步骤与处理生产者请求的步骤相同)
Follower 副本
从 Leader 拉取消息的处理逻辑如下:
写入消息到本地磁盘
更新 LEO 值
更新高水位值
获取 Leader 发送的高水位值:currentHW
获取步骤 2 中更新过的 LEO 值:currentLEO
更新高水位为 min(currentHW, currentLEO)
Leader高水位会比Follow领先,然后持平
副本同步机制
搞清楚了上面 HW 和 LEO 的更新机制后,我们举一个单分区且有两个副本的主题来演示下 Kafka 副本同步的全流程。
首先是初始状态。下面这张图中的 remote LEO 就是刚才的远程副本的 LEO 值。在初始状态时,所有值都是 0。
当生产者给主题分区发送一条消息后,状态变更为:
此时,Leader 副本成功将消息写入了本地磁盘,故 LEO 值被更新为 1。
Follower 再次尝试从 Leader 拉取消息。和之前不同的是,这次有消息可以拉取了,因此状态进一步变更为:
这时,Follower 副本也成功地更新 LEO 为 1。此时,Leader 和 Follower 副本的 LEO 都是 1,但各自的高水位依然是 0,还没有被更新。它们需要在下一轮的拉取中被更新,如下图所示:
在新一轮的拉取请求中,由于位移值是 0 的消息已经拉取成功,因此 Follower 副本这次请求拉取的是位移值 =1 的消息。Leader 副本接收到此请求后,更新远程副本 LEO 为 1,然后更新 Leader 高水位为 1。做完这些之后,它会将当前已更新过的高水位值 1 发送给 Follower 副本。Follower 副本接收到以后,也将自己的高水位值更新成 1。至此,一次完整的消息同步周期就结束了。事实上,Kafka 就是利用这样的机制,实现了 Leader 和 Follower 副本之间的同步。
Leader Epoch 机制
Follower 副本的高水位更新需要一轮额外的拉取请求才能实现。如果把上面那个例子扩展到多个 Follower 副本,情况可能更糟,也许需要多轮拉取请求。也就是说,Leader 副本高水位更新和 Follower 副本高水位更新在时间上是存在错配的。这种错配是很多“数据丢失”或“数据不一致”问题的根源。基于此,社区在 0.11 版本正式引入了 Leader Epoch 概念,来规避因高水位更新错配导致的各种不一致问题。
所谓 Leader Epoch,我们大致可以认为是 Leader 版本。它由两部分数据组成。
- Epoch。一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号。小版本号的 Leader 被认为是过期 Leader,不能再行使 Leader 权力。
- 起始位移(Start Offset)。Leader 副本在该 Epoch 值上写入的首条消息的位移。
Kafka Broker 会在内存中为每个分区都缓存 Leader Epoch 数据,同时它还会定期地将这些信息持久化到一个 checkpoint 文件中。当 Leader 副本写入消息到磁盘时,Broker 会尝试更新这部分缓存。如果该 Leader 是首次写入消息,那么 Broker 会向缓存中增加一个 Leader Epoch 条目,否则就不做更新。这样,每次有 Leader 变更时,新的 Leader 副本会查询这部分缓存,取出对应的 Leader Epoch 的起始位移,以避免数据丢失和不一致的情况。
开始时,副本 A 和副本 B 都处于正常状态,A 是 Leader 副本。某个使用了默认 acks 设置的生产者程序向 A 发送了两条消息,A 全部写入成功,此时 Kafka 会通知生产者说两条消息全部发送成功。
现在我们假设 Leader 和 Follower 都写入了这两条消息,而且 Leader 副本的高水位也已经更新了,但 Follower 副本高水位还未更新——这是可能出现的。还记得吧,Follower 端高水位的更新与 Leader 端有时间错配。倘若此时副本 B 所在的 Broker 宕机,当它重启回来后,副本 B 会执行日志截断操作,将 LEO 值调整为之前的高水位值,也就是 1。这就是说,位移值为 1 的那条消息被副本 B 从磁盘中删除,此时副本 B 的底层磁盘文件中只保存有 1 条消息,即位移值为 0 的那条消息。
只不过引用 Leader Epoch 机制后,Follower 副本 B 重启回来后,需要向 A 发送一个特殊的请求去获取 Leader 的 LEO 值。在这个例子中,该值为 2。当获知到 Leader LEO=2 后,B 发现该 LEO 值不比它自己的 LEO 值小,而且缓存中也没有保存任何起始位移值 > 2 的 Epoch 条目,因此 B 无需执行任何日志截断操作。这是对高水位机制的一个明显改进,即副本是否执行日志截断不再依赖于高水位进行判断。
现在,副本 A 宕机了,B 成为 Leader。同样地,当 A 重启回来后,执行与 B 相同的逻辑判断,发现也不用执行日志截断,至此位移值为 1 的那条消息在两个副本中均得到保留。后面当生产者程序向 B 写入新消息时,副本 B 所在的 Broker 缓存中,会生成新的 Leader Epoch 条目:[Epoch=1, Offset=2]。之后,副本 B 会使用这个条目帮助判断后续是否执行日志截断操作。这样,通过 Leader Epoch 机制,Kafka 完美地规避了这种数据丢失场景。
15、Kafka的那些设计让它有如此高的性能?
1.kafka是分布式的消息队列
2.对log文件进行了segment,并对segment创建了索引
3.(对于单节点)使用了顺序读写,速度能够达到600M/s
4.Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入;
5.Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。
16、Kafka为什么不支持读写分离?
1、 这其实是分布式场景下的通用问题,因为我们知道CAP理论下,我们只能保证C(一致性)和A(可用性)取其一,如果支持读写分离,那其实对于一致性的要求可能就会有一定折扣,因为通常的场景下,副本之间都是通过同步来实现副本数据一致的,那同步过程中肯定会有时间的消耗,如果支持了读写分离,就意味着可能的数据不一致,或数据滞后。
2、 Leader/Follower模型并没有规定Follower副本不可以对外提供读服务。很多框架都是允许这么做的,只是 Kafka最初为了避免不一致性的问题,而采用了让Leader统一提供服务的方式。
3、 不过,自Kafka 2.4之后,Kafka提供了有限度的读写分离,也就是说,Follower副本能够对外提供读服务。
17、分区Leader选举策略有几种?
分区的Leader副本选举对用户是完全透明的,它是由Controller独立完成的。你需要回答的是,在哪些场景下,需要执行分区Leader选举。每一种场景对应于一种选举策略。
1、 OfflinePartition Leader选举:每当有分区上线时,就需要执行Leader选举。所谓的分区上线,可能是创建了新分区,也可能是之前的下线分区重新上线。这是最常见的分区Leader选举场景。
2、 ReassignPartition Leader选举:当你手动运行Kafka-reassign-partitions命令,或者是调用Admin的alterPartitionReassignments方法执行分区副本重分配时,可能触发此类选举。假设原来的AR是[1,2,3],Leader是1,当执行副本重分配后,副本集合AR被设置成[4,5,6],显然,Leader必须要变更,此时会发生Reassign Partition Leader选举。
3、 PreferredReplicaPartition Leader选举:当你手动运行Kafka-preferred-replica-election命令,或自动触发了Preferred Leader选举时,该类策略被激活。所谓的Preferred Leader,指的是AR中的第一个副本。比如AR是[3,2,1],那么,Preferred Leader就是3。
4、 ControlledShutdownPartition Leader选举:当Broker正常关闭时,该Broker上的所有Leader副本都会下线,因此,需要为受影响的分区执行相应的Leader选举。
这4类选举策略的大致思想是类似的,即从AR中挑选首个在ISR中的副本,作为新Leader。
18、请简述下你在哪些场景下会选择 Kafka?
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和 Flink
19、请谈一谈 Kafka 数据一致性原理
一致性就是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。
假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。
这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。
当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。
20、Kafka 缺点?
由于是批量发送,数据并非真正的实时;
•对于mqtt协议不支持;
•不支持物联网传感数据直接接入;
•仅支持统一分区内消息有序,无法实现全局消息有序;
•监控不完善,需要安装插件;
•依赖zookeeper进行元数据管理;