Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取

发布时间:2021-12-15 11:01:10 作者:柒染
来源:亿速云 阅读:465

这篇文章将为大家详细讲解有关Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据

示例环境

java.version: 1.8.xflink.version: 1.11.1kafka:2.11

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

示例模块 (pom.xml)

Flink 系例 之 TableAPI & SQL 与 示例模块

SelectToKafka.java

package com.flink.examples.kafka;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @Description 使用Tbale&SQL与Flink Kafka连接器从kafka的消息队列中获取数据
 */
public class SelectToKafka {
    /**
     官方参考:https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/kafka.html
     开始偏移位置
     config选项scan.startup.mode指定Kafka使用者的启动模式。有效的枚举是:
         group-offsets:从特定消费者组的ZK / Kafka经纪人中的承诺抵消开始。
         earliest-offset:从最早的偏移量开始。
         latest-offset:从最新的偏移量开始。
         timestamp:从每个分区的用户提供的时间戳开始。
         specific-offsets:从每个分区的用户提供的特定偏移量开始。
     默认选项值group-offsets表示从ZK / Kafka经纪人中最后提交的偏移量消费

     一致性保证
     sink.semantic选项来选择三种不同的操作模式:
         NONE:Flink不能保证任何事情。产生的记录可能会丢失或可以重复。
         AT_LEAST_ONCE (默认设置):这样可以确保不会丢失任何记录(尽管它们可以重复)。
         EXACTLY_ONCE:Kafka事务将用于提供一次精确的语义。每当您使用事务写入Kafka时,请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)。
     */
    static String table_sql = "CREATE TABLE KafkaTable (\n" +
            "  `user_id` BIGINT,\n" +
            "  `item_id` BIGINT,\n" +
            "  `behavior` STRING,\n" +
            "  `ts` TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'user_behavior',\n" +
            "  'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
            "  'properties.group.id' = 'testGroup',\n" +
            "  'scan.startup.mode' = 'earliest-offset',\n" +
            "  'format' = 'json'\n" +
            ")";

    public static void main(String[] args) throws Exception {
        //构建StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //默认流时间方式
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        //构建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        //构建StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        //注册kafka数据维表
        tEnv.executeSql(table_sql);

        String sql = "select user_id,item_id,behavior,ts from KafkaTable";
        Table table = tEnv.sqlQuery(sql);
        //打印字段结构
        table.printSchema();

        //table 转成 dataStream 流
        DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
        behaviorStream.print();

        env.execute();
    }
}

打印结果

root
 |-- user_id: BIGINT
 |-- item_id: BIGINT
 |-- behavior: STRING
 |-- ts: TIMESTAMP(3)

3> 1,1,normal,2021-01-26T10:25:44

关于Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

推荐阅读:
  1. Flink实现Kafka到Mysql的Exactly-Once
  2. 携程基于Flink的实时特征平台

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

kafka tableapi sql

上一篇:php中gbk转utf8用哪个函数

下一篇:php中2进制如何转文本流

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》