在Debian上配置Kafka消费者组时,需要考虑以下关键步骤和配置参数:
配置消费者属性:
group.id
):这是消费者组的唯一标识符,确保同一消费者组内的消费者实例不会重复消费同一个分区。bootstrap.servers
):Kafka集群的服务器地址列表。enable.auto.commit
):设置为false
可以手动提交偏移量,以便更好地控制数据的一致性。auto.commit.interval.ms
):如果使用自动提交,可以设置这个参数来控制提交偏移量的频率。创建Kafka消费者实例:
订阅主题:
subscribe
方法,订阅一个或多个主题。拉取消息:
poll
方法从Kafka服务器拉取消息。处理消息:
手动提交偏移量(可选):
commitSync
或commitAsync
方法。以下是一个简单的Java示例代码,展示了如何在Debian上配置Kafka消费者组:
package com.xz.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
// 配置 Properties
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
// 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("firstTopic");
kafkaConsumer.subscribe(topics);
// 消费数据
while (true) {
// 每一秒拉取一次数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 输出数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
kafkaConsumer.commitAsync();
}
}
}
fetch.min.bytes
:消费者一次拉取中拉取的最小数据量,默认值为1B。fetch.max.bytes
:消费者一次拉取中拉取的最大数据量,默认值为52428800B,即50MB。fetch.max.wait.ms
:指定Kafka的等待时间,默认值为500ms。max.partition.fetch.bytes
:配置从每个分区里返回给consumer的最大数据量。max.poll.records
:配置consumer在一次拉取请求中拉取的最大消息数,默认为500条。connections.max.idle.ms
:空连接超时限制。exclude.internal.topics
:指定Kafka中的内部主题是否可以向消费者公开,默认为true。receive.buffer.bytes
:设置socket接收消息缓冲区大小,默认值为65536B,即64KB。send.buffer.bytes
:设置socket发送消息缓冲区大小,默认值为131072B,即128KB。request.timeout.ms
:consumer等待请求响应的最长时间,默认为30000ms。metadata.max.age.ms
:元数据过期时间,默认300000ms,即5分钟。reconnect.backoff.ms
:尝试重新连接主机之前等待时间,默认50ms。retry.backoff.ms
:尝试重新发送失败的请求到指定主题分区之前的等待时间,默认100ms。isolation.level
:事务隔离级别,有“read_uncommitted”和“read_committed”两种,默认情况为“read_uncommitted”。通过合理配置这些参数,可以确保消费者组在Debian上的高效运行和可靠性。