在Debian上部署Kafka时,保证消息顺序性是一个关键需求,尤其是在需要严格顺序的场景中。以下是一些关键措施和配置步骤,以确保Kafka在Debian系统上运行时能够保持消息的顺序性。
enable.idempotence=true和max.in.flight.requests.per.connection=1来防止乱序。安装和配置Java: 确保系统上安装了Java运行时环境(JRE)或Java开发工具包(JDK)。
sudo apt update
sudo apt install openjdk-11-jdk -y
下载和解压Kafka: 从Apache Kafka官网下载最新版本的Kafka,并解压到指定目录。
wget https://downloads.apache.org/kafka/3.5.2/kafka_2.12-3.5.2.tgz
tar -xzf kafka_2.12-3.5.2.tgz
cd kafka_2.12-3.5.2
配置Kafka:
编辑Kafka的config/server.properties文件,确保以下配置项正确设置:
broker.id=1
listeners=PLAINTEXT://your.kafka.broker.address:9092
log.dirs=/path/to/kafka/logs
zookeeper.connect=localhost:2181
num.partitions=1 # 为了保证全局顺序性,可以设置为1个分区
default.replication.factor=1
启动Kafka和Zookeeper: 启动Zookeeper和Kafka服务器。
./zookeeper-server-start.sh ../config/zookeeper.properties
./kafka-server-start.sh ../config/server.properties
生产者配置: 使用带有Key的消息生产,确保顺序性。
producer.send(new ProducerRecord<>("orders", "order-123", "支付"));
producer.send(new ProducerRecord<>("orders", "order-123", "发货"));
消费者配置: 配置消费者以单线程顺序处理消息。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", "1"); // 一次只处理一条消息
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record); // 保证单线程顺序处理
}
}