在Linux上配置Kafka的生产者与消费者,可以按照以下步骤进行:
Kafka是用Java编写的,因此首先需要安装Java。可以使用以下命令安装OpenJDK:
sudo apt update
sudo apt install openjdk-11-jdk
从Apache Kafka官方网站下载最新版本的Kafka,并解压到本地目录:
wget https://downloads.apache.org/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0
Kafka使用Zookeeper来管理集群配置和元数据。启动Zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
在另一个终端窗口中启动Kafka服务器:
bin/kafka-server-start.sh config/server.properties
创建一个Kafka主题,例如my-topic:
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
创建一个生产者配置文件producer.properties:
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
使用配置文件启动生产者并发送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic --property print.key=true --property print.value=true < producer.properties
在提示符下输入消息并按回车键发送。
创建一个消费者配置文件consumer.properties:
bootstrap.servers=localhost:9092
group.id=my-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
使用配置文件启动消费者并读取消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --property group.id=my-group < consumer.properties
确保生产者能够成功发送消息,并且消费者能够接收到这些消息。
在完成测试后,可以停止Kafka和Zookeeper:
# 停止消费者
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic --from-beginning --property group.id=my-group --quit-on-eof < consumer.properties
# 停止生产者
Ctrl+C
# 停止Kafka服务器
bin/kafka-server-stop.sh
# 停止Zookeeper
bin/zookeeper-server-stop.sh
通过以上步骤,你可以在Linux上成功配置Kafka的生产者与消费者。