kafka

springboot 整合 kafka 配置文件详解

小樊
113
2024-12-14 22:35:33
栏目: 大数据

在Spring Boot项目中整合Kafka主要涉及添加依赖、配置文件以及相关类的定义。以下是基于Spring Boot 2.x版本整合Kafka的详细步骤和配置文件详解:

依赖添加

pom.xml文件中添加Spring Kafka组件的依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-kafka</artifactId>
</dependency>

配置文件

application.propertiesapplication.yml文件中配置Kafka的相关属性。以下是一个基本的配置示例:

application.properties

# 第一个Kafka实例的配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group1

# 第二个Kafka实例的配置
spring.kafka.second.bootstrap-servers=localhost:9093
spring.kafka.second.consumer.group-id=group2

application.yml

spring:
  kafka:
    bootstrap-servers:
      - localhost:9092
      - localhost:9093
    consumer:
      group-id:
        - group1
        - group2

创建KafkaTemplate和消费者监听器

在配置类中创建KafkaTemplate Bean对象和消费者监听器。

@Configuration
public class KafkaConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfigs()));
    }

    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @KafkaListener(topics = "topic1", groupId = "group1")
    public void consumeMessage1(String message) {
        System.out.println("Consumer 1: " + message);
    }

    @KafkaListener(topics = "topic2", groupId = "group2")
    public void consumeMessage2(String message) {
        System.out.println("Consumer 2: " + message);
    }
}

通过以上步骤,您可以在Spring Boot项目中成功整合Kafka,并实现消息的生产和消费。

以上信息仅供参考,具体配置可能会根据实际需求有所不同。

0
看了该问题的人还看了