您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
今天就跟大家聊聊有关如何进行flink中的kafka源码分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
最近一直在研究 Flink SQL 相关的内容,第一阶段的目标是解决 Kafka 的消费和写入问题。不过有些同学对这部分并不是很了解,今天我们来详细分析一下相关包的继承层次。
flink源码如下:
public class KafkaTableSourceFactory implements StreamTableSourceFactory<Row>{ private ConcurrentHashMap<String, KafkaTableSource> kafkaTableSources = new ConcurrentHashMap<>(); @Override public Map<String, String> requiredContext() { Map<String, String> context = new HashMap<>(); context.put(CONNECTOR_TYPE(), KafkaConnectorDescriptor.CONNECTOR_TYPE); context.put(CONNECTOR_PROPERTY_VERSION(),String.valueOf(KafkaConnectorDescriptor.CONNECTOR_PROPERTY_VERSION)); return context; } @Override public List<String> supportedProperties() { List<String> properties = new ArrayList<>(); properties.add(KafkaConnectorDescriptor.DATABASE_KEY); properties.add(KafkaConnectorDescriptor.TABLE_KEY); return properties; } @Override public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) { //避免频繁的触发 是否需要加缓存 KafkaTableSource kafkaTableSource; String dataBase = properties.get(KafkaConnectorDescriptor.DATABASE_KEY); String table = properties.get(KafkaConnectorDescriptor.TABLE_KEY); if (!kafkaTableSources.containsKey(dataBase + table)) { Kafka08UDMPBTableSource.Builder builder = new Kafka08UDMPBTableSource.Builder(); kafkaTableSource = builder .cluster(dataBase) .subject(table) .build(); kafkaTableSources.put(dataBase + table,kafkaTableSource); } else { kafkaTableSource = kafkaTableSources.get(dataBase + table); } return kafkaTableSource; } }
/**
 * Kafka 0.8 table source whose rows are decoded by a `PBRowDeserializationSchema`
 * built from `typeInformation`, `paramMap` and `entryClass`.
 *
 * The constructor is protected, so instances cannot be created directly by arbitrary callers.
 *
 * @param topic           Kafka topic passed to the parent source
 * @param properties      Kafka client properties passed to the parent source
 * @param schema          table schema passed to the parent source
 * @param typeInformation row type information handed to the deserialization schema
 * @param paramMap        parameters handed to the deserialization schema
 * @param entryClass      class name handed to the deserialization schema
 *                        (presumably the message entry class -- confirm against
 *                        PBRowDeserializationSchema)
 */
class Kafka08PBTableSource protected(topic: String,
                                     properties: Properties,
                                     schema: TableSchema,
                                     typeInformation: TypeInformation[Row],
                                     paramMap: util.LinkedHashMap[String, AnyRef],
                                     entryClass: String)
  extends KafkaTableSource(
    schema,
    topic,
    properties,
    new PBRowDeserializationSchema(typeInformation, paramMap, entryClass)) {

  /**
   * Builds the Kafka 0.8 consumer for this source.
   *
   * As a side effect the source's startup mode is set to EARLIEST, and the returned
   * consumer is likewise configured to start from the earliest offset.
   */
  override def createKafkaConsumer(topic: String,
                                   properties: Properties,
                                   deserializationSchema: DeserializationSchema[Row]): FlinkKafkaConsumerBase[Row] = {
    this.setStartupMode(StartupMode.EARLIEST)
    val consumer = new FlinkKafkaConsumer08[Row](topic, deserializationSchema, properties)
    consumer.setStartFromEarliest()
  }
}
下面是用户自定义的 Kafka sink 类:
/**
 * User-defined Kafka 0.8 table sink that writes rows with a caller-supplied
 * serialization schema and a fixed set of field names and types.
 *
 * When `partitioner` is empty, a `FlinkFixedPartitioner` is used instead.
 *
 * @param topic               target Kafka topic
 * @param properties          Kafka client properties
 * @param partitioner         optional custom partitioner
 * @param paramMap            extra parameters; within this class they are only carried
 *                            over to copies made by `createCopy`
 * @param serializationSchema schema used to serialize rows
 * @param fieldNames          names of the table fields
 * @param fieldTypes          types of the table fields
 */
class Kafka08UDMPBTableSink(topic: String,
                            properties: Properties,
                            partitioner: Optional[FlinkKafkaPartitioner[Row]],
                            paramMap: util.LinkedHashMap[String, AnyRef],
                            serializationSchema: SerializationSchema[Row],
                            fieldNames: Array[String],
                            fieldTypes: Array[TypeInformation[_]])
  extends KafkaTableSink(topic, properties, partitioner.orElse(new FlinkFixedPartitioner[Row])) {

  /** Creates the Kafka 0.8 producer, falling back to a fixed partitioner when none is given. */
  override def createKafkaProducer(topic: String,
                                   properties: Properties,
                                   serializationSchema: SerializationSchema[Row],
                                   partitioner: Optional[FlinkKafkaPartitioner[Row]]): SinkFunction[Row] = {
    val effectivePartitioner = partitioner.orElse(new FlinkFixedPartitioner[Row])
    new FlinkKafkaProducer08[Row](topic, serializationSchema, properties, effectivePartitioner)
  }

  /** Always returns the schema supplied at construction time; `rowSchema` is not consulted. */
  override def createSerializationSchema(rowSchema: RowTypeInfo): SerializationSchema[Row] =
    serializationSchema

  /** Returns a new sink constructed from this sink's own constructor arguments. */
  override def createCopy: Kafka08UDMPBTableSink =
    new Kafka08UDMPBTableSink(
      topic,
      properties,
      this.partitioner,
      paramMap,
      serializationSchema,
      fieldNames,
      fieldTypes)

  /**
   * Delegates to the parent `configure`, forwarding `this.fieldNames` / `this.fieldTypes`
   * rather than the arguments passed in.
   *
   * NOTE(review): the incoming arguments are ignored -- presumably intentional so the
   * sink keeps its own field layout; confirm with the author.
   */
  override def configure(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): KafkaTableSink = {
    super.configure(this.fieldNames, this.fieldTypes)
  }

  /** Returns the names of the table fields. */
  override def getFieldNames: Array[String] = this.fieldNames

  /** Returns the types of the table fields. */
  override def getFieldTypes: Array[TypeInformation[_]] = this.fieldTypes

  /** Attaches a Kafka producer sink to `dataStream` and gives the operator a runtime name. */
  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
    val producer = createKafkaProducer(topic, properties, serializationSchema, partitioner)
    val sinkOperator = dataStream.addSink(producer)
    sinkOperator.name(TableConnectorUtil.generateRuntimeName(this.getClass, fieldNames))
  }
}
/**
 * Row schema declared as both a {@code SerializationSchema<Row>} and a
 * {@code DeserializationSchema<Row>}.
 *
 * <p>NOTE(review): only the fields are visible in this excerpt; the methods required by
 * the two interfaces are not shown here and presumably live in the full source.
 */
public class TrackRowDeserializationSchema
        implements SerializationSchema<Row>, DeserializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private TypeInformation<Row> typeInfo;

    /** Parameter map (raw type); its key/value types are not visible in this excerpt. */
    private LinkedHashMap paraMap;

    // Inbound side -- presumably the schema and class name of incoming records; TODO confirm.
    private String inSchema;
    private String inClass;

    // Outbound side -- presumably the schema and class name of outgoing records; TODO confirm.
    private String outSchema;
    private String outClass;
}
/**
 * Table format factory for the "track" row format, declared as both a serialization-schema
 * factory and a deserialization-schema factory.
 */
public class TrackRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {

    /**
     * Creates the factory with type {@code TrackValidator.FORMAT_TYPE_VALUE}, version 1 and
     * schema derivation disabled.
     */
    public TrackRowFormatFactory() {
        this(TrackValidator.FORMAT_TYPE_VALUE, 1, false);
    }

    /**
     * Creates the factory with explicit settings, passed straight to the base class.
     *
     * @param type                     format type identifier
     * @param version                  format property version
     * @param supportsSchemaDerivation whether schema derivation is supported
     */
    public TrackRowFormatFactory(String type, int version, boolean supportsSchemaDerivation) {
        super(type, version, supportsSchemaDerivation);
    }

    /**
     * Lists the format-specific property keys this factory accepts.
     *
     * <p>NOTE(review): {@code FORMAT_TYPE_VALUE} is also used above as the format type
     * identifier; confirm it is meant to appear here as a property key as well.
     */
    @Override
    protected List<String> supportedFormatProperties() {
        // Copied into an ArrayList so the returned list stays mutable, as before.
        return new ArrayList<>(java.util.Arrays.asList(
                TrackValidator.FORMAT_IN_SCHEMA,
                TrackValidator.FORMAT_IN_CLASS,
                TrackValidator.FORMAT_OUT_CLASS,
                TrackValidator.FORMAT_OUT_SCHEMA,
                TrackValidator.FORMAT_TYPE_INFORMATION,
                TrackValidator.FORMAT_TYPE_VALUE));
    }
}
看完上述内容,你们对如何进行flink中的kafka源码分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。