Disruptor、Kafka、Netty如何整合

发布时间:2021-12-08 15:48:09 作者:小新
来源:亿速云 阅读:748

这篇文章主要介绍了Disruptor、Kafka、Netty如何整合,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

NETTY应用网关

整个网关的核心是一个netty server,各个应用程序(包括web server,手机app等)连到这个netty server上请求数据;关于数据来源,需要监听多个kafka topic(而且这里的topic是可变的,也就是说需要kafka consumer的动态开始和停止),之后需要把所有这些topic的数据整合在一起,通过channel发送给客户端应用程序。

数据流图

Disruptor、Kafka、Netty如何整合

源码

下面把大部分的代码贴出来,有需要的同学可以参考。会对关键的技术点进行说明,偏业务部分大家自行忽略吧。

main函数

启动disruptor;监听一个固定的topic,把获取到的msg,交给ConsumerProcessorGroup来完成kafka consumer的创建和停止。

public static void main(String[] args) {
        DisruptorHelper.getInstance().start();
        Properties props = ConsumerProps.getConsumerProps();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("uavlst"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            ConsumerRecord<String, String> lastRecord = null;
            for (ConsumerRecord<String, String> record : records)
                lastRecord = record;

            if (lastRecord != null){
                ConsumerProcessorGroup.getInstance().recieveNewUavLst(lastRecord.value());
            }
        }
    }

DisruptorHelper

DisruptorHelper是一个单例,主要是包含了一个disruptor 对象,在new这个对象的时候,用到了ProducerType.MULTI和new BlockingWaitStrategy(),其中前者意味着我们需要多个producer共同来工作,后者其实是默认的producer的等待策略,后续根据实际情况进行调整。

public class DisruptorHelper {
    private static DisruptorHelper instance = null;

    public static DisruptorHelper getInstance() {
        if (instance == null) {
            instance = new DisruptorHelper();
        }
        return instance;
    }

    private final int BUFFER_SIZE = 1024;
    private Disruptor<MsgEvent> disruptor = null;

    private DisruptorHelper() {
        MsgEventHandler eventHandler = new MsgEventHandler();
        disruptor = new Disruptor(new MsgEventFactory(), BUFFER_SIZE, new ConsumerThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
        disruptor.handleEventsWith(eventHandler);
    }

    public void start() {
        disruptor.start();
    }

    public void shutdown() {
        disruptor.shutdown();
    }

    public void produce(ConsumerRecord<String, String> record) {
        RingBuffer<MsgEvent> ringBuffer = disruptor.getRingBuffer();
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).setRecord(record);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

ConsumerProcessorGroup

ConsumerProcessorGroup是一个单例,当中包含一个fixedThreadPool,动态的启动线程来进行kafka topic的消费。

public class ConsumerProcessorGroup {
    private static ConsumerProcessorGroup instance = null;

    public static ConsumerProcessorGroup getInstance(){
        if (instance == null){
            instance = new ConsumerProcessorGroup();
        }
        return instance;
    }

    private ConsumerProcessorGroup() {

    }

    private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);

    public List<String> uavIDLst = new Vector<String>();

    public void recieveNewUavLst(String uavIDs){
        List<String> newUavIDs = Arrays.asList(uavIDs.split(","));
        for (String uavID : newUavIDs){
            if (!uavIDLst.contains(uavID)){
                fixedThreadPool.execute(new ConsumerThread(uavID));
                uavIDLst.add(uavID);
            }
        }
        List<String> tmpLstForDel = new ArrayList<String>();
        for (String uavID : uavIDLst){
            if (!newUavIDs.contains(uavID)){
                tmpLstForDel.add(uavID);
            }
        }
        uavIDLst.removeAll(tmpLstForDel);
    }
}

ConsumerThread

对kafka topic进行消费,通过DisruptorHelper将获取的record写入disruptor的ring buffer当中。

public class ConsumerThread implements Runnable {
    private String uavID;

    public ConsumerThread(String uavID) {
        this.uavID = uavID;
    }

    public void run() {
        Properties props = ConsumerProps.getConsumerProps();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(uavID));
        System.out.println(uavID + " consumer started! Current thread id is " + Thread.currentThread().getId());
        while (ConsumerProcessorGroup.getInstance().uavIDLst.contains(uavID)) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                DisruptorHelper.getInstance().produce(record);
            }
        }
        System.out.println(uavID + " consumer finished! Current thread id is " + Thread.currentThread().getId());
    }
}

MsgEventHandler

Disruptor的消费者,依次从Ring Buffer当中读取数据并执行相应的处理。

public class MsgEventHandler implements EventHandler<MsgEvent> {
    private Map<Integer, String> converterMap;

    public void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {
        ConsumerRecord<String, String> record = event.getRecord();
        System.out.printf("topic = %s, part = %d, offset = %d, key = %s, value = %s \n\r", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

感谢你能够认真阅读完这篇文章,希望小编分享的“Disruptor、Kafka、Netty如何整合”这篇文章对大家有帮助,同时也希望大家多多支持亿速云,关注亿速云行业资讯频道,更多相关知识等着你来学习!

推荐阅读:
  1. Nginx整合Kafka
  2. Flume+Kafka整合

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

disruptor kafka netty

上一篇:如何进行创建代理BeanNameAutoProxyCreator分析

下一篇:DefaultAdvisorAutoProxyCreator的具体作用是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》