如何使用spring boot整合kafka和延迟启动消费者

发布时间:2021-08-09 11:43:24 作者:小新
来源:亿速云 阅读:405

这篇文章给大家分享的是有关如何使用spring boot整合kafka和延迟启动消费者的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。

spring boot 整合kafka,延迟启动消费者

spring boot整合kafka的时候一般使用@KafkaListener来设置消费者,但是这种方式在spring启动的时候就会立即开启消费者。如果有需要根据配置信息延迟开启指定的消费者就不能使用这种方式。

为了方便使用,我自定义了一个注解:

import org.springframework.kafka.annotation.TopicPartition; 
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; 
@Target({ ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DelayKafkaConsumer { 
    String id() default ""; 
    String[] topics() default {}; 
    String errorHandler() default ""; 
    String groupId() default ""; 
    TopicPartition[] topicPartitions() default {}; 
    String beanRef() default "__listener";
}

配合注解使用的factory:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.*;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.format.Formatter;
import org.springframework.format.FormatterRegistry;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.*;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils; 
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
 
@Service
public class MyKafkaConsumerFactory implements KafkaListenerConfigurer,BeanFactoryAware { 
    private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory.class); 
    private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar; 
    private final AtomicInteger counter = new AtomicInteger();
    private BeanFactory beanFactory; 
    private BeanExpressionResolver resolver = new StandardBeanExpressionResolver(); 
    private BeanExpressionContext expressionContext; 
    private final ListenerScope listenerScope = new ListenerScope(); 
    private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
            new KafkaHandlerMethodFactoryAdapter();
 
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        this.kafkaListenerEndpointRegistrar = registrar;
        addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService);
    }
 
    public void startConsumer(KafkaListenerEndpoint endpoint){
        kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
    }
 
    public void startConsumer(Object target){
        logger.info("start consumer {} ...",target.getClass());
        Class<?> targetClass = AopUtils.getTargetClass(target);
        Map<Method, Set<DelayKafkaConsumer>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<Set<DelayKafkaConsumer>>() {
 
                    @Override
                    public Set<DelayKafkaConsumer> inspect(Method method) {
                        Set<DelayKafkaConsumer> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    }
 
                });
        if (annotatedMethods.size()==0)
            throw new IllegalArgumentException(target.getClass()+" need have method with @DelayKafkaConsumer");
        for (Map.Entry<Method, Set<DelayKafkaConsumer>> entry : annotatedMethods.entrySet()) {
            Method method = entry.getKey();
            logger.info("find message listen handler method : {} , object : {}",method.getName(),target.getClass());
            for (DelayKafkaConsumer listener : entry.getValue()) {
                if(listener.topics().length==0) {
                    logger.info("topics value is empty , will skip it , method : {} , target object : {}",method.getName(),target.getClass());
                    continue;
                }
                processKafkaListener(listener,method,target);
                logger.info("register method {} success , target object : {}",method.getName(),target.getClass());
            }
        }
        logger.info("{} consumer start complete .",target.getClass());
    }
 
    protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) {
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBeanFactory(this.beanFactory);
        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
        }
        processListener(endpoint, kafkaListener, bean, methodToUse);
    }
 
    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, DelayKafkaConsumer kafkaListener, Object bean,
                                   Object adminTarget) {
        String beanRef = kafkaListener.beanRef();
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.addListener(beanRef, bean);
        }
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.removeListener(beanRef);
        }
    }
 
    private String getEndpointId(DelayKafkaConsumer kafkaListener) {
        if (StringUtils.hasText(kafkaListener.id())) {
            return resolve(kafkaListener.id());
        }
        else {
            return "Custom-Consumer" + this.counter.getAndIncrement();
        }
    }
 
    private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) {
        String groupId = null;
        if (StringUtils.hasText(kafkaListener.groupId())) {
            groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
        }
        if (groupId == null && StringUtils.hasText(kafkaListener.id())) {
            groupId = id;
        }
        return groupId;
    }
 
    private String[] resolveTopics(DelayKafkaConsumer kafkaListener) {
        String[] topics = kafkaListener.topics();
        List<String> result = new ArrayList<>();
        if (topics.length > 0) {
            for (int i = 0; i < topics.length; i++) {
                Object topic = resolveExpression(topics[i]);
                resolveAsString(topic, result);
            }
        }
        return result.toArray(new String[result.size()]);
    }
 
    private void resolveAsString(Object resolvedValue, List<String> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolveAsString(object, result);
            }
        }
        else if (resolvedValue instanceof String) {
            result.add((String) resolvedValue);
        }
        else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolveAsString(object, result);
            }
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@DelayKafkaConsumer can't resolve '%s' as a String", resolvedValue));
        }
    }
 
    private TopicPartitionInitialOffset[] resolveTopicPartitions(DelayKafkaConsumer kafkaListener) {
        TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        if (topicPartitions.length > 0) {
            for (TopicPartition topicPartition : topicPartitions) {
                result.addAll(resolveTopicPartitionsList(topicPartition));
            }
        }
        return result.toArray(new TopicPartitionInitialOffset[result.size()]);
    }
 
    private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
        Object topic = resolveExpression(topicPartition.topic());
        Assert.state(topic instanceof String,
                "topic in @TopicPartition must resolve to a String, not " + topic.getClass());
        Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
        String[] partitions = topicPartition.partitions();
        PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
        Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
                "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        for (int i = 0; i < partitions.length; i++) {
            resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);
        }
 
        for (PartitionOffset partitionOffset : partitionOffsets) {
            Object partitionValue = resolveExpression(partitionOffset.partition());
            Integer partition;
            if (partitionValue instanceof String) {
                Assert.state(StringUtils.hasText((String) partitionValue),
                        "partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
                partition = Integer.valueOf((String) partitionValue);
            }
            else if (partitionValue instanceof Integer) {
                partition = (Integer) partitionValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
                        topic, partitionOffset.partition(), partitionValue.getClass()));
            }
 
            Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
            Long initialOffset;
            if (initialOffsetValue instanceof String) {
                Assert.state(StringUtils.hasText((String) initialOffsetValue),
                        "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
                initialOffset = Long.valueOf((String) initialOffsetValue);
            }
            else if (initialOffsetValue instanceof Long) {
                initialOffset = (Long) initialOffsetValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
                        topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
            }
 
            Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
            Boolean relativeToCurrent;
            if (relativeToCurrentValue instanceof String) {
                relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
            }
            else if (relativeToCurrentValue instanceof Boolean) {
                relativeToCurrent = (Boolean) relativeToCurrentValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
                        topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
            }
 
            TopicPartitionInitialOffset topicPartitionOffset =
                    new TopicPartitionInitialOffset((String) topic, partition, initialOffset, relativeToCurrent);
            if (!result.contains(topicPartitionOffset)) {
                result.add(topicPartitionOffset);
            }
            else {
                throw new IllegalArgumentException(
                        String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
                                topicPartitionOffset));
            }
        }
        return result;
    }
 
    private void resolvePartitionAsInteger(String topic, Object resolvedValue,
                                           List<TopicPartitionInitialOffset> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        }
        else if (resolvedValue instanceof String) {
            Assert.state(StringUtils.hasText((String) resolvedValue),
                    "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
            result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
        }
        else if (resolvedValue instanceof Integer[]) {
            for (Integer partition : (Integer[]) resolvedValue) {
                result.add(new TopicPartitionInitialOffset(topic, partition));
            }
        }
        else if (resolvedValue instanceof Integer) {
            result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
        }
        else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@DelayKafkaConsumer for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
        }
    }
 
    private Set<DelayKafkaConsumer> findListenerAnnotations(Method method) {
        Set<DelayKafkaConsumer> listeners = new HashSet<>();
        DelayKafkaConsumer ann = AnnotationUtils.findAnnotation(method, DelayKafkaConsumer.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    } 
 
    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    }
                    catch (NoSuchMethodException noMethod) {
                    }
                }
            }
            catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            }
            catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "target method '%s' found on bean target class '%s', " +
                                "but not found in any interface(s) for bean JDK proxy. Either " +
                                "pull the method up to an interface or switch to subclass (CGLIB) " +
                                "proxies by setting proxy-target-class/proxyTargetClass " +
                                "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }
 
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        if (beanFactory instanceof ConfigurableListableBeanFactory) {
            this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
            this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
                    this.listenerScope);
        }
    }
 
    private String resolveExpressionAsString(String value, String attribute) {
        Object resolved = resolveExpression(value);
        if (resolved instanceof String) {
            return (String) resolved;
        }
        else {
            throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
    }
 
    private Object resolveExpression(String value) {
        String resolvedValue = resolve(value);
        return this.resolver.evaluate(resolvedValue, this.expressionContext);
    }
 
    /**
     * Resolve the specified value if possible.
     * @param value the value to resolve
     * @return the resolved value
     * @see ConfigurableBeanFactory#resolveEmbeddedValue
     */
    private String resolve(String value) {
        if (this.beanFactory instanceof ConfigurableBeanFactory) {
            return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
        }
        return value;
    }
 
    private void addFormatters(FormatterRegistry registry) {
        for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
            registry.addConverter(converter);
        }
        for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
            registry.addConverter(converter);
        }
        for (org.springframework.format.Formatter<?> formatter : getBeansOfType(Formatter.class)) {
            registry.addFormatter(formatter);
        }
    }
 
    private <T> Collection<T> getBeansOfType(Class<T> type) {
        if (this.beanFactory instanceof ListableBeanFactory) {
            return ((ListableBeanFactory) this.beanFactory).getBeansOfType(type).values();
        }else {
            return Collections.emptySet();
        }
    }
 
    private static class ListenerScope implements Scope { 
        private final Map<String, Object> listeners = new HashMap<>(); 
        ListenerScope() {
            super();
        }
 
        public void addListener(String key, Object bean) {
            this.listeners.put(key, bean);
        }
 
        public void removeListener(String key) {
            this.listeners.remove(key);
        }
 
        @Override
        public Object get(String name, ObjectFactory<?> objectFactory) {
            return this.listeners.get(name);
        }
 
        @Override
        public Object remove(String name) {
            return null;
        }
 
        @Override
        public void registerDestructionCallback(String name, Runnable callback) {
        }
 
        @Override
        public Object resolveContextualObject(String key) {
            return this.listeners.get(key);
        }
 
        @Override
        public String getConversationId() {
            return null;
        }
 
    }
 
    private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
 
        private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
 
        private MessageHandlerMethodFactory messageHandlerMethodFactory;
 
        public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
            this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
        }
 
        @Override
        public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
            return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
        }
 
        private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
            if (this.messageHandlerMethodFactory == null) {
                this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
            }
            return this.messageHandlerMethodFactory;
        }
 
        private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
            defaultFactory.setBeanFactory(MyKafkaConsumerFactory.this.beanFactory);
 
            ConfigurableBeanFactory cbf =
                    (MyKafkaConsumerFactory.this.beanFactory instanceof ConfigurableBeanFactory ?
                            (ConfigurableBeanFactory) MyKafkaConsumerFactory.this.beanFactory : null); 
 
            defaultFactory.setConversionService(this.defaultFormattingConversionService); 
            List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
 
            // Annotation-based argument resolution
            argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
            argumentResolvers.add(new HeadersMethodArgumentResolver());
 
            // Type-based argument resolution
            final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
            argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
            argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {
 
                @Override
                protected boolean isEmptyPayload(Object payload) {
                    return payload == null || payload instanceof KafkaNull;
                }
 
            });
            defaultFactory.setArgumentResolvers(argumentResolvers); 
            defaultFactory.afterPropertiesSet();
            return defaultFactory;
        } 
    }
}

通过startConsumer来启动一个消费者(多次调用会启动多个消费者)。target必须至少包含一个有@DelayKafkaConsumer注解的方法。这里类似@KafkaListener。我去掉了一部分功能,保留了比较常用的部分。

这里提供了一个通过注解的方式在spring boot项目中动态控制consumer的方法。还有其他的方法来达到这种效果,不过我觉得这种方法比较方便。

java项目集成springboot使用kafka消费者,启动失败报错 Failed to construct kafka consumer

之前博客里面提到本公司为物联网项目。项目中使用mqtt+kafka进行与设备端的通讯,之前的协议格式为json格式,现在改成字节数组byte[]格式进行通信。

集成springboot后,具体的demo网上很多,接下来有时间会出一份kafka的demo。

报错信息如下:

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is org.apache.kafka.common.KafkaException:Failed to construct kafka consumer

原因分析:

之前json格式通信时候,构建kafka消费工厂的时候,其中ConcurrentMessageListenerContainer的key为String类型,而value现在为byte[]类型,所以构建消费者工厂的时候需要指定正确的value类型。

代码如下:

 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerByteFactory() {
     ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
     factory.setConsumerFactory(consumerByteFactory());
     factory.setConcurrency(concurrency);
     factory.getContainerProperties().setPollTimeout(1500);
     return factory;
    }

整体kafka生产者+kafka消费者的demo会在接下来的博客中陆续整理。

感谢各位的阅读!关于“如何使用spring boot整合kafka和延迟启动消费者”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!

推荐阅读:
  1. spring boot中怎么整合kafka
  2. RocketMQ怎么在Spring Boot项目中使用

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

springboot kafka

上一篇:mysql数据库导入导出的用法

下一篇:Linux下MySql数据库的安装配置以及简单操作介绍

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》