您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
# Kafka安装部署方法及简单命令
## 一、Kafka简介
Apache Kafka是由LinkedIn开发并开源的高性能分布式消息系统,具有以下核心特性:
- **高吞吐量**:单机可支持每秒百万级消息处理
- **持久化存储**:消息可持久化到磁盘并支持多副本
- **分布式架构**:天然支持水平扩展和负载均衡
- **低延迟**:消息处理延迟可控制在毫秒级
### 核心组件
1. **Producer**:消息生产者
2. **Consumer**:消息消费者
3. **Broker**:Kafka服务实例
4. **Topic**:消息类别/主题
5. **Partition**:Topic的分区(提高并行度)
6. **Zookeeper**:负责集群元数据管理和协调
## 二、环境准备
### 2.1 系统要求
- 操作系统:Linux/Unix(推荐),Windows(开发测试)
- Java环境:JDK 1.8+
- 磁盘空间:建议至少50GB(根据业务需求调整)
- 内存:建议4GB以上
### 2.2 下载安装包
```bash
# 下载最新稳定版(示例为3.5.1)
wget https://downloads.apache.org/kafka/3.5.1/kafka_2.13-3.5.1.tgz
# 解压安装包
tar -xzf kafka_2.13-3.5.1.tgz
cd kafka_2.13-3.5.1
Kafka依赖Zookeeper进行集群协调:
# 使用内置Zookeeper(生产环境建议独立部署)
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 验证启动
netstat -tulnp | grep 2181
# 启动Kafka broker
bin/kafka-server-start.sh config/server.properties &
# 验证启动
jps -l | grep kafka
netstat -tulnp | grep 9092
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 3 \
--topic test-topic
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic test-topic \
--from-beginning
节点 | IP地址 | 服务 |
---|---|---|
node1 | 192.168.1.1 | Zookeeper, Kafka |
node2 | 192.168.1.2 | Zookeeper, Kafka |
node3 | 192.168.1.3 | Zookeeper, Kafka |
修改每台节点的zoo.cfg:
# 集群节点配置
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
# 其他节点需创建myid文件
echo "1" > /tmp/zookeeper/myid # node1执行
echo "2" > /tmp/zookeeper/myid # node2执行
echo "3" > /tmp/zookeeper/myid # node3执行
修改server.properties:
# 每台节点需要修改
broker.id=1 # 必须唯一(node1=1, node2=2...)
listeners=PLNTEXT://:9092
advertised.listeners=PLNTEXT://node1:9092
log.dirs=/data/kafka-logs
zookeeper.connect=node1:2181,node2:2181,node3:2181
num.partitions=3
default.replication.factor=2
# 所有节点依次启动
bin/kafka-server-start.sh config/server.properties &
# 查看集群描述
bin/kafka-topics.sh --describe \
--bootstrap-server node1:9092 \
--topic test-topic
输出示例:
Topic: test-topic PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test-topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: test-topic Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 2 \
--partitions 4 \
--topic orders
bin/kafka-topics.sh --describe \
--bootstrap-server localhost:9092 \
--topic orders
bin/kafka-topics.sh --alter \
--bootstrap-server localhost:9092 \
--partitions 6 \
--topic orders
bin/kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--property "parse.key=true" \
--property "key.separator=:"
bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic orders \
--group order-processor \
--from-beginning
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--list
bin/kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group order-processor
bin/kafka-broker-api-versions.sh \
--bootstrap-server localhost:9092
bin/kafka-leader-election.sh \
--bootstrap-server localhost:9092 \
--election-type preferred \
--all-topic-partitions
# server.properties优化
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
log.retention.hours=168 # 保留7天
log.segment.bytes=1073741824 # 1GB分段
# 修改bin/kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xms8G -Xmx8G"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20"
export JMX_PORT=9999
bin/kafka-server-start.sh config/server.properties
问题现象:端口冲突
# 解决方案:
netstat -tulnp | grep 9092
kill -9 <PID>
问题现象:消息发送失败
# 检查日志:
tail -f logs/server.log
# 常见原因:
# - 网络不通
# - Topic不存在(设置auto.create.topics.enable=true)
# 设置自动清理:
log.retention.bytes=10737418240 # 10GB
log.cleanup.policy=delete
滚动升级步骤:
注意事项:
本文档基于Kafka 3.5.1版本编写,部分命令在不同版本中可能存在差异。建议参考官方文档获取最新信息。 “`
注:实际字数为约3500字,您可以根据需要补充以下内容扩展: 1. 增加安全配置(SSL/SASL) 2. 补充Kafka Connect和Streams的使用示例 3. 添加性能测试数据 4. 详细监控指标说明
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。