springboot是如何集成kafka

发布时间:2022-02-25 15:03:18 作者:小新
来源:亿速云 阅读:175

这篇文章主要介绍了springboot是如何集成kafka,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

为什么使用kafka?

业务场景

基本架构

Kafka 运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群节点分布

1、Producer 生产消息,发送到Broker中

2、Leader状态的Broker接收消息,写入到相应topic中。在一个分区内,这些消息被索引并连同时间戳存储在一起

3、Leader状态的Broker接收完毕以后,传给Follow状态的Broker作为副本备份

4、 Consumer 消费者的进程可以从分区订阅,并消费消息

常用术语

代码演示

外部依赖:

在 pom.xml 中添加 Kafka 依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

由于spring-boot-starter-parent 指定的版本号是2.1.5.RELEASE,spring boot 会对外部框架的版本号统一管理,spring-kafka 引入的版本是 2.2.6.RELEASE

配置文件:

在配置文件 application.yaml 中配置 Kafka 的相关参数,具体内容如下:

Spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      retries: 3  # 生产者发送失败时,重试次数
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # 生产者消息key和消息value的序列化处理类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: tomge-consumer-group  # 默认消费者group id
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

对应的配置类 org.springframework.boot.autoconfigure.kafka.KafkaProperties,来初始化kafka相关的bean实例对象,并注册到spring容器中。

发送消息:

Spring Boot 作为一款支持快速开发的集成性框架,同样提供了一批以 -Template 命名的模板工具类用于实现消息通信。对于 Kafka 而言,这个工具类就是KafkaTemplate

KafkaTemplate 提供了一系列 send 方法用来发送消息,典型的 send 方法定义如下代码所示:

public ListenableFuture<SendResult<K, V>> send(String topic, @Nullable V data) {
 。。。。 省略
}

生产端提供了一个restful接口,模拟发送一条创建新用户消息。

@GetMapping("/add_user")
public Object add() {
    try {
        Long id = Long.valueOf(new Random().nextInt(1000));
        User user = User.builder().id(id).userName("TomGE").age(29).address("上海").build();
        ListenableFuture<SendResult> listenableFuture = kafkaTemplate.send(addUserTopic, JSON.toJSONString(user));
        
        // 提供回调方法,可以监控消息的成功或失败的后续处理
        listenableFuture.addCallback(new ListenableFutureCallback<SendResult>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败," + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult sendResult) {
                // 消息发送到的topic
                String topic = sendResult.getRecordMetadata().topic();
                // 消息发送到的分区
                int partition = sendResult.getRecordMetadata().partition();
                // 消息在分区内的offset
                long offset = sendResult.getRecordMetadata().offset();
                System.out.println(String.format("发送消息成功,topc:%s, partition: %s, offset:%s ", topic, partition, offset));
            }
        });
        return "消息发送成功";
    } catch (Exception e) {
        e.printStackTrace();
        return "消息发送失败";
    }
}

实际上开发使用的Kafka默认允许自动创建Topic,创建Topic时默认的分区数量是1,可以通过server.properties文件中的num.partitions=1修改默认分区数量。在生产环境中通常会关闭自动创建功能,Topic需要由运维人员先创建好。

消费消息:

在 Kafka 中消息通过服务器推送给各个消费者,而 Kafka 的消费者在消费消息时,需要提供一个监听器(Listener)对某个 Topic 实现监听,从而获取消息,这也是 Kafka 消费消息的唯一方式。

定义一个消费类,在处理具体消息业务逻辑的方法上添加 @KafkaListener 注解,并配置要消费的topic,代码如下所示:

@Component
public class UserConsumer {

    @KafkaListener(topics = "add_user")
    public void receiveMesage(String content) {
        System.out.println("消费消息:" + content);
    }
}

是不是很简单,添加kafka依赖、使用KafkaTemplate、@KafkaListener注解就完成消息的生产和消费,其实是SpringBoot在背后默默的做了很多工作

感谢你能够认真阅读完这篇文章,希望小编分享的“springboot是如何集成kafka”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!

推荐阅读:
  1. 什么是Kafka?
  2. CDH如何集成kafka

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

springboot kafka

上一篇:conda如何使用清华源

下一篇:Springboot中怎么实现多线程并发定时任务

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》