ubuntu

Ubuntu Kafka配置中的消费者设置

小樊
57
2025-03-29 11:28:22
栏目: 智能运维

在Ubuntu上配置Kafka消费者时,需要考虑多个方面,包括消费者组的ID、服务器地址、自动提交偏移量等。以下是一些关键步骤和配置参数:

Kafka消费者配置步骤

  1. 配置消费者属性:设置消费者组ID、服务器地址、自动提交偏移量等参数。
  2. 创建Kafka消费者实例:使用上述配置的属性创建一个Kafka消费者实例。
  3. 订阅主题:通过调用subscribe方法,订阅一个或多个主题。
  4. 拉取消息:调用poll方法从Kafka服务器拉取消息。
  5. 处理消息:对拉取到的消息进行业务处理。
  6. 手动提交偏移量(可选):根据需求选择手动提交偏移量,调用commitSynccommitAsync方法。

Kafka消费者重要参数

使用SpringBoot配置Kafka消费者的示例代码

@Component
@Slf4j
public class MessageReceiveListener {
    @KafkaListener(topics = "ifaas-test", containerFactory = "ifaasContainerFactory")
    public void receiveMessage1(List<ConsumerRecord> consumerRecords, Acknowledgment ack) {
        try {
            log.info("receiveMessage1 接收的kafka消息:" + consumerRecords.size());
            ack.acknowledge();
        } catch (Exception e) {
            log.error("kafka失败消息:{}", JSON.toJSONString(consumerRecords));
        }
    }

    @Bean("ifaasContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> ifaasContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        Map<String, Object> props = staticConsumerProps(ifaasServers, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setBatchListener(true);
        factory.setConcurrency(1);
        return factory;
    }

    private Map<String, Object> staticConsumerProps(String servers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 350000);
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 4000);
        return props;
    }
}

以上信息提供了在Ubuntu上配置Kafka消费者的一些基本步骤和关键参数,以及使用SpringBoot框架的示例代码。

0
看了该问题的人还看了