在Ubuntu上配置Kafka消费者时,需要考虑多个方面,包括消费者组的ID、服务器地址、自动提交偏移量等。以下是一些关键步骤和配置参数:
subscribe
方法,订阅一个或多个主题。poll
方法从Kafka服务器拉取消息。commitSync
或commitAsync
方法。fetch.min.bytes
:消费者一次拉取中拉取的最小数据量,默认值为1B。fetch.max.bytes
:消费者一次拉取中拉取的最大数据量,默认值为52428800B,即50MB。fetch.max.wait.ms
:指定Kafka的等待时间,默认值为500ms。max.partition.fetch.bytes
:配置从每个分区里返回给consumer的最大数据量。max.poll.records
:配置consumer在一次拉取请求中拉取的最大消息数,默认为500条。connections.max.idle.ms
:空连接超时限制。exclude.internal.topics
:指定Kafka中的内部主题是否可以向消费者公开,默认为true。receive.buffer.bytes
:设置socket接收消息缓冲区大小,默认值为65536B,即64KB。send.buffer.bytes
:设置socket发送消息缓冲区大小,默认值为131072B,即128KB。request.timeout.ms
:consumer等待请求响应的最长时间,默认为30000ms。metadata.max.age.ms
:元数据过期时间,默认300000ms,即5分钟。reconnect.backoff.ms
:尝试重新连接主机之前等待时间,默认50ms。retry.backoff.ms
:尝试重新发送失败的请求到指定主题分区之前的等待时间,默认100ms。isolation.level
:事务隔离级别,有“read_uncommitted”和“read_committed”两种,默认情况为“read_uncommitted”。@Component
@Slf4j
public class MessageReceiveListener {
@KafkaListener(topics = "ifaas-test", containerFactory = "ifaasContainerFactory")
public void receiveMessage1(List<ConsumerRecord> consumerRecords, Acknowledgment ack) {
try {
log.info("receiveMessage1 接收的kafka消息:" + consumerRecords.size());
ack.acknowledge();
} catch (Exception e) {
log.error("kafka失败消息:{}", JSON.toJSONString(consumerRecords));
}
}
@Bean("ifaasContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> ifaasContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> props = staticConsumerProps(ifaasServers, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setBatchListener(true);
factory.setConcurrency(1);
return factory;
}
private Map<String, Object> staticConsumerProps(String servers, String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 350000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 4000);
return props;
}
}
以上信息提供了在Ubuntu上配置Kafka消费者的一些基本步骤和关键参数,以及使用SpringBoot框架的示例代码。