Kafka中怎么实现日志存储

发布时间:2021-06-18 17:32:25 作者:Leah
来源:亿速云 阅读:195

Kafka中怎么实现日志存储,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

1. 日志存储格式

        最新版本的kafka日志是以批为单位进行日志存储的,所谓的批指的是kafka会将多条日志压缩到同一个batch中,然后以batch为单位进行后续的诸如索引的创建和消息的查询等工作。对于每个批次而言,其默认大小为4KB,并且保存了整个批次的起始位移和时间戳等元数据信息,而对于每条消息而言,其位移和时间戳等元数据存储的则是相对于整个批次的元数据的增量,通过这种方式,kafka能够减少每条消息中数据占用的磁盘空间。这里我们首先展示一下每个批次的数据格式:

Kafka中怎么实现日志存储

        图中消息批次的每个元数据都有固定的长度大小,而只有最后面的消息个数的是可变的。如下是batch中主要的属性的含义:

        通过上面的介绍可以看出,每个batch的头部数据中占用的字节数固定为61个字节,可变部分主要是与具体的消息有关,下面我们来看一下batch中每条消息的格式:

Kafka中怎么实现日志存储

        这里的消息的头部数据就与batch的大不相同,可以看到,其大部分数据的长度都是可变的。既然是可变的,这里我们需要强调两个问题:

        kafka使用这种编码方式的优点在于,大部分的数据增量都是非常小的数字,因此使用一个字节即可保存,这比直接使用原始类型的数据要节约大概七倍的内存。

        对于上面的每条消息的格式,除了消息key和value相关的字段,其还有属性字段和header,属性字段的主要作用是存储当前消息key和value的压缩方式,而header则供给用户进行添加一些动态的属性,从而实现一些定制化的工作。        通过对kafka消息日志的存储格式我们可以看出,其使用batch的方式将一些公共信息进行提取,从而保证其只需要存储一份,虽然看起来每个batch的头部信息比较多,但其平摊到每条消息上之后使用的字节更少了;在消息层面,kafka使用了数据增量的方式和Zig-Zag编码方式对数据进行的压缩,从而极大地减少其占用的字节数。总体而言,这种存储方式极大的减少了kafka占用的磁盘空间大小。

2. 日志存储方式

        在使用kafka时,消息都是推送到某个topic中,然后由producer计算当前消息会发送到哪个partition,在partition中,kafka会为每条消息设置一个偏移量,也就是说,如果要唯一定位一条消息,使用<topic, partition, offset>三元组即可。基于kafka的架构模式,其会将各个分区平均分配到每个broker中,也就是说每个broker会被分配用来提供一个或多个分区的日志存储服务。在broker服务器上,kafka的日志也是按照partition进行存储的,其会在指定的日志存储目录中为每个topic的partition分别创建一个目录,目录中存储的就是这些分区的日志数据,而目录的名称则会以<topic-patition>的格式进行创建。如下是kafka日志的存储目录示意图:

Kafka中怎么实现日志存储

        这里我们需要注意的是,图中对于分区日志的存储,当前broker只会存储分配给其的分区的日志,比如图中的connect-status就只有分区1和分区4的目录,而没有分区2和分区3的目录,这是因为这些分区被分配在了集群的其他节点上。在每个分区日志目录中,存在有三种类型的日志文件,即后缀分别为log、index和timeindex的文件。其中log文件就是真正存储消息日志的文件,index文件存储的是消息的位移索引数据,而timeindex文件则存储的是时间索引数据。如下图所示为一个分区的消息日志数据:

Kafka中怎么实现日志存储

        从图中可以看出,每种类型的日志文件都是分段的,这里关于分段的规则主要有如下几点需要说明:

3. 索引文件

        kafka主要有两种类型的索引文件:位移索引文件和时间戳索引文件。位移索引文件中存储的是消息的位移与该位移所对应的消息的物理地址;时间戳索引文件中则存储的是消息的时间戳与该消息的位移值。也就是说,如果需要通过时间戳查询消息记录,那么其首先会通过时间戳索引文件查询该时间戳对应的位移值,然后通过位移值在位移索引文件中查询消息具体的物理地址。关于位移索引文件,这里有两点需要说明:

        关于时间戳索引文件,由于时间戳的变化比位移的变化幅度要大一些,其即使采用了增量的方式存储时间戳索引,但也没法有效地使用Zig-Zag方式对数据进行编码,因而时间戳索引文件是直接存储的消息的时间戳数据,但是对于时间戳索引文件中存储的位移数据,由于其变化幅度不大,因而其还是使用相对位移的方式进行的存储,并且这种存储方式也可以直接映射到位移索引文件中而无需进行计算。如下图所示为时间戳索引文件的格式图:

Kafka中怎么实现日志存储

        如下则是时间戳索引文件的一个存储示例:

Kafka中怎么实现日志存储

        可以看到,如果需要通过时间戳来定位消息,就需要首先在时间戳索引文件中定位到具体的位移,然后通过位移在位移索引文件中定位到消息的具体物理地址。

4. 日志压缩

        所谓的日志压缩功能,其主要是针对这样的场景的,比如对某个用户的邮箱数据进行修改,其总共修改了三次,修改过程如下:

email=john@gmail.com
email=john@yahoo.com.cn
email=john@163.com

        在这么进行修改之后,很明显,我们主要需要关心的是最后一次修改,因为其是最终数据记录,但是如果我们按顺序处理上述消息,则需要处理三次消息。kafka的日志压缩就是为了解决这个问题而存在的,对于使用相同key的消息,其会只保留最新的一条消息的记录,而中间过程的消息都会被kafka cleaner给清理掉。但是需要注意的是,kafka并不会清理当前处于活跃状态的日志文件中的消息记录。所谓当前处于活跃状态的日志文件,也就是当前正在写入数据的日志文件。如下图所示为一个kafka进行日志压缩的示例图:

Kafka中怎么实现日志存储

        图中K1的数据有V1、V3和V4,经过压缩之后只有V4保留了下来,K2的数据则有V2、V6和V10,压缩之后也只有V10保留了下来;同理可推断其他的Key的数据。另外需要注意的是,kafka开启日志压缩使用的是log.cleanup.policy,其默认值为delete,也即我们正常使用的策略,如果将其设置为compaction,则开启了日志压缩策略,但是需要注意的是,开启了日志压缩策略并不代表kafka会清理历史数据,只有将log.cleaner.enable设置为true才会定时清理历史数据。

        在kafka中,其本身也在使用日志压缩策略,主要体现在kafka消息的偏移量存储。在旧版本中,kafka将每个consumer分组当前消费的偏移量信息保存在zookeeper中,但是由于zookeeper是一款分布式协调工具,其对于读操作具有非常高的性能,但是对于写操作性能比较低,而consumer的位移提交动作是非常频繁的,这势必会导致zookeeper称为kafka消息消费的瓶颈。因而在最新版本中,kafka将分组消费的位移数据存储在了一个特殊的topic中,即__consumer_offsets,由于每个分组group的位移信息都会提交到该topic,因而kafka默认为其设置了非常多的分区,也即50个分区。另外,consumer在提交位移时,使用的key为groupId+topic+partition,而值则为当前提交的位移,也就是说,对于每一个分组所消费的topic的partition,其都只会保留最新的位移。如果consumer需要读取位移,那么只需要按照上述格式组装key,然后在该topic中读取最新的消息数据即可。

关于Kafka中怎么实现日志存储问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注亿速云行业资讯频道了解更多相关知识。

推荐阅读:
  1. windows下如何实现kafka+ELK的日志系统
  2. logstash将Kafka中的日志数据订阅到HDFS

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

kafka

上一篇:如何将Python远控隐藏在文档图片中

下一篇:python清洗文件中数据的方法

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》