linux

Linux Kafka与Spark集成实践

小樊
38
2025-10-25 13:52:51
栏目: 智能运维

Linux环境下Kafka与Spark集成实践指南

一、集成前环境准备

在开始集成前,需确保Linux系统已安装以下组件(版本需兼容):

二、核心集成步骤

1. 创建Kafka主题

使用Kafka命令行工具创建用于数据传输的主题(如test_topic),指定Broker地址、分区数和副本数:

bin/kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

主题创建后,可通过bin/kafka-topics.sh --list --bootstrap-server localhost:9092验证主题是否存在。

2. 配置Spark读取Kafka数据

使用Spark Structured Streaming API从Kafka消费数据,关键配置项包括:

PySpark示例

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("KafkaSparkIntegration") \
    .getOrCreate()

kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test_topic") \
    .load()

# 将Kafka的value字段(字节数组)转换为字符串
processed_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
processed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start() \
    .awaitTermination()

Scala示例

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
    .appName("KafkaSparkIntegration")
    .getOrCreate()

val kafkaDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test_topic")
    .load()

val processedDF = kafkaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
processedDF.writeStream
    .outputMode("append")
    .format("console")
    .start()
    .awaitTermination()

上述代码会从Kafka读取数据,并将key/value以字符串形式输出到控制台。

3. 配置Spark写入Kafka数据

若需将Spark处理后的数据写回Kafka,可使用writeStream结合Kafka sink,关键配置项包括:

PySpark示例

# 假设processed_df是处理后的DataFrame,包含"value"列(需转换为字节数组)
processed_df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output_topic") \
    .option("checkpointLocation", "/tmp/spark_kafka_checkpoint") \
    .start() \
    .awaitTermination()

Scala示例

processedDF.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("topic", "output_topic") \
    .option("checkpointLocation", "/tmp/spark_kafka_checkpoint") \
    .start() \
    .awaitTermination()

上述代码会将processed_df中的数据写入Kafka的output_topic主题。

三、实战场景示例:传感器数据实时处理

假设Kafka中有一个sensor_data主题,存储了物联网设备的温度、湿度数据(JSON格式),以下是Spark实时过滤高温数据的完整流程:

1. Kafka数据生产者(Python模拟)

使用kafka-python库模拟传感器数据发送,每秒发送一条包含sensor_idtemperaturehumidity的JSON消息:

from kafka import KafkaProducer
import json
import random
import time

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

while True:
    data = {
        'sensor_id': random.randint(1, 100),
        'temperature': round(random.uniform(20.0, 30.0), 2),
        'humidity': round(random.uniform(30.0, 70.0), 2),
        'timestamp': int(time.time())
    }
    producer.send('sensor_data', data)
    time.sleep(1)

2. Spark实时处理(过滤高温数据)

使用Spark Structured Streaming读取sensor_data主题,过滤出温度超过25℃的数据,并将结果写入控制台:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, TimestampType

spark = SparkSession.builder \
    .appName("SensorDataProcessing") \
    .getOrCreate()

# 定义数据模式(用于解析JSON)
schema = StructType([
    StructField("sensor_id", IntegerType()),
    StructField("temperature", FloatType()),
    StructField("humidity", FloatType()),
    StructField("timestamp", TimestampType())
])

# 从Kafka读取数据
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "sensor_data") \
    .load()

# 解析JSON数据并过滤
processed_df = kafka_df.selectExpr("CAST(value AS STRING)") \
    .selectExpr("from_json(value, 'sensor_id INT, temperature FLOAT, humidity FLOAT, timestamp TIMESTAMP') as data") \
    .select("data.*") \
    .filter("temperature > 25.0")

# 输出结果到控制台
query = processed_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()

运行后,Spark会实时打印温度超过25℃的传感器数据。

四、常见问题排查与性能优化

1. 常见问题

2. 性能优化

通过以上步骤,可实现Linux环境下Kafka与Spark的高效集成,支撑实时数据采集、处理与分析需求。

0
看了该问题的人还看了