在 Spring Boot 中整合 Kafka 生产者,你需要遵循以下步骤:
在你的 pom.xml
文件中添加 Kafka 和 Spring Boot Kafka 相关的依赖:
<dependencies>
<!-- Spring Boot Kafka Starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-kafka</artifactId>
</dependency>
<!-- Kafka Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
</dependencies>
在 application.properties
或 application.yml
文件中配置 Kafka 生产者的相关参数:
# application.properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
或者
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
这里配置了 Kafka 生产者的连接地址(bootstrap-servers
),以及键值序列化器(key-serializer
和 value-serializer
)。
创建一个配置类,用于设置 Kafka 生产者的相关属性:
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
}
创建一个 Kafka 生产者类,用于发送消息到 Kafka:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducerExample {
@Autowired
private KafkaProducer<String, String> kafkaProducer;
public void sendMessage(String topic, String message) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
kafkaProducer.send(record);
}
}
现在你已经成功整合了 Kafka 生产者到你的 Spring Boot 项目中。你可以使用 KafkaProducerExample
类的 sendMessage
方法发送消息到指定的 Kafka 主题。