To integrate Kafka with Spring Boot and filter messages, follow these steps:
Add the Spring for Apache Kafka dependency to the pom.xml file (Spring Boot's dependency management supplies the version):
<dependencies>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
Configure the Kafka properties in application.yml (or application.properties):
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Create a class that implements the ConsumerInterceptor interface and contains the message-filtering logic. Note that the Kafka client instantiates interceptors itself by class name (through a no-argument constructor), so this is a plain Java class rather than a Spring bean:
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MessageFilterInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Rebuild the batch, keeping only the records that pass the filter
        Map<TopicPartition, List<ConsumerRecord<String, String>>> filtered = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> kept = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records.records(partition)) {
                if (filter(record)) {
                    kept.add(record);
                }
            }
            if (!kept.isEmpty()) {
                filtered.put(partition, kept);
            }
        }
        return new ConsumerRecords<>(filtered);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // No implementation needed
    }

    @Override
    public void close() {
        // No resources to release
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed
    }

    private boolean filter(ConsumerRecord<String, String> record) {
        // Filtering logic, e.g. decide based on the message content
        String messageValue = record.value();
        return messageValue == null || !messageValue.contains("filtered");
    }
}
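To see what the interceptor does in isolation, here is a minimal sketch that feeds it a hand-built batch. It is not part of the setup itself; the class name MessageFilterInterceptorDemo and the sample records are purely illustrative:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class MessageFilterInterceptorDemo {

    public static void main(String[] args) {
        TopicPartition partition = new TopicPartition("my-topic", 0);
        List<ConsumerRecord<String, String>> incoming = List.of(
                new ConsumerRecord<>("my-topic", 0, 0L, "k1", "hello"),
                new ConsumerRecord<>("my-topic", 0, 1L, "k2", "this one is filtered"));

        MessageFilterInterceptor interceptor = new MessageFilterInterceptor();
        ConsumerRecords<String, String> kept =
                interceptor.onConsume(new ConsumerRecords<>(Map.of(partition, incoming)));

        // Prints only "hello"; the second record is dropped by the filter
        kept.forEach(record -> System.out.println(record.value()));
    }
}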
Create a configuration class that builds the Kafka consumer factory and registers the custom interceptor through the interceptor.classes consumer property:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Register the custom interceptor; the Kafka client instantiates it by class name
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MessageFilterInterceptor.class.getName());
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
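As a side note, if you would rather keep Spring Boot's auto-configured consumer factory instead of defining the factory beans above, the interceptor can usually be registered through configuration properties alone. The package com.example below is a placeholder; use the interceptor's actual fully qualified class name:

spring:
  kafka:
    consumer:
      properties:
        interceptor.classes: com.example.MessageFilterInterceptor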
Create a class that handles the received messages:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageListener {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void listen(String message) {
        System.out.println("Received message: " + message);
    }
}
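To exercise the whole flow, a small producer can send one message that passes the filter and one that does not. This is only a sketch that assumes the KafkaTemplate auto-configured by Spring Boot (with its default String serializers); the class name KafkaSmokeTestRunner is made up for the example:

import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaSmokeTestRunner implements CommandLineRunner {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaSmokeTestRunner(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void run(String... args) {
        // Passes the filter and is printed by KafkaMessageListener
        kafkaTemplate.send("my-topic", "hello kafka");
        // Contains "filtered", so MessageFilterInterceptor drops it before the listener
        kafkaTemplate.send("my-topic", "this message is filtered");
    }
}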
Now, whenever the consumer polls a batch of messages, MessageFilterInterceptor filters it first, and only the records that pass the filter are handed to KafkaMessageListener.
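Worth mentioning: Spring for Apache Kafka also ships its own filtering hook, RecordFilterStrategy, which works at the listener-container level instead of inside the Kafka client. As a rough sketch, the kafkaListenerContainerFactory bean above could be extended like this; it is an alternative to the ConsumerInterceptor approach, not a required part of it:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // RecordFilterStrategy returns true for records that should be discarded
    factory.setRecordFilterStrategy(record ->
            record.value() != null && record.value().contains("filtered"));
    return factory;
}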