您好,登录后才能下订单哦!
动态路由:
方案1: 定制一个特殊的KafkaDynamicSink,内嵌多个原生的FlinkKafkaProducer,每个对应一个下游的KAFKA队列
在OPEN方法中读取所有KAFKA渠道配置并构建FlinkKafkaProducer并构建一个Map: kafka channelId -> FlinkKafkaProducer 
重载INVOKE方法 
根据路由规则找到当前流数据对应所有的ChannelId (允许多个),再从MAP中获取对 FlinkKafkaProducer 并调用其INVOKE方法
核心代码: 
public class DynamicKafkaSink<IN> extends RichSinkFunction<IN> {
    @Override
    public void open(Configuration parameters) throws Exception {
        List<ChannelModel> allChannels = channelRepository.getAll();
        for(ChannelModel nextChannel: allChannels) {
            FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010<String>)channelFactory.createChannelProducer(nextChannel, 
            FlinkKafkaProducer010.class, Collections.emptyMap());
            nextProducer.setRuntimeContext(this.getRuntimeContext());
            nextProducer.open(parameters);
            producers.put(nextChannel.getChannelId(), nextProducer);
        }
    }
    
    @Override
    public void invoke(IN value) throws Exception {
        List<String> channelIds = channelRouteStrategy.route(value);
        for (String nextChannelId: channelIds) {
            FlinkKafkaProducer010 nextProducer = producers.get(nextChannelId);
            nextProducer.invoke(converted);
        }
    }
}
注意:
Map不能在构造函数中初始化,而要在OPEN方法中初始化,FLINK分布式特性决定了构造函数和OPEN不在同一个JVM里执行
类级别的变量需要可序列化,否则需要声明为TRANSIENT
每个新构建的FlinkKafkaProducer需要先调用
setRuntimeContext(this.getRuntimeContext())
再调用open 方法才能被使用
优点:
可以路由到不同的BROKER上的TOPIC,在不同的BROKER上隔离性更好
缺陷:
所有的FlinkKafkaProducer只在OPEN的时候创建一次,后面如果添加了新的KAFKA队列无法被动态感知并路由
更改了FlinkKafkaProducer创建和初始化的过程,从MAIN函数中转到了KafkaDynamicSink的OPEN方法里,未经过全面测试,可能存在问题
方案2:方案1的升级版,利用FLINK SPLIT STREAM的特性,根据路由规则将原生数据流分成多个,每个子数据流对应一个下游KAFKA队列
在FLINK Main 函数中读取所有KAFKA渠道配置并构建FlinkKafkaProducer并构建一个Map: kafka channelId -> FlinkKafkaProducer
在输入流上构建一个SplitStream, OutputSelector 中根据路由逻辑返回一组ChannelId
遍历Map,对于Map中的每个Key (ChannelID) 调用 SplitStream 的 select方法获取对应的分支流数据,然后路由到对应的 FlinkKafkaProducer
核心代码:
public static void main(String[] args) {
    List<ChannelModel> allChannels = channelRepository.getAll();
    for(ChannelModel nextChannel: allChannels) {
        FlinkKafkaProducer010 nextProducer = (FlinkKafkaProducer010<String>)channelFactory.createChannelProducer(nextChannel, 
        FlinkKafkaProducer010.class, Collections.emptyMap());
        nextProducer.setRuntimeContext(this.getRuntimeContext());
        nextProducer.open(parameters);
        producers.put(nextChannel.getChannelId(), nextProducer);
    }
    
    DataStreamSource<T> source = ....
    SplitStream<T> splitStream = source.split(new OutputSelector<T>() {
        @Override
        public Iterable<String> select(String value) {
            List<String> channelIds = channelRouteStrategy.route(value);
            return channeIds;
        }
    });
    
    for(String nextChannel: producers.keySet()) {
        FlinkKafkaProducer010 target = producers.get(nextChannel);
        splitStream.select(nextChannel).addSink(target);
    }
}
优点:
可以路由到不同的BROKER上的TOPIC,在不同的BROKER上隔离性更好
完全利用FLINK原生的特性,更加简洁优雅,解决了方案1的第二点不足
缺陷:
所有的FlinkKafkaProducer只在MAIN函数中创建一次,后面如果添加了新的KAFKA队列无法被动态感知并路由
方案3: 利用FLINK的 KeyedSerializationSchema中的getTargetTopic函数,KeyedSerializationSchema 除了将对象转化Kafka ProducerRecord
的键值对之外还可以动态指定Topic
在FLINK Main 函数中将输入流通过flatMap 转化为 Tuple2, 其中key 是目标所属的Topic, value 是原生数据
实现一个KeyedSerializationSchema作为构造函数传给FlinkKafkaProducer,重载getTargetTopic方法: 返回 tuple2.f0
核心代码:
class DynaRouteSerializationSchema implements KeyedSerializationSchema {
    
    String getTargetTopic(T element) {
        Tuple2 tuple = (Tuple2)element;
        return tuple.f0;
    }
}
public static void main(String[] args) {
    DataStreamSource<T> source = ....
    DataStream<Tuple2<String, T>> converted = source
    .flatMap(new RichFlatMapFunction<Object, Tuple2<String, T>>() {
        @Override
        public void flatMap(T value, Collector<Tuple2<String, Object>> out)
        throws Exception {
            List<String> channelIds = channelRouteStrategy.route(value);
            for(String nextChannel: channelIds) {
                out.collect(Tuple2.valueOf(nextChannel, value));
            }
        }
    });
    
    
}
优点:
完全利用FLINK原生的特性,代码量非常少 
新增加的TOPIC也可以被路由到,不需要启停流处理
缺陷:
无法像前两个方案实现Broker级别的路由,只能做到Topic级别的路由
断流功能:
有时系统升级或者其他组件不可用,需要暂时停止KAFKA PRODUCER 
FLINK 原生机制: 
被动反压:
Kafka09Fetcher 包含了一根独立的 KafkaConsumerThread,从KAFKA中读取数据,再交给HANDOVER
HANDOVER可以理解为一个大小为1的队列, Kafka09Fetcher 再从队列中获取并处理数据,一旦当处理速度变慢,KafkaConsumerThread
无法将数据写入HANDOVER, 线程就会被阻塞
另外KeyedDeserializationSchema定义了一个isEndOfStream方法,如果返回true, Kafka09Fetcher就会停止循环并退出,导致整个流处理结束
设计思路:
SignalService: 注册SignalListener, 利用Curator TreeCache 监听一个Zookeeper 路径获取起动/停止流处理的信号量
SignalListener: 接收ZOOKEEPER变更信息的回调接口
PausableKafkaFetcher: 继承Flink原生的KafkaFetcher, 监听到信号变化阻塞ConsumerThread的处理
PausableKafkaConsumer: 继承Flink原生的KafkaConsumer, 创建PausableKafkaFetcher 
核心代码:
public class PausableKafkaFetcher<T> extends Kafka010Fetcher<T> implements SignalListener {
private final ReentrantLock pauseLock = new ReentrantLock(true);
private final Condition pauseCond = pauseLock.newCondition();
private volatile boolean paused = false;
    
public void onSignal(String path, String value) {
try {
            pauseLock.lockInterruptibly(); 
} catch(InterruptedException e) {
}
try {
if (SIGNAL_PAUSE.equals(value)) {
               paused = true;
} else if (SIGNAL_START.equals(value)) {
               paused = false;
}
           pauseCond.signal();  
       }
finally {
           pauseLock.unlock();
       }  
}
protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord<?,?> consumerRecord) throws Exception {
super.emitRecord(record, partition, offset, consumerRecord);
pauseLock.lockInterruptibly();
try {
while (paused) {
            pauseCond.await();
         }
} finally {
         pauseLock.unlock();
      }
  }
}
public class PausableKafkaConsumer<T> extends FlinkKafkaConsumer010<T> {
public void open(Configuration configuration) {
        signalService = ZKSignalService.getInstance(); 
        signalService.initialize(zkConfig);
}
 
public void cancel() {
super.cancel();
         unregisterSignal();
}
 
public void close() {
super.close();
        unregisterSignal(); 
}
private void unregisterSignal() {
if (signalService != null) {
String fullPath = WATCH_PREFIX + "/" + watchPath;
            signalService.unregisterSignalListener(fullPath);
         }
}
protected AbstractFetcher createFetcher(...) throws Exception {
PausableKafkaFetcher<T> fetcher = new PausableKafkaFetcher<> (...);
if (signalService != null) {
String fullPath = WATCH_PREFIX + "/" + watchPath;
            signalService.registerSignalListener(fullPath, fetcher);
}
        return fetcher
     }
}
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。