How Kafka Works with Other Services on CentOS: Mechanisms and Implementation
As a distributed message queue, Kafka integrates with other services running on CentOS (data storage, log collection, NoSQL databases, monitoring systems, and so on) to build real-time data-processing pipelines. The sections below walk through the most common integration scenarios and how to implement them.
EFK is a classic log-processing stack on CentOS; with Kafka inserted as an intermediate buffer layer, log collection is decoupled from log storage.
① Install the required components (via yum or from source); ② create a Kafka Topic (e.g. nginx-logs) and set the partition count (e.g. 3) and replication factor (e.g. 2); an example command is shown after the step list; ③ edit the Filebeat configuration (/etc/filebeat/filebeat.yml) and add the Kafka output plugin:
output.kafka:
  enabled: true
  hosts: ["kafka1.centos:9092", "kafka2.centos:9092"]
  topic: "nginx-logs"
④ Start the services: systemctl start filebeat kafka elasticsearch; ⑤ configure Logstash to consume the Kafka Topic and write the events to Elasticsearch (a sketch of this pipeline file follows the step list), then start it: bin/logstash -f /etc/logstash/conf.d/kafka-to-es.conf
⑥ Verify: view the Nginx logs in Kibana (the visualization front end for Elasticsearch).
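The Topic from step ② can be created with the kafka-topics.sh script that ships with Kafka (the --bootstrap-server form assumes Kafka 2.2 or later; older releases use --zookeeper instead):
bin/kafka-topics.sh --create --bootstrap-server kafka1.centos:9092 --topic nginx-logs --partitions 3 --replication-factor 2
A minimal sketch of the kafka-to-es.conf pipeline referenced in step ⑤ might look as follows; the consumer group, Elasticsearch address, and index pattern are assumptions:
input {
  kafka {
    bootstrap_servers => "kafka1.centos:9092,kafka2.centos:9092"
    topics => ["nginx-logs"]
    group_id => "logstash-nginx"
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "nginx-logs-%{+YYYY.MM.dd}"
  }
}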
Combining Kafka with HDFS makes it possible to write data into HDFS in near real time, supporting offline analysis and long-term storage of historical data. ① Create a Kafka Topic (e.g. order-data) with an appropriate number of partitions; ② write a Spark Streaming job (Scala) that consumes the Topic and writes the records to HDFS:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val sparkConf = new SparkConf().setAppName("KafkaToHDFS")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "kafka1.centos:9092,kafka2.centos:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "hdfs-writer",
  "auto.offset.reset" -> "latest"
)
val topics = Array("order-data")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)
// DStream output: saveAsTextFiles writes one directory of text files per batch under this prefix
stream.map(record => record.value()).saveAsTextFiles("hdfs://namenode:8020/user/hadoop/order-data")
ssc.start()
ssc.awaitTermination()
③ Package the job into a jar (e.g. kafka-to-hdfs.jar, submitted with the spark-submit command shown at the end of this article); ④ start the Spark Streaming job and verify that data is being written to HDFS (see the check below).
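saveAsTextFiles creates one output directory per batch under the given prefix, so a quick check is to list the parent directory on HDFS; the path below simply mirrors the prefix used in the code above:
hdfs dfs -ls hdfs://namenode:8020/user/hadoop/ | grep order-data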
Combining Kafka with HBase allows data to be written into HBase in real time, supporting low-latency random reads and writes. ① Install HBase; ② edit the HBase configuration (hbase-site.xml) and set the HDFS root directory and the ZooKeeper quorum:
<property>
  <name>hbase.rootdir</name>
  <value>hdfs://namenode:8020/hbase</value>
</property>
<property>
  <name>hbase.zookeeper.quorum</name>
  <value>zookeeper1.centos:2181,zookeeper2.centos:2181</value>
</property>
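The consumer below expects an HBase table named user_table with a column family cf (these names are simply the ones used in this example). The table can be created from the HBase shell before starting the consumer:
echo "create 'user_table', 'cf'" | hbase shell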
③ Write a Kafka producer that sends data to the Topic (e.g. user-data); a minimal sketch follows the step list; ④ write a Kafka consumer (Java) that reads the Topic and writes each record into HBase:
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.*;

// Kafka consumer settings (broker list matches the earlier sections; the group id is an example value)
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "kafka1.centos:9092,kafka2.centos:9092");
kafkaProps.put("group.id", "hbase-writer");
kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Initialize the HBase connection
Configuration config = HBaseConfiguration.create();
try (Connection connection = ConnectionFactory.createConnection(config);
Table table = connection.getTable(TableName.valueOf("user_table"))) {
// Consume records from Kafka
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
consumer.subscribe(Arrays.asList("user-data"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// Use the record key as the row key and store the value in column family cf (assumes the producer sets a non-null key)
Put put = new Put(Bytes.toBytes(record.key()));
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("info"), Bytes.toBytes(record.value()));
table.put(put);
}
}
}
⑤ Start the HBase and Kafka services, run the producer and the consumer, and verify that data is written into HBase.
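The producer mentioned in step ③ does not need to be elaborate. A minimal sketch in Java, where the row key user-1001 and the JSON payload are made-up example values and the brokers are the ones used throughout this article:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class UserDataProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1.centos:9092,kafka2.centos:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The record key becomes the HBase row key in the consumer above, so it must not be null
            producer.send(new ProducerRecord<>("user-data", "user-1001", "{\"name\":\"alice\",\"age\":30}"));
            producer.flush();
        }
    }
}
Because the consumer uses the record key as the HBase row key, every message sent to user-data should carry a non-null key.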
Keeping Kafka running stably on CentOS requires thorough monitoring; commonly used tools include Kafka Exporter + Prometheus + Grafana, Kafka Manager, and Burrow. ① Install and start Kafka Exporter, which exposes broker and consumer-group metrics for Prometheus:
./kafka_exporter --kafka.server=kafka1.centos:9092 --web.listen-address=:9308
② Configure Prometheus to scrape the Kafka metrics: edit prometheus.yml and add a job:
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka1.centos:9308']
③ Configure Grafana for visualization: add Prometheus as a data source and import a Kafka dashboard (e.g. dashboard ID 3955); ④ keep an eye on the core metrics, such as message ingest rate (kafka_server_brokertopicmetrics_messages_in_total), consumer lag (kafka_consumer_fetch_manager_metrics_records_lag), and offline partitions (kafka_controller_kafkacontroller_offline_partitions_count).
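Before wiring up Grafana it is worth confirming that the exporter is actually serving data; with the listen address used above, the metrics endpoint can be queried directly (kafka_brokers is one of the standard kafka_exporter metrics):
curl -s http://kafka1.centos:9308/metrics | grep kafka_brokers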
With Kafka as the data source and Spark Streaming as the stream-processing engine, real-time processing such as live aggregation and ETL can be implemented: the job reads the Kafka Topic with the createDirectStream method (as in the Scala example above), processes the records, and writes the results to HDFS, a database, or back into Kafka. The packaged job is then submitted to the cluster, for example:
spark-submit --class com.example.KafkaToHDFS --master yarn --deploy-mode cluster kafka-to-hdfs.jar