flink例子-读取数据库

发布时间:2020-07-24 01:11:08 作者:大海之中
来源:网络 阅读:2427
private final static Logger logger = LoggerFactory.getLogger(GetData.class);

    public static void main(String[] arg) throws Exception {

        TypeInformation[] fieldTypes = new TypeInformation[] {

                BasicTypeInfo.STRING_TYPE_INFO

        };

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()

                .setDrivername("com.mysql.jdbc.Driver")

                .setDBUrl("jdbc:mysql://ip:3306/tablename?characterEncoding=utf8")

                .setUsername("*")

                .setPassword("*")

                .setQuery("select name from words")

                .setRowTypeInfo(rowTypeInfo)

                .finish();

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSource s = env.createInput(jdbcInputFormat); // datasource

    BatchTableEnvironment tableEnv = new BatchTableEnvironment(env, TableConfig.DEFAULT());

    tableEnv.registerDataSet("t2", s);

    tableEnv.sqlQuery("select * from t2").printSchema();

    Table query = tableEnv.sqlQuery("select * from t2");

    DataSet result = tableEnv.toDataSet(query, Row.class);

    result.print();

    System.out.println(s.count());

}

通过插件将所需的类打到一个jar中

<plugin>
                            <artifactId>maven-assembly-plugin</artifactId>
                            <configuration>
                                    <appendAssemblyId>false</appendAssemblyId>
                                    <descriptorRefs>
                                            <descriptorRef>jar-with-dependencies</descriptorRef>
                                    </descriptorRefs>
                                    <archive>
                                            <manifest>
                                                    <!-- 此处指定main方法入口的class -->
                                                    <mainClass>*</mainClass>
                                            </manifest>
                                    </archive>
                            </configuration>
                            <executions>
                                    <execution>
                                            <id>make-assembly</id>
                                            <phase>package</phase>
                                            <goals>
                                                    <goal>assembly</goal>
                                            </goals>
                                    </execution>
                            </executions>
                    </plugin>

然后执行

./bin/flink run  /flink-1.8.0/collector-api-0.1.jar
推荐阅读:
  1. 【Flink】Flink对于迟到数据的处理
  2. flink 读取hive的数据

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

flink例子-读取数据库 取数据

上一篇:Linux 基础命令

下一篇:不修改数组找出重复的数字(c语言)

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》