flink streaming sql怎么使用

发布时间:2021-12-31 10:35:09 作者:iii
来源:亿速云 阅读:167

本篇内容主要讲解“flink streaming sql怎么使用”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“flink streaming sql怎么使用”吧!

背景

SQL,Structured Query Language:结构化查询语言,作为一个通用、流行的查询语言,不仅仅是在传统的数据库,在大数据领域也变得越来越流行,hive、spark、kafka、flink等大数据组件都支持sql的查询,使用sql可以让一些不懂这些组件原理的人,轻松的来操作,大大的降低了使用的门槛,今天我们先来简单的讲讲在flink的流处理中如何使用sql. 

实例讲解 

构造StreamTableEnvironment对象

在flink的流处理中,要使用sql,需要首先构造一个StreamTableEnvironment对象,方法比较简单。

sql中用到的catalog、table、function等都需要注册到StreamTableEnvironment才能使用。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
   

注册table

接下来要将相应的表的信息注册到StreamTableEnvironment对象中,有以下几种方式可以选择.
以下的代码是基于flink 1.10.0版本进行讲解的,各个版本略有不同。 

使用Tuple


  //使用flink的二元组,这个时候需要自定义字段名称
  Tuple2<String,Integer> tuple2 = Tuple2.of("jack", 10);
  //构造一个Tuple的DataStream
  DataStream<Tuple2<String,Integer>> tupleStream = env.fromElements(tuple2);
//  注册到StreamTableEnvironment,并且指定对应的字段名
  tableEnv.createTemporaryView("usersTuple", tupleStream, "name,age");
  //执行一个sql 查询. 然后返回一个table对象
  Table table = tableEnv.sqlQuery("select name,age from usersTuple");
//  将table对象转成flink的DataStream,以便后续操作,我们这里将其输出
  tableEnv.toAppendStream(table, Row.class).print();

使用Row

flink中提供的元组Tuple是有限制的,最多到Tuple25,所以如果我们有更多的字段,可以选择使用flink中的Row对象.


 //使用Row
  Row row = new Row(2);
  row.setField(0, "zhangsan");
  row.setField(1, 20);
  DataStream<Row> rowDataStream = env.fromElements(row);
  tableEnv.createTemporaryView("usersRow", rowDataStream, "name,age");
  Table tableRow = tableEnv.sqlQuery("select name,age from usersRow");
  tableEnv.toAppendStream(tableRow, Row.class).print();
   

使用java的Pojo类

首先定一个pojo类


 public static class User{
  private String name;
  private int age;

  public String getName(){
   return name;
  }

  public void setName(String name){
   this.name = name;
  }

  public int getAge(){
   return age;
  }

  public void setAge(int age){
   this.age = age;
  }
 }
 

定义这个pojo类是要符合flink的序列化规则,是有一定要求的,具体的可以参考【1】:

  1. 该类是public类型并且没有非静态内部类
  2. 该类拥有公有的无参构造器
  3. 类(以及所有超类)中的所有非静态、非 transient 字段都是公有的(非 final 的);或者遵循 Java bean 规则,字段是private的,但是具有public类型的 getter 和 setter 方法

User user = new User();
  user.setName("Tom");
  user.setAge(20);
  DataStream<User> userDataStream = env.fromElements(user);
  tableEnv.createTemporaryView("usersPojo", userDataStream);
  Table tablePojo = tableEnv.sqlQuery("select name,age from usersPojo");
  tableEnv.toAppendStream(tablePojo, Row.class).print();

 

如果使用的是java pojo类型的DataStream,就不用声明字段名称了,flink会自动解析pojo类中的字段名称和类型来作为table的字段和类型。 

使用外部存储


  //连接外部系统,比如文件,kafka等
  Schema schema = new Schema()
    .field("name", DataTypes.STRING())
    .field("age", DataTypes.INT());
  tableEnv.connect(new FileSystem().path("...."))
          .withFormat(new Csv())
          .withSchema(schema)
          .createTemporaryTable("usersFile");
  Table tableFile = tableEnv.sqlQuery("select name,age from usersFile");
  tableEnv.toAppendStream(tableFile, Row.class).print();
 

使用外部存储的时候需要指定以下对象:

  1. tableEnv.connect(ConnectorDescriptor ...) 指定连接符,目前flink支持Elasticsearch、hbase、kafka、filesystem这几类
  2. withFormat(FormatDescriptor format) 这个就是指定我们从上述数据源读取的数据的格式,比如json、csv、parquet等等
  3. .withSchema(Schema schema) 给我们的table定义一个schema,也就是字段的名称和类型,用于sql查询
  4. .createTemporaryTable("usersFile") 给表起一个名字,并且注册到StreamTableEnvironment中

到此,相信大家对“flink streaming sql怎么使用”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. Flink实现Kafka到Mysql的Exactly-Once
  2. 初识Flink,你应该知道这些!

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

sql flink streaming

上一篇:如何解决php连接mssql失败的问题

下一篇:SAP Analytics Cloud里的Smart Insight功能是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》