Kafka 在 Ubuntu 上的数据迁移操作指南
一、迁移场景与总体方案
- 同一集群内 Broker 间数据/副本重分配:使用 kafka-reassign-partitions.sh 生成并执行再均衡计划,适合新增 Broker、磁盘扩容、下线节点、打散热点等场景。该过程会复制副本并在副本同步完成后清理旧日志。操作前务必设置限流,避免影响线上业务。扩容仅能增加分区数,历史数据不会自动重分布。
- 跨集群迁移(同版本或跨版本、IDC/云上):使用 MirrorMaker 2(MM2) 进行全量与增量同步,支持 Topic 配置、分区数、副本数、ACL 等元数据同步;若仅需消息复制且版本较老,可用 MirrorMaker 1。适用于版本升级、云迁移、多活/灾备等。
- 带 ETL 或复杂转换的迁移:基于 Flink/Kafka Connect 自行编写同步任务,可在同步过程中做过滤、清洗、格式转换等,灵活但需一定开发量。
二、同一集群内迁移步骤(Broker 间副本重分配)
- 步骤 1 准备与检查
- 确认目标 Broker ID 已加入集群并健康;记录待迁移 Topic/Partition 现状。
- 规划目标副本布局(每个 Partition 的 replica 列表),避免将副本集中到同一台机器或同一磁盘。
- 步骤 2 生成迁移计划
- 方式 A(按 Topic 批量生成):创建 topics-to-move.json
{
“topics”: [{“topic”: “your_topic”}],
“version”: 1
}
生成计划:
./bin/kafka-reassign-partitions.sh --bootstrap-server broker:9092
–topics-to-move-json-file topics-to-move.json
–broker-list “<target_broker_ids>” --generate
- 方式 B(精确控制每个分区):直接编写 reassignment.json,显式指定每个 Partition 的 replicas。
- 步骤 3 执行迁移(务必限流)
- 设置限流(示例将复制流量限制在 50MB/s/ broker):
./bin/kafka-configs.sh --bootstrap-server broker:9092
–entity-type broker --entity-name <broker_id>
–alter --add-config ‘leader.replication.throttled.rate=52428800,replica.replication.throttled.rate=52428800’
./bin/kafka-configs.sh --bootstrap-server broker:9092
–entity-type topic --entity-name <topic_name>
–alter --add-config ‘leader.replication.throttled.replicas=,replica.replication.throttled.replicas=’
- 执行再均衡:
./bin/kafka-reassign-partitions.sh --bootstrap-server broker:9092
–reassignment-json-file reassignment.json --execute
- 观察迁移进度与限流状态:
./bin/kafka-reassign-partitions.sh --bootstrap-server broker:9092
–reassignment-json-file reassignment.json --verify
./bin/kafka-topics.sh --bootstrap-server broker:9092 --describe --topic <topic_name>
- 步骤 4 收尾
- 迁移完成后清除限流配置,避免长期影响集群吞吐。
- 如为扩容场景,仅增加分区数并不会重分布历史数据,可结合再均衡将部分分区迁至新 Broker,缓解热点。
- 重要限制与提示
- Kafka 不支持减少分区数;分区扩容后历史数据仍在原分区,需要再均衡打散。
- 再均衡会触发 Leader/Consumer Rebalance,请在低峰期执行并监控 Lag。
- 迁移工具按 Broker 粒度调度,无法指定到具体磁盘目录;若磁盘不均衡,可分批迁移降低风险。
三、跨集群迁移步骤(MirrorMaker 2)
-
步骤 1 准备
- 确保源/目标集群网络互通,版本兼容;目标集群已创建必要的 Topic/ACL(若使用 MM2 元数据同步,可自动创建或映射)。
-
步骤 2 配置 MM2(mm2.properties 示例)
clusters = source,target
source.bootstrap.servers = <src_broker:9092>
target.bootstrap.servers = <dst_broker:9092>
source->target.enabled = true
source->target.topics = .*
如需双向复制(多活),可开启 target->source
复制消费者组偏移量(需要目标集群允许)
source->target.emit.checkpoints.interval.seconds = 60
source->target.sync.group.offsets.enabled = true
source->target.sync.group.offsets.interval.seconds = 60
其他调优参数:tasks.max、replication.factor、security.* 等
-
步骤 3 启动与监控
- 启动:./bin/connect-mirror-maker.sh mm2.properties
- 监控:查看 Connect 日志与目标集群 Topic 增长、Lag、错误日志。
-
步骤 4 校验与切换
- 元数据校验:对比 Topic 数量、分区数、副本数、ACL 等是否一致(MM2 可同步元数据)。
- 数据校验:抽样对比消息数量/关键字段一致性(必要时基于业务键做校验)。
- 切换:业务侧灰度/切流至目标集群,确认稳定后停止 MirrorMaker。
四、生产环境注意事项与常见问题
- 限流与回滚
- 再均衡务必设置 leader.replication.throttled.rate / replica.replication.throttled.rate 与 topic 级限流;迁移完成及时清除。
- 执行前保存当前分配方案,出现问题时可用原方案回滚。
- 分区与顺序
- 分区数只增不减;扩容不会重分布历史数据,需要配合再均衡。
- 消息顺序仅在分区内保证,扩容会改变分区布局,消费者需能容忍重均衡与顺序变化。
- 资源与稳定性
- 迁移会显著增加 网络带宽、磁盘 I/O、CPU 压力,建议在低峰期进行并预留资源。
- 关注 Under Replicated Partitions、Request Time、Lag 等指标,必要时暂停/限速。
- 版本与工具选择
- 跨集群优先 MM2(支持元数据/偏移量同步);老版本或不需元数据时用 MM1。
- 如需 ETL/清洗,考虑 Flink/Kafka Connect 方案。