debian

如何在Debian上配置Kafka消费者

小樊
40
2025-04-03 06:17:01
栏目: 智能运维

在Debian上配置Kafka消费者涉及几个步骤,包括安装Java(Kafka的运行环境)、下载和配置Kafka、以及编写和运行Kafka消费者代码。以下是一个详细的步骤指南:

1. 安装Java

Kafka需要Java运行环境,因此首先需要安装Java。

sudo apt update
sudo apt install openjdk-11-jdk

验证Java安装:

java -version

2. 下载和配置Kafka

从Apache Kafka官方网站下载Kafka。

wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz
tar -xzf kafka_2.13-3.3.1.tgz
cd kafka_2.13-3.3.1

启动Zookeeper(Kafka的协调服务):

bin/zookeeper-server-start.sh config/zookeeper.properties

在另一个终端启动Kafka服务器:

bin/kafka-server-start.sh config/server.properties

3. 创建主题

创建一个Kafka主题,消费者将从中读取消息。

bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1

4. 编写Kafka消费者代码

使用Java编写一个简单的Kafka消费者。以下是一个示例代码:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test-group");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                });
            }
        } finally {
            consumer.close();
        }
    }
}

5. 编译和运行消费者代码

确保你已经安装了Maven或Gradle来管理依赖项。以下是使用Maven的示例:

使用Maven

创建一个pom.xml文件:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafka-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

在项目目录中运行:

mvn clean package

运行消费者:

java -cp target/kafka-consumer-1.0-SNAPSHOT.jar SimpleConsumer

6. 验证消费者

确保Kafka服务器正在运行,并且主题已经创建。然后运行消费者代码,你应该能够看到从主题中读取的消息。

通过以上步骤,你可以在Debian上成功配置和运行一个Kafka消费者。

0
看了该问题的人还看了