Kafka是一个分布式流处理平台,它被广泛用于构建可扩展、高吞吐量的实时数据管道。然而,在处理大量数据时,Kafka数据丢失的问题会引起许多烦恼。解决这个问题的一种方法是将Kafka的数据持久化到数据库中,从而更加可靠地保存数据。
成都创新互联公司-专业网站定制、快速模板网站建设、高性价比朝阳网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式朝阳网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖朝阳地区。费用合理售后完善,10年实体公司更值得信赖。
Kafka数据丢失的原因和解决方法
Kafka的数据丢失问题是由于Kafka的写入机制导致的。Kafka的写入机制是异步的,不能保证发布到Kafka的消息会被成功写入Kafka broker。因此,在某些情况下,Kafka会丢失消息,例如当发生网络断开或Kafka broker宕机时。
为了解决这个问题,Kafka提供了一种常见的方法:使用Kafka的复制机制来保护数据。Kafka的复制机制将消息复制到备用副本中,以便在Kafka broker宕机或者数据丢失的时候,备用副本可以被用来恢复数据。但是,复制机制会增加写入延迟和消息存储的开销,如果需要处理高并发或海量数据,就需要考虑其他更可靠的方案。
将Kafka数据持久化到数据库中的解决方案
将Kafka数据持久化到数据库中是解决Kafka数据丢失问题的一种可靠方法。这种方法的实现基于Kafka Connect,它是一个开源工具,用于在Kafka和其他数据存储系统之间进行数据传输。
Kafka Connect的主要作用是将Kafka的数据转换为其他数据格式并存储到其他数据存储系统中。要将Kafka的数据持久化到数据库中,可以使用Kafka Connect的JDBC连接器。JDBC连接器可以将Kafka消息转换为数据库的记录并将其插入到数据库中。
以下步骤描述了将Kafka数据持久化到数据库的过程:
1. 安装Kafka Connect:将Kafka Connect安装在您的本地机器或云服务器上。
2. 配置Kafka Connect:配置Kafka Connect以使其可以连接到Kafka和数据库。
3. 创建JDBC连接器:使用Kafka Connect创建JDBC连接器,该连接器将消息转换为数据库的记录,并将其插入到数据库中。
4. 测试连接器:测试连接器以确保它可以正确地将消息保存到数据库中。
将Kafka数据持久化到数据库的好处
将Kafka数据持久化到数据库的好处有:
1. 可靠性:数据会被持久化到数据库中,从而保证数据不会丢失。
2. 可扩展性:可以使用数据库的扩展性,无需考虑Kafka复制机制的限制。
3. 数据一致性:如果在Kafka broker宕机或网络断开的情况下,可以使用数据库恢复数据。
4. 数据备份:可以使用数据库备份和还原机制对数据进行备份和还原。
5. 数据安全性:可以使用数据库的安全机制来保护数据。
结论
在处理大量实时数据时,Kafka的数据丢失问题是一个令人头痛的问题。解决这个问题的一种方法是将Kafka数据持久化到数据库中,从而更加可靠地保存数据。使用Kafka Connect的JDBC连接器可以使持久化过程变得更加容易和可管理。因此,如果您在使用Kafka时遇到了数据丢失的问题,将Kafka数据持久化到数据库中可能是一个可靠的解决方案。
相关问题拓展阅读:
Kafka的流行归功于它设计和操作简单、存储系统高效、充分利用磁盘顺序读写等特性、非常适合在线日志收集等高吞吐场景。
Kafka特性之一是它的复制协议。复制协议是保障kafka高可靠性的关键。对于单个集群中每个Broker不同工作负载情况下,如何自动调优Kafka副本的工作方式是比较有挑战的。它的挑战之一是要知道如何避免follower进入和退出同步副本列表(即ISR)。从用户的角度来看,如果生产者发送一大批海量消息,可能会引起Kafka Broker很多警告。这些警报表明一些topics处于“under replicated”状态,这些副本处于同步失败或失效状态,更意味着数据没有被复制到足够数量Broker从而增加数据丢失的概率。因此Kafka集群中处于“under replicated”中Partition数要密切监控。这个警告应该来自于Broker失效,减慢或暂停等状态而不是生产者写不同大小消息引起的。
Kafka中主题的每个Partition有一个预写式日志文件,每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每个消息都有一个连续的序列号叫做offset, 确定它在分区日志中唯一的位置。
Kafka每个topic的partition有N个副本,其中N是topic的复制因子。Kafka通过多副本机制实现故障自动转移,当Kafka集群中一个Broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。N个replicas中。其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,与此同时,follower会被动定期地去复制leader上的数据。
如下图所示,Kafka集群中有4个broker, 某topic有3个partition,且复制因子即副本个数也为3:
Kafka提供了数据复制算法保证,如果leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader,或者说follower追赶leader数据。leader负责维护和跟踪ISR(In-Sync Replicas的缩写,表示副本同步队列,具体可参考下节)中所有follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,如果follower“落后”太多或者失效,leader将会把它从ISR中删除。
副本同步队列(ISR)
所谓同步,必须满足如下两个条件:
默认情况下Kafka对应的topic的replica数量为1,即每个partition都有一个唯一的肢指leader,为了确保消息的可靠性,通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,比如3。 所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表举饥,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
上一节中的HW俗称高水位,是HighWatermark的缩写,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息,consumer不能立刻消费,leader会等历答配待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
下图详细的说明了当producer生产消息至broker后,ISR以及HW和LEO的流转过程:
由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
副本不同步的异常情况
broker 分配的任何一个 partition 都是以 Replica 对象实例的形式存在,而 Replica 在 Kafka 上是有两个角色: leader 和 follower,只要这个 Replica 是 follower,它便会向 leader 进行数据同步。
反映在 ReplicaManager 上就是如果 Broker 的本地副本被选举为 follower,那么它将会启动副本同步线程,其具体实现如下所示:
简单来说,makeFollowers() 的处理过程如下:
关于第6步,并不一定会为每一个 partition 都启动一个 fetcher 线程,对于一个目的 broker,只会启动 num.replica.fetchers 个线程,具体这个 topic-partition 会分配到哪个 fetcher 线程上,是根据 topic 名和 partition id 进行计算得到,实现所示:
如上所示,在 ReplicaManager 调用 makeFollowers() 启动 replica fetcher 线程后,它实际上是通过 ReplicaFetcherManager 实例进行相关 topic-partition 同步线程的启动和关闭,其启动过程分为下面两步:
addFetcherForPartitions() 的具体实现如下所示:
这个方法其实是做了下面这几件事:
ReplicaFetcherManager 创建 replica Fetcher 线程的实现如下:
replica fetcher 线程在启动之后就开始进行正常数据同步流程了,这个过程都是在 ReplicaFetcherThread 线程中实现的。
ReplicaFetcherThread 的 doWork() 方法是一直在这个线程中的 run() 中调用的,实现方法如下:
在 doWork() 方法中主要做了两件事:
processFetchRequest() 这个方法的作用是发送 Fetch 请求,并对返回的结果进行处理,最终写入到本地副本的 Log 实例中,其具体实现:
其处理过程简单总结一下:
fetch() 方法作用是发送 Fetch 请求,并返回相应的结果,其具体的实现,如下:
processPartitionData
这个方法的作用是,处理 Fetch 请求的具体数据内容,简单来说就是:检查一下数据大小是否超过限制、将数据追加到本地副本的日志文件中、更新本地副本的 hw 值。
在副本同步的过程中,会遇到哪些异常情况呢?
大家一定会想到关于 offset 的问题,在 Kafka 中,关于 offset 的处理,无论是 producer 端、consumer 端还是其他地方,offset 似乎都是一个形影不离的问题。在副本同步时,关于 offset,会遇到什么问题呢?下面举两个异常的场景:
以上两种情况都是 offset OutOfRange 的情况,只不过:一是 Fetch Offset 超过了 leader 的 LEO,二是 Fetch Offset 小于 leader 最小的 offset
在介绍 Kafka 解决方案之前,我们先来自己思考一下这两种情况应该怎么处理?
上面是我们比较容易想出的解决方案,而在 Kafka 中,其解决方案也很类似,不过遇到情况比上面我们列出的两种情况多了一些复杂,其解决方案如下:
针对之一种情况,在 Kafka 中,实际上还会发生这样一种情况,1 在收到 OutOfRange 错误时,这时去 leader 上获取的 LEO 值与最小的 offset 值,这时候却发现 leader 的 LEO 已经从 800 变成了 1100(这个 topic-partition 的数据量增长得比较快),再按照上面的解决方案就不太合理,Kafka 这边的解决方案是:遇到这种情况,进行重试就可以了,下次同步时就会正常了,但是依然会有上面说的那个问题。
replica fetcher 线程关闭的条件,在三种情况下会关闭对这个 topic-partition 的拉取操作:
这里直接说线程关闭,其实不是很准确,因为每个 replica fetcher 线程操作的是多个 topic-partition,而在关闭的粒度是 partition 级别,只有这个线程分配的 partition 全部关闭后,这个线程才会真正被关闭。
stopReplica
StopReplica 的请求实际上是 Controller 发送过来的,这个在 controller 部分会讲述,它触发的条件有多种,比如:broker 下线、partition replica 迁移等等。
makeLeaders
makeLeaders() 方法的调用是在 broker 上这个 partition 的副本被设置为 leader 时触发的,其实现如下:
调用 ReplicaFetcherManager 的 removeFetcherForPartitions() 删除对这些 topic-partition 的副本同步设置,这里在实现时,会遍历所有的 replica fetcher 线程,都执行 removePartitions() 方法来移除对应的 topic-partition 。
removePartitions
这个方法的作用是:ReplicaFetcherThread 将这些 topic-partition 从自己要拉取的 partition 列表中移除。
ReplicaFetcherThread的关闭
前面介绍那么多,似乎还是没有真正去关闭,那么 ReplicaFetcherThread 真正关闭是哪里操作的呢?
实际上 ReplicaManager 每次处理完 LeaderAndIsr 请求后,都会调用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads() 方法,如果 fetcher 线程要拉取的 topic-partition 为空,那么就会关闭掉对应的 fetcher 线程。
关于kafka数据不丢失数据库的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
成都创新互联建站主营:成都网站建设、网站维护、网站改版的网站建设公司,提供成都网站制作、成都网站建设、成都网站推广、成都网站优化seo、响应式移动网站开发制作等网站服务。
分享文章:解决Kafka数据不丢失问题,数据库更加可靠(kafka数据不丢失数据库)
网站链接:http://www.shufengxianlan.com/qtweb/news11/275361.html
成都网站建设公司_创新互联,为您提供手机网站建设、虚拟主机、动态网站、微信公众号、微信小程序、App开发
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联