FlinkSQL怎么搭建

发布时间:2021-12-23 16:19:55 作者:iii
来源:亿速云 阅读:170

本篇内容主要讲解“FlinkSQL怎么搭建”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“FlinkSQL怎么搭建”吧!

1.背景

由于公司内部需求较多,并不想每次都写一个 streaming 程序,故而开始搭建 flinksql 平台,基于 jdk1.8,flink1.12.x

2.效果

传一个 sql 文件给 jar 包,然后 sql 文件内的 sql 将自动执行

3. jar 包 vs web 界面

调研了基于 web 的 zeppline

  1. zeppline 设计的初衷其实是为了交互式分析

  2. 基于 zeppline rest api 与现有的监控不兼容,需要修改现有监控的代码

  3. 虽然带有 web 界面的对用户很是友好,对于分析人员来说,是一个不错的选择,但对于开发人员来说,真正的线上长时间的运行程序,开发成 HA 的 server 还是有必要的

基于以上 3 点最终选择 jar 作为最终的方式

4. 使用

  1. 将 sql 写入 xxx.sql 文件中,如

CREATE TEMPORARY FUNCTION MillisecondsToDateStr AS 'io.github.shengjk.udf.MillisecondsToDateStr' LANGUAGE JAVA;-- ExecutionCheckpointingOptionsset execution.checkpointing.mode=EXACTLY_ONCE;set execution.checkpointing.timeout=30 min;--  30minset execution.checkpointing.interval=1 min ; -- 1minset execution.checkpointing.externalized-checkpoint-retention=RETAIN_ON_CANCELLATION;-- ExecutionConfigOptionsset table.exec.state.ttl=1 day;  -- 1 dayset table.exec.mini-batch.enabled=true; -- enable mini-batch optimizationset table.exec.mini-batch.allow-latency=1 s; -- 1sset table.exec.mini-batch.size=1000;set table.exec.sink.not-null-enforcer=drop;-- -- dadadadadadaCREATE TABLE orders(
   status      int,
   courier_id  bigint,
   id          bigint,
   finish_time BIGINT)WITH (
   'connector' = 'kafka','topic' = 'canal_monitor_order',
   'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
   'format' = 'ss-canal-json','ss-canal-json.table.include' = 'orders','scan.startup.mode' = 'earliest-offset');-- flink.partition-discovery.interval-millis;CREATE TABLE infos(
   info_index int,
   order_id   bigint)WITH (
   'connector' = 'kafka','topic' = 'canal_monitor_order',
   'properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',
   'format' = 'ss-canal-json','ss-canal-json.table.include' = 'infos','scan.startup.mode' = 'earliest-offset');CREATE TABLE redisCache(
   finishOrders BIGINT,
   courier_id   BIGINT,
   dayStr       String)WITH (
   'connector' = 'redis',
   'hostPort'='localhost:6400',
   'keyType'='hash',
   'keyTemplate'='test2_${courier_id}',
   'fieldTemplate'='${dayStr}',
   'valueNames'='finishOrders',
   'expireTime'='259200');create view temp asselect o.courier_id,  (CASE   WHEN sum(infosMaxIndex.info_index) is null then 0   else sum(infosMaxIndex.info_index) end) finishOrders,  o.status,  dayStrfrom ((select courier_id, id, last_value(status)                             status, MillisecondsToDateStr(finish_time, 'yyyyMMdd') dayStr      from orders      where status = 60  group by courier_id, id, MillisecondsToDateStr(finish_time, 'yyyyMMdd'))) oleft join (select max(info_index) info_index, order_id                   from infos                   group by order_id) infosMaxIndex on o.id = infosMaxIndex.order_idgroup by o.courier_id, o.status, dayStr;INSERT INTO redisCache SELECT finishOrders,courier_id,dayStr FROM temp;
  1. 将 flinksql-platform 打包并上传至服务器

  2. 将必要的 connector jar 放入到相应的目录下

  3. 执行,如

flink-1.12.0/bin/flink  run -p 3 -yt ./flinkjar/  -C file:///home/shengjk/flinkjar/test-udf.jar -C file:///home/shengjk/flinkjar/jedis-2.10.2.jar  -m yarn-cluster -ynm sqlDemo  -c io.github.shengjk.Main ./flinksql-platform-1.0-SNAPSHOT.jar --sqlPath ./xxx.sql

其中
-C 添加 udfJar 等第三方 jar 包 -C 参数apply到了client端生成的JobGraph里,然后提交JobGraph来运行的
-yt 目录 将 udfJar 等第三方 jar 包提交到 TaskManager 上

到此,相信大家对“FlinkSQL怎么搭建”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

推荐阅读:
  1. FlinkSQL中窗口的功能及实例用法
  2. 如何使用FlinkSQL内置函数

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

flinksql

上一篇:flume的使用方法是什么

下一篇:mysql中出现1053错误怎么办

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》