作者:日常加油站 2021-09-06 08:31:11
开发
架构
Kafka Apache Kafka是由LinkedIn采用Scala和Java开发的开源流处理软件平台,并捐赠给了Apache Software Foundation。
成都创新互联坚持“要么做到,要么别承诺”的工作理念,服务领域包括:网站设计制作、成都网站建设、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的永修网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
Apache Kafka是由LinkedIn采用Scala和Java开发的开源流处理软件平台,并捐赠给了Apache Software Foundation。
该项目旨在提供统一的、高吞吐量、低延迟的平台来处理实时数据流。
Kafka可以通过Kafka Connect连接到外部系统,并提供了Kafka Streams。
Kafka是一种分布式的,基于发布/订阅的消息系统,主要特性如下:
我们在官网上下载Kafka时,会看到这样的版本:
前面的版本号是编译Kafka源代码的Scala编译器版本。
Kafka服务器端的代码完全由Scala语言编写,Scala同时支持面向对象编程和函数式编程,用Scala写成的源代码编译之后也是普通的.class文件,因此我们说Scala是JVM系的语言。
真正的Kafka版本号实际上是2.1.1。
前面的2表示大版本号,即Major Version;中间的1表示小版本号或次版本号,即Minor Version;最后的1表示修订版本号,也就是Patch号。
Kafka社区在发布1.0.0版本后写过一篇文章,宣布Kafka版本命名规则正式从4位演进到3位,比如0.11.0.0版本就是4位版本号。
有个建议,不论用的是哪个版本,都请尽量保持服务器端版本和客户端版本一致,否则你将损失很多Kafka为你提供的性能优化收益。
0.7版本:只提供了最基础的消息队列功能。
0.8版本:引入了副本机制,至此kafka成为了一个整整意义上完备的分布式可靠消息队列解决方案
0.9.0.0版本:增加了基础的安全认证/权限功能;使用Java重新了新版本消费者API;引入了Kafka Connect组件。
0.11.0.0版本:提供了幂等性Producer API以及事务API;对Kafka消息格式做了重构。
1.0和2.0版本:主要还是Kafka Streams的各种改进
发布订阅的对象是主题(Topic),可以为每 个业务、每个应用甚至是每类数据都创建专属的主题
向主题发布消息的客户端应用程序称为生产者,生产者程序通常持续不断地 向一个或多个主题发送消息
订阅这些主题消息的客户端应用程序就被称为消费者,消费者也能够同时订阅多个主题的消息
集群由多个 Broker 组成,Broker 负责接收和处理客户端发送过来的请求,以及对消息进行持久化
虽然多个 Broker 进程能够运行在同一台机器上,但更常见的做法是将 不同的 Broker 分散运行在不同的机器上,这样如果集群中某一台机器宕机,即使在它上面 运行的所有 Broker 进程都挂掉了,其他机器上的 Broker 也依然能够对外提供服务
备份的思想很简单,就是把相同的数据拷贝到多台机器上,而这些相同的数据拷贝被称为副本
定义了两类副本:领导者副本和追随者副本
前者对外提供服务,这里的对外指的是与 客户端程序进行交互;而后者只是被动地追随领导者副本而已,不能与外界进行交互
分区机制指的是将每个主题划分成多个分区,每个分区是一组有序的消息日志
生产者生产的每条消息只会被发送到一个分区中,也就是说如果向一个双分区的主题发送一条消息,这条消息要么在分区 0 中,要么在分区 1 中
每个分区下可以配置若干个副本,其中只能有 1 个领 导者副本和 N-1 个追随者副本
生产者向分区写入消息,每条消息在分区中的位置信息叫位移
多个消费者实例共同组成一个组来 消费一组主题
这组主题中的每个分区都只会被组内的一个消费者实例消费,其他消费者实例不能消费它
如果所有实例都属于同一个 Group, 那么它实现的就是消息队列模型;
如果所有实例分别属于不 同的 Group,那么它实现的就是发布/订阅模型
所谓协调者,它专门为Consumer Group服务,负责为Group执行Rebalance以及提供位移管理和组成员管理等。
具体来讲,Consumer端应用程序在提交位移时,其实是向Coordinator所在的Broker提交位移,同样地,当Consumer应用启动时,也是向Coordinator所在的Broker发送各种请求,然后由Coordinator负责执行消费者组的注册、成员管理记录等元数据管理操作。
所有Broker在启动时,都会创建和开启相应的Coordinator组件。
也就是说,「所有Broker都有各自的Coordinator组件」。
那么,Consumer Group如何确定为它服务的Coordinator在哪台Broker上呢?
通过Kafka内部主题__consumer_offsets。
目前,Kafka为某个Consumer Group确定Coordinator所在的Broker的算法有2个步骤。
首先,Kafka会计算该Group的group.id参数的哈希值。
比如你有个Group的group.id设置成了test-group,那么它的hashCode值就应该是627841412。
其次,Kafka会计算__consumer_offsets的分区数,通常是50个分区,之后将刚才那个哈希值对分区数进行取模加求绝对值计算,即abs(627841412 % 50) = 12。
此时,我们就知道了__consumer_offsets主题的分区12负责保存这个Group的数据。
有了分区号,我们只需要找出__consumer_offsets主题分区12的Leader副本在哪个Broker上就可以了,这个Broker,就是我们要找的Coordinator。
消费者消费进度,每个消费者都有自己的消费者位移。
消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。
Rebalance是Kafka消费者端实现高可用的重要手段。
「AR(Assigned Replicas)」:分区中的所有副本统称为AR。
所有消息会先发送到leader副本,然后follower副本才能从leader中拉取消息进行同步。
但是在同步期间,follower对于leader而言会有一定程度的滞后,这个时候follower和leader并非完全同步状态
「OSR(Out Sync Replicas)」:follower副本与leader副本没有完全同步或滞后的副本集合
「ISR(In Sync Replicas):「AR中的一个子集,ISR中的副本都」是与leader保持完全同步的副本」,如果某个在ISR中的follower副本落后于leader副本太多,则会被从ISR中移除,否则如果完全同步,会从OSR中移至ISR集合。
在默认情况下,当leader副本发生故障时,只有在ISR集合中的follower副本才有资格被选举为新leader,而OSR中的副本没有机会(可以通过unclean.leader.election.enable进行配置)
「HW(High Watermark)」:高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个水位 offset 之前的消息
下图表示一个日志文件,这个日志文件中只有9条消息,第一条消息的offset(LogStartOffset)为0,最有一条消息的offset为8,offset为9的消息使用虚线表示的,代表下一条待写入的消息。
日志文件的 HW 为6,表示消费者只能拉取offset在 0 到 5 之间的消息,offset为6的消息对消费者而言是不可见的。
「LEO(Log End Offset)」:标识当前日志文件中下一条待写入的消息的offset
上图中offset为9的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的offset值加1
分区 ISR 集合中的每个副本都会维护自身的 LEO ,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。
一个最基本的架构是生产者发布一个消息到Kafka的一个Topic ,该Topic的消息存放于的Broker中,消费者订阅这个Topic,然后从Broker中消费消息,下面这个图可以更直观的描述这个场景:
「消息状态:」 在Kafka中,消息是否被消费的状态保存在Consumer中,Broker不会关心消息是否被消费或被谁消费,Consumer会记录一个offset值(指向partition中下一条将要被消费的消息位置),如果offset被错误设置可能导致同一条消息被多次消费或者消息丢失。
「消息持久化:」 Kafka会把消息持久化到本地文件系统中,并且具有极高的性能。
「批量发送:」 Kafka支持以消息集合为单位进行批量发送,以提高效率。
「Push-and-Pull:」 Kafka中的Producer和Consumer采用的是Push-and-Pull模式,即Producer向Broker Push消息,Consumer从Broker Pull消息。
「分区机制(Partition):」 Kafka的Broker端支持消息分区,Producer可以决定把消息发到哪个Partition,在一个Partition中消息的顺序就是Producer发送消息的顺序,一个Topic中的Partition数是可配置的,Partition是Kafka高吞吐量的重要保证。
通常情况下,一个kafka体系架构包括「多个Producer」、「多个Consumer」、「多个broker」以及「一个Zookeeper集群」。
「Producer」:生产者,负责将消息发送到kafka中。
「Consumer」:消费者,负责从kafka中拉取消息进行消费。
「Broker」:Kafka服务节点,一个或多个Broker组成了一个Kafka集群
「Zookeeper集群」:负责管理kafka集群元数据以及控制器选举等。
Kafka的消息组织方式实际上是三级结构:主题-分区-消息。
主题下的每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。
其实分区的作用就是提供负载均衡的能力,或者说对数据进行分区的主要原因,就是为了实现系统的高伸缩性(Scalability)。
不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理,并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
「所谓分区策略是决定生产者将消息发送到哪个分区的算法。」
Kafka为我们提供了默认的分区策略,同时它也支持你自定义分区策略。
如果要自定义分区策略,你需要显式地配置生产者端的参数partitioner.class。
在编写生产者程序时,你可以编写一个具体的类实现org.apache.kafka.clients.producer.Partitioner接口。
这个接口也很简单,只定义了两个方法:partition()和close(),通常你只需要实现最重要的partition方法。
我们来看看这个方法的方法签名:
- int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
这里的topic、key、keyBytes、value和valueBytes都属于消息数据,cluster则是集群信息(比如当前Kafka集群共有多少主题、多少Broker等)。
Kafka给你这么多信息,就是希望让你能够充分地利用这些信息对消息进行分区,计算出它要被发送到哪个分区中。
只要你自己的实现类定义好了partition方法,同时设置partitioner.class参数为你自己实现类的Full Qualified Name,那么生产者程序就会按照你的代码逻辑对消息进行分区。
也称Round-robin策略,即顺序分配。
比如一个主题下有3个分区,那么第一条消息被发送到分区0,第二条被发送到分区1,第三条被发送到分区2,以此类推。当生产第4条消息时又会重新开始,即将其分配到分区0
这就是所谓的轮询策略。轮询策略是Kafka Java生产者API默认提供的分区策略。
「轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是我们最常用的分区策略之一。」
也称Randomness策略。所谓随机就是我们随意地将消息放置到任意一个分区上。
如果要实现随机策略版的partition方法,很简单,只需要两行代码即可:
- List partitions = cluster.partitionsForTopic(topic);
- return ThreadLocalRandom.current().nextInt(partitions.size());
先计算出该主题总的分区数,然后随机地返回一个小于它的正整数。
本质上看随机策略也是力求将数据均匀地打散到各个分区,但从实际表现来看,它要逊于轮询策略,所以「如果追求数据的均匀分布,还是使用轮询策略比较好」。事实上,随机策略是老版本生产者使用的分区策略,在新版本中已经改为轮询了。
Kafka允许为每条消息定义消息键,简称为Key。
这个Key的作用非常大,它可以是一个有着明确业务含义的字符串,比如客户代码、部门编号或是业务ID等;也可以用来表征消息元数据。
特别是在Kafka不支持时间戳的年代,在一些场景中,工程师们都是直接将消息创建时间封装进Key里面的。
一旦消息被定义了Key,那么你就可以保证同一个Key的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略
实现这个策略的partition方法同样简单,只需要下面两行代码即可:
- List partitions = cluster.partitionsForTopic(topic);
- return Math.abs(key.hashCode()) % partitions.size();
前面提到的Kafka默认分区策略实际上同时实现了两种策略:如果指定了Key,那么默认实现按消息键保序策略;如果没有指定Key,则使用轮询策略。
其实还有一种比较常见的,即所谓的基于地理位置的分区策略。
当然这种策略一般只针对那些大规模的Kafka集群,特别是跨城市、跨国家甚至是跨大洲的集群。
我们可以根据Broker所在的IP地址实现定制化的分区策略。比如下面这段代码:
- List partitions = cluster.partitionsForTopic(topic);
- return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();
我们可以从所有分区中找出那些Leader副本在南方的所有分区,然后随机挑选一个进行消息发送。
目前Kafka共有两大类消息格式,社区分别称之为V1版本和V2版本。
V2版本是Kafka 0.11.0.0中正式引入的。
不论是哪个版本,Kafka的消息层次都分为两层:消息集合以及消息。
一个消息集合中包含若干条日志项,而日志项才是真正封装消息的地方。
Kafka底层的消息日志由一系列消息集合日志项组成。
Kafka通常不会直接操作具体的一条条消息,它总是在消息集合这个层面上进行写入操作。
V2版本主要是针对V1版本的一些弊端做了修正,比如把消息的公共部分抽取出来放到外层消息集合里面,这样就不用每条消息都保存这些信息了。
举个例子:原来在V1版本中,每条消息都需要执行CRC校验,但有些情况下消息的CRC值是会发生变化的。
比如在Broker端可能会对消息时间戳字段进行更新,那么重新计算之后的CRC值也会相应更新;再比如Broker端在执行消息格式转换时(主要是为了兼容老版本客户端程序),也会带来CRC值的变化。
鉴于这些情况,再对每条消息都执行CRC校验就有点没必要了,不仅浪费空间还耽误CPU时间,因此在V2版本中,消息的CRC校验工作就被移到了消息集合这一层。
V2版本还有一个和压缩息息相关的改进,就是保存压缩消息的方法发生了变化。
之前V1版本中保存压缩消息的方法是把多条消息进行压缩然后保存到外层消息的消息体字段中;而V2版本的做法是对整个消息集合进行压缩,显然后者应该比前者有更好的压缩效果。
在Kafka中,压缩可能发生在两个地方:生产者端和Broker端。
生产者程序中配置compression.type参数即表示启用指定类型的压缩算法。
比如下面这段程序代码展示了如何构建一个开启GZIP的Producer对象:
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("acks", "all");
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 开启GZIP压缩
- props.put("compression.type", "gzip");
- Producer producer = new KafkaProducer<>(props);
这里比较关键的代码行是props.put(“compression.type”, “gzip”),它表明该Producer的压缩算法使用的是GZIP。
这样Producer启动后生产的每个消息集合都是经GZIP压缩过的,故而能很好地节省网络传输带宽以及Kafka Broker端的磁盘占用。
有两种例外情况就可能让Broker重新压缩消息:
一旦你在Broker端设置了不同的compression.type值,就一定要小心了,因为可能会发生预料之外的压缩/解压缩操作,通常表现为Broker端CPU使用率飙升。
所谓的消息格式转换主要是为了兼容老版本的消费者程序。
在一个生产环境中,Kafka集群中同时保存多种版本的消息格式非常常见。
为了兼容老版本的格式,Broker端会对新版本消息执行向老版本格式的转换。
这个过程中会涉及消息的解压缩和重新压缩。
一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让Kafka丧失了Zero Copy特性。
有压缩必有解压缩!通常来说解压缩发生在消费者程序中,也就是说Producer发送压缩消息到Broker后,Broker照单全收并原样保存起来。当Consumer程序请求这部分消息时,Broker依然原样发送出去,当消息到达Consumer端后,由Consumer自行解压缩还原成之前的消息。
注意:除了在Consumer端解压缩,Broker端也会进行解压缩。
每个压缩过的消息集合在Broker端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证。
我们必须承认这种解压缩对Broker端性能是有一定影响的,特别是对CPU的使用率而言。
在Kafka 2.1.0版本之前,Kafka支持3种压缩算法:GZIP、Snappy和LZ4。
从2.1.0开始,Kafka正式支持Zstandard算法(简写为zstd)。
它是Facebook开源的一个压缩算法,能够提供超高的压缩比。
在实际使用中,GZIP、Snappy、LZ4和zstd的表现各有千秋。
但对于Kafka而言,在吞吐量方面:LZ4 > Snappy > zstd和GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy。
具体到物理资源,使用Snappy算法占用的网络带宽最多,zstd最少;
在CPU使用率方面,各个算法表现得差不多,只是在压缩时Snappy算法使用的CPU较多一些,而在解压缩时GZIP算法则可能使用更多的CPU。
何时启用压缩是比较合适的时机呢?
启用压缩的一个条件就是Producer程序运行机器上的CPU资源要很充足。
除了CPU资源充足这一条件,如果你的环境中带宽资源有限,那么建议你开启压缩。
既然是一个组,那么组内必然可以有多个消费者或消费者实例,它们共享一个公共的ID,这个ID被称为Group ID。
组内的所有消费者协调在一起来消费订阅主题的所有分区。
当Consumer Group订阅了多个主题后,组内的每个实例不要求一定要订阅主题的所有分区,它只会消费部分分区中的消息。
Consumer Group之间彼此独立,互不影响,它们能够订阅相同的一组主题而互不干涉。
如果所有实例都属于同一个Group,那么它实现的就是消息队列模型;
如果所有实例分别属于不同的Group,那么它实现的就是发布/订阅模型。
「一个Group下该有多少个Consumer实例呢?」
「理想情况下,Consumer实例的数量应该等于该Group订阅主题的分区总数。」
假设一个Consumer Group订阅了3个主题,分别是A、B、C,它们的分区数依次是1、2、3,那么通常情况下,为该Group设置6个Consumer实例是比较理想的情形,因为它能最大限度地实现高伸缩性。
老版本的Consumer Group把位移保存在ZooKeeper中。
Apache ZooKeeper是一个分布式的协调服务框架,Kafka重度依赖它实现各种各样的协调管理。
将位移保存在ZooKeeper外部系统的做法,最显而易见的好处就是减少了Kafka Broker端的状态保存开销。
不过,慢慢地发现了一个问题,即ZooKeeper这类元框架其实并不适合进行频繁的写更新,而Consumer Group的位移更新却是一个非常频繁的操作。
这种大吞吐量的写操作会极大地拖慢ZooKeeper集群的性能。
于是,在新版本的Consumer Group中,Kafka社区重新设计了Consumer Group的位移管理方式,采用了将位移保存在Kafka内部主题的方法。
这个内部主题就是__consumer_offsets。
默认,也叫轮循,说的是对于同一组消费者来说,使用轮训分配的方式,决定消费者消费的分区
对一个消费者组来说决定消费方式是以分区总数除以消费者总数来决定,一般如果不能整除,往往是从头开始将剩余的分区分配开
是在0.11.x,新增的,它和前面两个不是很一样,它是在Range上的一种升华,且前面两个当同组内有新的消费者加入或者旧的消费者退出的时候,会从新开始决定消费者消费方式,但是Sticky,在同组中有新的新的消费者加入或者旧的消费者退出时,不会直接开始新的Range分配,而是保留现有消费者原来的消费策略,将退出的消费者所消费的分区平均分配给现有消费者,新增消费者同理,同其他现存消费者的消费策略中分离
假设一个分区中有10条消息,位移分别是0到9。
某个Consumer应用已消费了5条消息,这就说明该Consumer消费了位移为0到4的5条消息,此时Consumer的位移是5,指向了下一条消息的位移。
因为Consumer能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即「Consumer需要为分配给它的每个分区提交各自的位移数据」。
「位移提交分为自动提交和手动提交;从Consumer端的角度来说,位移提交分为同步提交和异步提交」。
开启自动提交位移的方法:Consumer端有个参数enable.auto.commit,把它设置为true或者压根不设置它就可以了。
因为它的默认值就是true,即Java Consumer默认就是自动提交位移的。
如果启用了自动提交,Consumer端还有个参数:auto.commit.interval.ms。
它的默认值是5秒,表明Kafka每5秒会为你自动提交一次位移。
- Properties props = new Properties();
- props.put("bootstrap.servers", "localhost:9092");
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "2000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList("foo", "bar"));
- while (true) {
- ConsumerRecords records = consumer.poll(100);
- for (ConsumerRecord record : records)
- System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
- }
上面的第3、第4行代码,就是开启自动提交位移的方法。
开启手动提交位移的方法就是设置enable.auto.commit为false。
还需要调用相应的API手动提交位移。最简单的API就是「KafkaConsumer#commitSync()」。
该方法会提交KafkaConsumer#poll()返回的最新位移。
从名字上来看,它是一个同步操作,即该方法会一直等待,直到位移被成功提交才会返回。
如果提交过程中出现异常,该方法会将异常信息抛出。
下面这段代码展示了commitSync()的使用方法:
- while (true) {
- ConsumerRecords records =
- consumer.poll(Duration.ofSeconds(1));
- process(records); // 处理消息
- try {
- consumer.commitSync();
- } catch (CommitFailedException e) {
- handle(e); // 处理提交失败异常
- }
- }
一旦设置了enable.auto.commit为true,Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息。
从顺序上来说,poll方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。
但自动提交位移的一个问题在于,
而手动提交位移,它的好处就在于更加灵活,你完全能够把控位移提交的时机和频率。
但是,它也有一个缺陷,就是在调用commitSync()时,Consumer程序会处于阻塞状态,直到远端的Broker返回提交结果,这个状态才会结束。
鉴于这个问题,Kafka社区为手动提交位移提供了另一个API方法:
从名字上来看它就不是同步的,而是一个异步操作。
调用commitAsync()之后,它会立即返回,不会阻塞,因此不会影响Consumer应用的TPS。
由于它是异步的,Kafka提供了回调函数(callback),供你实现提交之后的逻辑,比如记录日志或处理异常等。
下面这段代码展示了调用commitAsync()的方法:
- while (true) {
- ConsumerRecords records =
- consumer.poll(Duration.ofSeconds(1));
- process(records); // 处理消息
- consumer.commitAsync((offsets, exception) -> {
- if (exception != null)
- handle(exception);
- });
- }
commitAsync的问题在于,出现问题时它不会自动重试。
显然,如果是手动提交,我们需要将commitSync和commitAsync组合使用才能到达最理想的效果,原因有两个:
我们来看一下下面这段代码,它展示的是如何将两个API方法结合使用进行手动提交。
- try {
- while(true) {
- ConsumerRecords records =
- consumer.poll(Duration.ofSeconds(1));
- process(records); // 处理消息
- commitAysnc(); // 使用异步提交规避阻塞
- }
- } catch(Exception e) {
- handle(e); // 处理异常
- } finally {
- try {
- consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
- } finally {
- consumer.close();
- }
- }
这样一个场景:你的poll方法返回的不是500条消息,而是5000条。
那么,你肯定不想把这5000条消息都处理完之后再提交位移,因为一旦中间出现差错,之前处理的全部都要重来一遍。
比如前面这个5000条消息的例子,你可能希望每处理完100条消息就提交一次位移,这样能够避免大批量的消息重新消费。
Kafka Consumer API为手动提交提供了这样的方法:commitSync(Map)和commitAsync(Map)。
它们的参数是一个Map对象,键就是TopicPartition,即消费的分区,而值是一个OffsetAndMetadata对象,保存的主要是位移数据。
在这里,我以commitAsync为例,展示一段代码,实际上,commitSync的调用方法和它是一模一样的。
- private Map offsets = new HashMap<>();
- int count = 0;
- ……
- while (true) {
- ConsumerRecords records =
- consumer.poll(Duration.ofSeconds(1));
- for (ConsumerRecord record: records) {
- process(record); // 处理消息
- offsets.put(new TopicPartition(record.topic(), record.partition()),
- new OffsetAndMetadata(record.offset() + 1);
- if(count % 100 == 0)
- consumer.commitAsync(offsets, null); // 回调处理逻辑是null
- count++;
- }
- }
程序先是创建了一个Map对象,用于保存Consumer消费处理过程中要提交的分区位移,之后开始逐条处理消息,并构造要提交的位移值。
代码的最后部分是做位移的提交。设置了一个计数器,每累计100条消息就统一提交一次位移。
与调用无参的commitAsync不同,这里调用了带Map对象参数的commitAsync进行细粒度的位移提交。
这样,这段代码就能够实现每处理100条消息就提交一次位移,不用再受poll方法返回的消息总数的限制了。
比如某个Group下有20个Consumer实例,它订阅了一个具有100个分区的Topic。
正常情况下,Kafka平均会为每个Consumer分配5个分区。这个分配的过程就叫Rebalance。
Rebalance发生时,Group下所有的Consumer实例都会协调在一起共同参与。
当前Kafka默认提供了3种分配策略,每种策略都有一定的优势和劣势,社区会不断地完善这些策略,保证提供最公平的分配策略,即每个Consumer实例都能够得到较为平均的分区数。
比如一个Group内有10个Consumer实例,要消费100个分区,理想的分配策略自然是每个实例平均得到10个分区。
这就叫公平的分配策略。
举个简单的例子来说明一下Consumer Group发生Rebalance的过程。
假设目前某个Consumer Group下有两个Consumer,比如A和B,当第三个成员C加入时,Kafka会触发Rebalance,并根据默认的分配策略重新为A、B和C分配分区
Rebalance之后的分配依然是公平的,即每个Consumer实例都获得了2个分区的消费权。
在Rebalance过程中,所有Consumer实例都会停止消费,等待Rebalance完成,这是Rebalance为人诟病的一个方面。
目前Rebalance的设计是所有Consumer实例共同参与,全部重新分配所有分区。
当Consumer Group完成Rebalance之后,每个Consumer实例都会定期地向Coordinator发送心跳请求,表明它还存活着。
如果某个Consumer实例不能及时地发送这些心跳请求,Coordinator就会认为该Consumer已经死了,从而将其从Group中移除,然后开启新一轮Rebalance。
Consumer端有个参数,叫session.timeout.ms。
该参数的默认值是10秒,即如果Coordinator在10秒之内没有收到Group下某Consumer实例的心跳,它就会认为这个Consumer实例已经挂了。
除了这个参数,Consumer还提供了一个允许你控制发送心跳请求频率的参数,就是heartbeat.interval.ms。
这个值设置得越小,Consumer实例发送心跳请求的频率就越高。
频繁地发送心跳请求会额外消耗带宽资源,但好处是能够更加快速地知晓当前是否开启Rebalance,因为,目前Coordinator通知各个Consumer实例开启Rebalance的方法,就是将REBALANCE_NEEDED标志封装进心跳请求的响应体中。
除了以上两个参数,Consumer端还有一个参数,用于控制Consumer实际消费能力对Rebalance的影响,即max.poll.interval.ms参数。
它限定了Consumer端应用程序两次调用poll方法的最大时间间隔。
它的默认值是5分钟,表示你的Consumer程序如果在5分钟之内无法消费完poll方法返回的消息,那么Consumer会主动发起离开组的请求,Coordinator也会开启新一轮Rebalance。
第一类Rebalance是因为未能及时发送心跳,导致Consumer被踢出Group而引发的
因此可以设置「session.timeout.ms和heartbeat.interval.ms」的值。
将session.timeout.ms设置成6s主要是为了让Coordinator能够更快地定位已经挂掉的Consumer。
你要为你的业务处理逻辑留下充足的时间,这样Consumer就不会因为处理这些消息的时间太长而引发Rebalance了。
它要求这个提交过程不仅要实现高持久性,还要支持高频的写操作。
__consumer_offsets主题就是普通的Kafka主题。你可以手动地创建它、修改它,甚至是删除它。
虽说__consumer_offsets主题是一个普通的Kafka主题,但「它的消息格式却是Kafka自己定义的」,用户不能修改,也就是说你不能随意地向这个主题写消息,因为一旦你写入的消息不满足Kafka规定的格式,那么Kafka内部无法成功解析,就会造成Broker的崩溃。
Kafka Consumer有API帮你提交位移,也就是向__consumer_offsets主题写消息,千万不要自己写个Producer随意向该主题发送消息。
__consumer_offsets有3种消息格式:
第2种格式它有个专属的名字:tombstone消息,即墓碑消息,也称delete mark,它的主要特点是它的消息体是null,即空消息体。
一旦某个Consumer Group下的所有Consumer实例都停止了,而且它们的位移数据都已被删除时,Kafka会向__consumer_offsets主题的对应分区写入tombstone消息,表明要彻底删除这个Group的信息。
__consumer_offsets是怎么被创建的?
通常来说,「当Kafka集群中的第一个Consumer程序启动时,Kafka会自动创建位移主题」。
目前Kafka Consumer提交位移的方式有两种:「自动提交位移和手动提交位移。」
Consumer端有个参数叫enable.auto.commit,如果值是true,则Consumer在后台默默地为你定期提交位移,提交间隔由一个专属的参数auto.commit.interval.ms来控制。
自动提交位移有一个显著的优点,就是省事,你不用操心位移提交的事情,就能保证消息消费不会丢失。
但这一点同时也是缺点,丧失了很大的灵活性和可控性,你完全没法把控Consumer端的位移管理。
Kafka Consumer API为你提供了位移提交的方法,如consumer.commitSync等。
当调用这些方法时,Kafka会向__consumer_offsets主题写入相应的消息。
如果你选择的是自动提交位移,那么就可能存在一个问题:只要Consumer一直启动着,它就会无限期地向位移主题写入消息。
假设Consumer当前消费到了某个主题的最新一条消息,位移是100,之后该主题没有任何新消息产生,故Consumer无消息可消费了,所以位移永远保持在100。
由于是自动提交位移,位移主题中会不停地写入位移=100的消息。
显然Kafka只需要保留这类消息中的最新一条就可以了,之前的消息都是可以删除的。
这就要求Kafka必须要有针对位移主题消息特点的消息删除策略,否则这种消息会越来越多,最终撑爆整个磁盘。
Kafka使用「Compact策略」来删除__consumer_offsets主题中的过期消息,避免该主题无限期膨胀。
比如对于同一个Key的两条消息M1和M2,如果M1的发送时间早于M2,那么M1就是过期消息。
Compact的过程就是扫描日志的所有消息,剔除那些过期的消息,然后把剩下的消息整理在一起。
我在这里贴一张来自官网的图片,来说明Compact过程。
图中位移为0、2和3的消息的Key都是K1,Compact之后,分区只需要保存位移为3的消息,因为它是最新发送的。
这个后台线程叫Log Cleaner。
很多实际生产环境中都出现过位移主题无限膨胀占用过多磁盘空间的问题,如果你的环境中也有这个问题,建议你去检查一下Log Cleaner线程的状态,通常都是这个线程挂掉了导致的。
根据Kafka副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散保存在不同的Broker上,从而能够对抗部分Broker宕机带来的数据不可用。
下面展示的是一个有3台Broker的Kafka集群上的副本分布情况。
从这张图中,我们可以看到,主题1分区0的3个副本分散在3台Broker上,其他主题分区的副本也都散落在不同的Broker上,从而实现数据冗余。
在Kafka中,副本分成两类:领导者副本(Leader Replica)和追随者副本(Follower Replica)。
每个分区在创建时都要选举一个副本,称为领导者副本,其余的副本自动称为追随者副本。
在Kafka中,追随者副本是不对外提供服务的。这就是说,任何一个追随者副本都不能响应消费者和生产者的读写请求。所有的请求都必须由领导者副本来处理,或者说,所有的读写请求都必须发往领导者副本所在的Broker,由该Broker负责处理。
追随者副本不处理客户端请求,它唯一的任务就是从领导者副本「异步拉取」消息,并写入到自己的提交日志中,从而实现与领导者副本的同步。
当领导者副本挂掉了,或者说领导者副本所在的Broker宕机时,Kafka依托于ZooKeeper提供的监控功能能够实时感知到,并立即开启新一轮的领导者选举,从追随者副本中选一个作为新的领导者。老Leader副本重启回来后,只能作为追随者副本加入到集群中。
对于客户端用户而言,Kafka的追随者副本没有任何作用,Kafka为什么要这样设计呢?
这种副本机制有两个方面的好处。
所谓Read-your-writes,顾名思义就是,当你使用生产者API向Kafka成功写入消息后,马上使用消费者API去读取刚才生产的消息。
假设当前有2个追随者副本F1和F2,它们异步地拉取领导者副本数据。倘若F1拉取了Leader的最新消息而F2还未及时拉取,那么,此时如果有一个消费者先从F1读取消息之后又从F2拉取消息,它可能会看到这样的现象:第一次消费时看到的最新消息在第二次消费时不见了,这就不是单调读一致性。
但是,如果所有的读请求都是由Leader来处理,那么Kafka就很容易实现单调读一致性。
In-sync Replicas,也就是所谓的ISR副本集合。
ISR中的副本都是与Leader同步的副本,相反,不在ISR中的追随者副本就被认为是与Leader不同步的。
Leader副本天然就在ISR中。也就是说,「ISR不只是追随者副本集合,它必然包括Leader副本。甚至在某些情况下,ISR只有Leader这一个副本」。
另外,能够进入到ISR的追随者副本要满足一定的条件。
这个参数的含义是Follower副本能够落后Leader副本的最长时间间隔,当前默认值是10秒。
这就是说,只要一个Follower副本落后Leader副本的时间不连续超过10秒,那么Kafka就认为该Follower副本与Leader是同步的,即使此时Follower副本中保存的消息明显少于Leader副本中的消息。
Follower副本唯一的工作就是不断地从Leader副本拉取消息,然后写入到自己的提交日志中。
倘若该副本后面慢慢地追上了Leader的进度,那么它是能够重新被加回ISR的。
ISR是一个动态调整的集合,而非静态不变的。
Unclean领导者选举「Kafka把所有不在ISR中的存活副本都称为非同步副本」。
通常来说,非同步副本落后Leader太多,因此,如果选择这些副本作为新Leader,就可能出现数据的丢失。
毕竟,这些副本中保存的消息远远落后于老Leader中的消息。
在Kafka中,选举这种副本的过程称为Unclean领导者选举。
开启Unclean领导者选举可能会造成数据丢失,但好处是,它使得分区Leader副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止Unclean领导者选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。
对于kafka集群中对于任意的topic的分区以及副本leader的设定,都需要考虑到集群整体的负载能力的平衡性,会尽量分配每一个partition的副本leader在不同的broker中,这样会避免多个leader在同一个broker,导致集群中的broker负载不平衡
kafka引入了优先副本的概念,优先副本的意思在AR(分区中的所有副本)集合列表中的第一个副本,在理想状态下该副本就是该分区的leader副本
例如kafka集群由3台broker组成,创建了一个名为topic-partitions的topic,设置partition为3,副本数为3,partition0中AR列表为 [1,2,0],那么分区0的优先副本为1
kafka使用多副本机制提高可靠性,但是只有leader副本对外提供读写服务,follow副本只是做消息同步。
比如我们在包含3个broker节点的kafka集群上创建一个分区数为3,副本因子为3的主题topic-partitions时,leader副本会均匀的分布在3台broker节点上。
我们可以把leader副本所在的节点叫作分区的leader节点,把follower副本所在的节点叫作follower节点。
在上面的例子中,分区0的leader节点是broker1,分区1的leader节点是broker2,分区2的leader节点是broker0。
当分区leader节点发生故障时,其中的一个follower节点就会选举为新的leader节点。
当原来leader的节点恢复之后,它只能成为一个follower节点,此时就导致了集群负载不均衡。
比如分区1的leader节点broker2崩溃了,此时选举了在broker1上的分区1follower节点作为新的leader节点。
当broker2重新恢复时,此时的kafka集群状态如下:
可以看到,此时broker1上负载更大,而broker2上没有负载。
比如上面的分区1,它的AR集合是[2,0,1],表示分区1的优先副本就是在broker2上。
理想情况下,优先副本应该就是leader副本,kafka保证了优先副本的均衡分布,而这与broker节点宕机与否没有关系。
「优先副本选举就是对分区leader副本进行选举的时候,尽可能让优先副本成为leader副本」,针对上述的情况,只要再触发一次优先副本选举就能保证分区负载均衡。
kafka支持自动优先副本选举功能,默认每5分钟触发一次优先副本选举操作。
Broker 中有个Acceptor(mainReactor)监听新连接的到来,与新连接建连之后轮询选择一个Processor(subReactor)管理这个连接。
而Processor会监听其管理的连接,当事件到达之后,读取封装成Request,并将Request放入共享请求队列中。
然后IO线程池不断的从该队列中取出请求,执行真正的处理。处理完之后将响应发送到对应的Processor的响应队列中,然后由Processor将Response返还给客户端。
每个listener只有一个Acceptor线程,因为它只是作为新连接建连再分发,没有过多的逻辑,很轻量。
Processor 在Kafka中称之为网络线程,默认网络线程池有3个线程,对应的参数是num.network.threads,并且可以根据实际的业务动态增减。
还有个 IO 线程池,即KafkaRequestHandlerPool,执行真正的处理,对应的参数是num.io.threads,默认值是 8。
IO线程处理完之后会将Response放入对应的Processor中,由Processor将响应返还给客户端。
可以看到网络线程和IO线程之间利用的经典的生产者 - 消费者模式,不论是用于处理Request的共享请求队列,还是IO处理完返回的Response。
在Kafka中,Producer默认不是幂等性的,但我们可以创建幂等性Producer。
它其实是0.11.0.0版本引入的新功能,在此之前,Kafka向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。
在0.11之后,指定Producer幂等性的方法很简单,仅需要设置一个参数即可,即
- props.put(“enable.idempotence”, ture),
- 或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
enable.idempotence被设置成true后,Producer自动升级成幂等性Producer,其他所有的代码逻辑都不需要改变。
Kafka自动帮你做消息的重复去重。
底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在Broker端多保存一些字段。
当Producer发送了具有相同字段值的消息后,Broker能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们丢弃掉。
首先,它只能保证单分区上的幂等性,即一个幂等性Producer能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。
其次,它只能实现单会话上的幂等性,不能实现跨会话的幂等性。
这里的会话,你可以理解为Producer进程的一次运行,当你重启了Producer进程之后,这种幂等性保证就丧失了。
Kafka自0.11版本开始也提供了对事务的支持,目前主要是在read committed隔离级别上做事情。
它能保证多条消息原子性地写入到目标分区,同时也能保证Consumer只能看到事务成功提交的消息。
事务型Producer能够保证将消息原子性地写入到多个分区中。
这批消息要么全部写入成功,要么全部失败,另外,事务型Producer也不惧进程的重启。
Producer重启回来后,Kafka依然保证它们发送消息的精确一次处理。
设置事务型Producer的方法也很简单,满足两个要求即可:
此外,你还需要在Producer代码中做一些调整,如这段代码所示:
- producer.initTransactions();
- try {
- producer.beginTransaction();
- producer.send(record1);
- producer.send(record2);
- producer.commitTransaction();
- } catch (KafkaException e) {
- producer.abortTransaction();
- }
和普通Producer代码相比,事务型Producer的显著特点是调用了一些事务API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,它们分别对应事务的初始化、事务开始、事务提交以及事务终止。
这段代码能够保证Record1和Record2被当作一个事务统一提交到Kafka,要么它们全部提交成功,要么全部写入失败。
实际上即使写入失败,Kafka也会把它们写入到底层的日志中,也就是说Consumer还是会看到这些消息。
有一个isolation.level参数,这个参数有两个取值:
标题名称:一篇带给你Kafka核心知识总结!
当前链接:http://www.shufengxianlan.com/qtweb/news19/18219.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联