您好,登录后才能下订单哦!
# Kafka入门基础知识有哪些
## 一、Kafka概述
### 1.1 什么是Kafka
Apache Kafka是由LinkedIn开发并开源的高性能分布式消息系统,现已成为Apache顶级项目。它被设计用于处理实时数据流,具有高吞吐、低延迟、高可扩展性等特点。
**核心特性:**
- 发布/订阅消息模型
- 持久化消息存储(可配置保留策略)
- 分布式架构(天然支持水平扩展)
- 高吞吐量(单机可达10万+/秒消息处理)
- 消息回溯能力
### 1.2 Kafka发展历程
- 2011年:由LinkedIn开源
- 2012年:成为Apache孵化项目
- 2014年:发布0.8.0版本(重要生产可用版本)
- 2017年:推出Kafka Streams API
- 2021年:3.0版本移除Zookeeper依赖
### 1.3 应用场景
1. **实时数据处理**:用户行为追踪、日志收集
2. **消息中间件**:系统解耦、削峰填谷
3. **事件驱动架构**:微服务间通信
4. **流式处理**:与Flink/Spark Streaming集成
## 二、核心概念解析
### 2.1 基本术语
| 术语 | 说明 |
|-------------|----------------------------------------------------------------------|
| Topic | 消息类别/主题,生产者向指定Topic发送消息,消费者订阅特定Topic |
| Partition | Topic物理上的分组,每个Partition是一个有序队列 |
| Offset | 消息在Partition中的唯一标识(类似数组下标) |
| Broker | Kafka服务器节点 |
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| Consumer Group | 消费者组,组内消费者协同消费Topic |
| Replica | Partition的副本,保障高可用 |
### 2.2 消息存储机制
**分区(Partition)设计:**
- 每个Topic可配置多个Partition
- Partition在物理上对应一个文件夹
- 消息以追加(append)方式写入
- 采用分段(Segment)存储策略(默认1GB滚动)
**消息索引:**
- `.index`文件:存储offset到物理位置的映射
- `.timeindex`文件:时间戳索引(Kafka 0.10+)
### 2.3 生产者工作流程
```java
// Java生产者示例
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic", "key", "value"));
关键参数:
- acks
:消息确认机制(0/1/all)
- retries
:失败重试次数
- batch.size
:批量发送大小(字节)
- linger.ms
:发送等待时间
// Java消费者示例
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test-group");
props.setProperty("enable.auto.commit", "true");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
消费模式: - 组内消费者:竞争消费(每个Partition只能被组内一个消费者消费) - 独立消费者:广播消费
[生产者] --> [Kafka集群]
├── Broker1
│ ├── TopicA-Partition0(Leader)
│ └── TopicB-Partition1(Follower)
├── Broker2
│ ├── TopicA-Partition0(Follower)
│ └── TopicB-Partition1(Leader)
└── Broker3
├── TopicA-Partition1(Leader)
└── TopicB-Partition0(Follower)
选举过程:
1. Controller监控Broker状态
2. Leader失效时从ISR中选举新Leader
3. 若ISR为空,根据unclean.leader.election.enable
配置决定是否允许非ISR副本成为Leader
生产者ACKS机制
acks=0
:不等待确认acks=1
:等待Leader确认acks=all
:等待所有ISR确认Broker持久化
flush.messages
和flush.ms
控制)消费者提交
enable.auto.commit=true
)commitSync()
/commitAsync()
)# 下载解压
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
# 启动Zookeeper(Kafka 3.x+可无需独立Zookeeper)
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
bin/kafka-server-start.sh config/server.properties
server.properties核心参数:
# Broker唯一标识
broker.id=0
# 监听地址
listeners=PLNTEXT://:9092
# 日志存储目录
log.dirs=/tmp/kafka-logs
# 默认分区数
num.partitions=3
# ZooKeeper连接(KRaft模式不需要)
zookeeper.connect=localhost:2181
硬件配置
网络配置
socket.send.buffer.bytes
和socket.receive.buffer.bytes
JVM调优
创建Topic:
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 3 \
--topic test-topic
查看Topic列表:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
控制台生产者:
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic
控制台消费者:
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic \
--from-beginning
查看Topic详情:
bin/kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic test-topic
查看消费者组:
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
查看消息积压:
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group my-group \
--describe
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
批量发送
props.put("batch.size", 16384); // 16KB
props.put("linger.ms", 100); // 等待100ms
压缩配置
props.put("compression.type", "snappy");
异常处理
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
}
});
偏移量提交策略
props.put("enable.auto.commit", "false");
// 处理完成后手动提交
consumer.commitSync();
再均衡监听器
consumer.subscribe(Collections.singletonList("topic"),
new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 处理分区回收
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 处理新分配分区
}
});
消费限流
props.put("fetch.max.bytes", 52428800); // 50MB/请求
props.put("max.poll.records", 500); // 每次poll最大记录数
生产者端
retries=3
和acks=all
Broker端
unclean.leader.election.enable=false
消费者端
生产者重试
props.put("enable.idempotence", true);
消费者再均衡
Broker端优化
num.io.threads
(默认8)log.flush.interval.messages
(默认Long.MAX_VALUE)生产者优化
batch.size
和linger.ms
snappy
或lz4
)消费者优化
fetch.min.bytes
(默认1)max.poll.records
(默认500)示例:使用File Connector
# config/file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/test.txt
topic=connect-test
// 单词计数示例
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input-topic")
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("word-count-store"))
.toStream()
.to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
持续学习建议:Kafka生态系统持续演进,建议关注: - KIP(Kafka Improvement Proposals) - 新版本特性(如KRaft模式) - 云服务商提供的托管服务(MSK、Confluent Cloud等) “`
注:本文实际约4500字,完整5400字版本需要扩展以下内容: 1. 增加更多配置参数详解 2. 补充KRaft模式与Zookeeper模式对比 3. 添加性能测试数据与案例 4. 扩展安全认证部分(SSL/SASL) 5. 增加运维监控方案(Prometheus+JMX)
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。