Flink如何新增connectors模块

发布时间:2021-12-31 10:25:22 作者:iii
来源:亿速云 阅读:315

本篇内容介绍了“Flink如何新增connectors模块”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Flink中的API

Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。 

Flink如何新增connectors模块

 DataStream/DateSet API

Flink中的DataStream和DataSet程序是常规程序,可对数据流实施转换(例如,过滤,更新状态,定义窗口,聚合)。最初从各种来源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据写入文件或标准输出(例如命令行终端)。Flink程序可在各种上下文中运行,独立运行或嵌入其他程序中。执行可以在本地JVM或许多计算机的群集中进行。

预定义的 Source 和 Sink

一些比较基本的 Source 和 Sink 已经内置在 Flink 里。 预定义 data sources 支持从文件、目录、socket,以及 collections 和 iterators 中读取数据。 预定义 data sinks 支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。

官方文档

https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/

 DataStream/DateSet API开发

从本篇开始,增加DataStream/DateSet API演示内容,在原有的工程基础上,扩展一个connectors模块;此模块会演示以下几个组件简单使用;

新增connectors模块

在当前工程中,创建名称为connectors的maven工程模块

pom.xml

   <artifactId>connectors</artifactId>

    <dependencies>
        <!-- Flink jdbc依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <!-- mysql驱动包 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!-- kafka依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- redis依赖 -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <!-- rabbitMq依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- elasticsearch7依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

刷新工程maven,下载相关功能依赖组件包;

创建用户表(演示使用)

-- 数所据库 flink 下创建用户表
CREATE TABLE `t_user` (
  `id` int(8) NOT NULL AUTO_INCREMENT,
  `name` varchar(40) DEFAULT NULL,
  `age` int(3) DEFAULT NULL,
  `sex` int(2) DEFAULT NULL,
  `address` varchar(40) DEFAULT NULL,
  `createTime` timestamp NULL DEFAULT NULL,
  `createTimeSeries` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4

创建实体Bean(演示使用)

TUser.java

package com.flink.examples;

/**
 * @Description t_user表数据封装类
 */
public class TUser {

    private Integer id;
    private String name;
    private Integer age;
    private Integer sex;
    private String address;
    private Long createTimeSeries;

    public TUser(){}

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Integer getSex() {
        return sex;
    }

    public void setSex(Integer sex) {
        this.sex = sex;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

    public Long getCreateTimeSeries() {
        return createTimeSeries;
    }

    public void setCreateTimeSeries(Long createTimeSeries) {
        this.createTimeSeries = createTimeSeries;
    }

    @Override
    public String toString() {
        return "TUser{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", age=" + age +
                ", sex=" + sex +
                ", address='" + address + '\'' +
                ", createTimeSeries=" + createTimeSeries +
                '}';
    }
}

TCount.java

package com.flink.examples;

/**
 * @Description 统计表
 */
public class TCount {

    /**
     * 性别
     */
    private Integer sex;
    /**
     * 数量
     */
    private Integer num;

    public TCount(){}

    public TCount(Integer sex, Integer num){
        this.sex = sex;
        this.num = num;
    }

    public Integer getSex() {
        return sex;
    }

    public void setSex(Integer sex) {
        this.sex = sex;
    }

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }
}

工程模块

Flink如何新增connectors模块

“Flink如何新增connectors模块”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

推荐阅读:
  1. 【Flink】Flink对于迟到数据的处理
  2. nginx 编译新增加模块

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

connectors flink

上一篇:Flink的SessionWindow怎么用

下一篇:Flink中Filter怎么用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》