@KafkaListener怎么使用

发布时间:2023-02-25 11:58:43 作者:iii
来源:亿速云 阅读:258

@KafkaListener怎么使用

在现代的分布式系统中,消息队列扮演着至关重要的角色。Apache Kafka 高吞吐量、分布式的消息系统,被广泛应用于日志收集、流处理、事件驱动架构等场景。Spring Kafka 是 Spring 框架对 Kafka 的集成,提供了简洁的 API 来与 Kafka 进行交互。其中,@KafkaListener 注解是 Spring Kafka 中用于监听 Kafka 消息的核心注解之一。本文将详细介绍 @KafkaListener 的使用方法,帮助开发者更好地理解和应用这一功能。

1. 什么是 @KafkaListener

@KafkaListener 是 Spring Kafka 提供的一个注解,用于标记一个方法作为 Kafka 消息的监听器。当 Kafka 主题中有新消息到达时,被注解的方法会被自动调用,从而处理这些消息。通过 @KafkaListener,开发者可以轻松地将 Kafka 消息与业务逻辑进行绑定,实现消息的消费和处理。

2. 基本用法

2.1 添加依赖

首先,在 Spring Boot 项目中,我们需要添加 spring-kafka 依赖。在 pom.xml 文件中添加以下依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.2 配置 Kafka 消费者

application.propertiesapplication.yml 文件中配置 Kafka 消费者的相关属性:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest

2.3 使用 @KafkaListener 注解

接下来,我们可以在 Spring 管理的 Bean 中使用 @KafkaListener 注解来监听 Kafka 主题中的消息。以下是一个简单的示例:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}

在这个示例中,listen 方法被标记为 @KafkaListener,并且指定了要监听的 Kafka 主题 my-topic 和消费者组 my-group。当 my-topic 主题中有新消息到达时,listen 方法会被自动调用,并将消息内容作为参数传递给方法。

3. 高级用法

3.1 监听多个主题

@KafkaListener 注解允许同时监听多个主题。可以通过 topics 属性指定多个主题名称,或者使用 topicPattern 属性通过正则表达式匹配多个主题:

@KafkaListener(topics = {"topic1", "topic2"}, groupId = "my-group")
public void listenMultipleTopics(String message) {
    System.out.println("Received message from multiple topics: " + message);
}

@KafkaListener(topicPattern = "my-topic-.*", groupId = "my-group")
public void listenTopicPattern(String message) {
    System.out.println("Received message from topic pattern: " + message);
}

3.2 处理消息头

Kafka 消息可以包含消息头(headers),这些消息头可以用于传递额外的元数据信息。@KafkaListener 允许通过 @Header 注解来获取消息头:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listenWithHeaders(String message, @Header("custom-header") String customHeader) {
        System.out.println("Received message: " + message);
        System.out.println("Custom header: " + customHeader);
    }
}

3.3 处理消息键

Kafka 消息可以包含一个键(key),用于分区和消息路由。@KafkaListener 允许通过 @Header(KafkaHeaders.RECEIVED_KEY) 注解来获取消息键:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listenWithKey(String message, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
        System.out.println("Received message: " + message);
        System.out.println("Message key: " + key);
    }
}

3.4 处理消息分区和偏移量

@KafkaListener 还允许获取消息的分区和偏移量信息。可以通过 @Header(KafkaHeaders.RECEIVED_PARTITION_ID)@Header(KafkaHeaders.OFFSET) 注解来获取这些信息:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listenWithPartitionAndOffset(String message,
                                            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                            @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println("Received message: " + message);
        System.out.println("Partition: " + partition);
        System.out.println("Offset: " + offset);
    }
}

3.5 手动提交偏移量

默认情况下,Spring Kafka 会自动提交消费者的偏移量。但在某些场景下,开发者可能需要手动控制偏移量的提交。可以通过设置 enableAutoCommitfalse 并手动调用 Acknowledgment 对象的 acknowledge() 方法来实现:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void listenWithManualAck(String message, Acknowledgment ack) {
        System.out.println("Received message: " + message);
        // 处理消息
        ack.acknowledge(); // 手动提交偏移量
    }
}

在配置类中,需要定义一个 ConcurrentKafkaListenerContainerFactory 并设置 enableAutoCommitfalse

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

3.6 批量消费

在某些场景下,可能需要一次性消费多条消息。可以通过设置 batchListenertrue 来实现批量消费:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group", containerFactory = "kafkaBatchListenerContainerFactory")
    public void listenBatch(List<String> messages) {
        System.out.println("Received batch of messages: " + messages);
    }
}

在配置类中,需要定义一个 ConcurrentKafkaListenerContainerFactory 并设置 batchListenertrue

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaBatchListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        return factory;
    }
}

3.7 异常处理

在消费 Kafka 消息时,可能会遇到各种异常情况。Spring Kafka 提供了多种方式来处理这些异常。可以通过 errorHandler 属性指定一个自定义的异常处理器:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my-topic", groupId = "my-group", errorHandler = "myErrorHandler")
    public void listenWithErrorHandler(String message) {
        System.out.println("Received message: " + message);
        throw new RuntimeException("Simulated error");
    }
}

在配置类中,定义一个自定义的异常处理器:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.stereotype.Component;

@Component
public class MyErrorHandler implements ErrorHandler {

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
        System.err.println("Error occurred while processing message: " + data.value());
        System.err.println("Exception: " + thrownException.getMessage());
    }
}

4. 总结

@KafkaListener 是 Spring Kafka 中用于监听 Kafka 消息的核心注解,提供了丰富的功能来满足不同的消费需求。通过本文的介绍,开发者可以掌握 @KafkaListener 的基本用法和高级特性,包括监听多个主题、处理消息头和键、手动提交偏移量、批量消费以及异常处理等。希望本文能够帮助开发者更好地理解和应用 @KafkaListener,从而构建高效、可靠的 Kafka 消费者应用。

推荐阅读:
  1. laravel 使用 phpword使用说明
  2. SpringBoot使用NoSQL——Redis的使用

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

@kafkalistener

上一篇:Redis数据结构类型实例代码分析

下一篇:jQuery怎么自定义属性

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》