在Spring Boot中,Spring Kafka通过消息转换器(MessageConverter)机制进行消息转换:消费端将Kafka收到的ConsumerRecord转换为监听方法所需的对象,发送端将待发送的对象转换为Kafka记录。为了实现自定义的消息转换,我们可以创建一个继承org.springframework.messaging.converter.MappingJackson2MessageConverter的类,并重写convertFromInternal和convertToInternal方法。
以下是一个简单的示例,展示了如何创建一个自定义的消息转换器并将其应用于Spring Boot Kafka配置:
首先,创建一个继承MappingJackson2MessageConverter的类:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Custom Jackson-based message converter for Spring Kafka listeners.
 *
 * <p>Extends {@link MappingJackson2MessageConverter} and overrides the two
 * conversion hooks defined by {@code AbstractMessageConverter}:
 * {@code convertFromInternal} (inbound: message payload -> target object) and
 * {@code convertToInternal} (outbound: payload object -> serialized form).
 *
 * <p>NOTE(review): the original code called {@code setCharset},
 * {@code addPayloadDeserializer} and {@code addHeaderDeserializer}, none of
 * which exist on {@code MappingJackson2MessageConverter}; they were removed.
 * The original override signatures also did not match the superclass hooks
 * and would not have compiled as {@code @Override} methods.
 */
public class CustomKafkaMessageConverter extends MappingJackson2MessageConverter {

    public CustomKafkaMessageConverter() {
        super();
        // Serialize outbound payloads to byte[] (UTF-8 JSON bytes);
        // String.class is the other option supported by the superclass.
        setSerializedPayloadClass(byte[].class);
    }

    /**
     * Inbound hook: converts the raw message payload into the target type.
     *
     * @param message        the inbound Spring {@code Message}
     * @param targetClass    the type the listener method expects
     * @param conversionHint optional hint supplied by the caller (may be null)
     * @return the deserialized payload object
     */
    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        // Put custom deserialization logic here.
        return super.convertFromInternal(message, targetClass, conversionHint);
    }

    /**
     * Outbound hook: converts the payload object into its serialized form.
     *
     * @param payload        the payload object to serialize
     * @param headers        the outbound message headers (may be null)
     * @param conversionHint optional hint supplied by the caller (may be null)
     * @return the serialized payload (byte[] or String, per serializedPayloadClass)
     */
    @Override
    protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
        // Put custom serialization logic here.
        return super.convertToInternal(payload, headers, conversionHint);
    }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import java.util.HashMap;
import java.util.Map;
/**
 * Kafka configuration that wires the custom message converter into the
 * listener container factory used by {@code @KafkaListener} methods.
 *
 * <p>NOTE(review): the original class implemented {@code KafkaListenerConfigurer}
 * but used APIs that do not exist: {@code ConcurrentKafkaListenerContainerFactory}
 * has no {@code setProducerFactory}; {@code KafkaListenerConfigurer} declares no
 * {@code configureKafkaProducers}; and {@code MethodKafkaListenerEndpoint} has no
 * constructor taking deserializers. Endpoints are normally registered via
 * {@code @KafkaListener}, so the manual registration was dropped and the
 * consumer/producer factories are exposed as regular beans instead.
 */
@Configuration
public class KafkaConfig {

    /** Converter applied to inbound listener payloads. */
    @Bean
    public CustomKafkaMessageConverter customKafkaMessageConverter() {
        return new CustomKafkaMessageConverter();
    }

    /** Consumer factory: String keys and values; payload mapping is done by the converter. */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /** Producer factory with plain String serializers. */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /** Template for sending messages with the producer factory above. */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    /**
     * Listener container factory used by {@code @KafkaListener}; applies the
     * custom converter to every inbound record.
     *
     * <p>NOTE(review): recent Spring Kafka versions expect a
     * {@code org.springframework.kafka.support.converter.RecordMessageConverter}
     * here (e.g. {@code JsonMessageConverter}); if the custom converter stays a
     * spring-messaging converter it may need to be adapted — verify against the
     * Spring Kafka version in use.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory,
            CustomKafkaMessageConverter converter) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setMessageConverter(converter);
        return factory;
    }
}
现在,当使用@KafkaListener注解监听Kafka主题时,收到的消息将通过自定义的消息转换器进行反序列化;发送消息时,则可通过KafkaTemplate配合相应的转换器完成序列化。