要消费Kafka数据并将其写入数据库,可以按照以下步骤进行操作:
pip install kafka-python
from kafka import KafkaConsumer
import json
import pymysql
consumer = KafkaConsumer('<topic_name>', bootstrap_servers='<kafka_server_address>')
conn = pymysql.connect(host='<db_host>', port=<db_port>, user='<db_user>', password='<db_password>', db='<db_name>')
cursor = conn.cursor()
for message in consumer:
# 解析JSON格式的消息
data = json.loads(message.value)
# 提取所需的数据字段
field1 = data['field1']
field2 = data['field2']
# ...
# 构造插入数据库的SQL语句
sql = "INSERT INTO <table_name> (field1, field2) VALUES (%s, %s)"
values = (field1, field2)
# 执行SQL语句
cursor.execute(sql, values)
conn.commit()
cursor.close()
conn.close()
consumer.close()
以上是一个简单的示例,根据实际情况可能需要根据需要进行一些调整,如处理消息的格式、解析更多字段等。