Flink 算子状态怎么用

发布时间:2021-12-31 10:46:52 作者:小新
来源:亿速云 阅读:258

这篇文章主要介绍Flink 算子状态怎么用,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

1. 算子状态分类

算子状态的作用范围限定为算子并行子任务。这意味着由同一并行子任务所处理的所有数据都可以访问到相同的状态,状态对于同一子任务而言是共享的。算子状态不能由相同或不同算子的另一个并行子任务访问。

Flink 算子状态怎么用

Flink为算子状态提供三种基本数据结构,主要介绍当并行度改变(扩缩容)时,从保存点重新启动时,算子状态如何分配:

  1. 列表状态(List state):将状态表示为一组数据的列表。

带有算子列表状态的算子在扩缩容时会对列表中的条目进行重新分配。理论上,所有并行算子任务的列表条目会被统一收集起来,随后均匀分配到更少或更多的任务之上。如果列表条目的数量小于算子新设置的并行度,部分任务在启动时的状态就可能为空。

Flink 算子状态怎么用

  1. 联合列表状态(Union list state) 也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障从保存点(savepoint)启动应用程序时进行恢复,如果并行度发生改变,带有算子联合列表状态的算子会在扩缩容时把状态列表的全部条目广播到全部任务上,随后由任务自己决定哪些条目应该保留,哪些应该丢弃。

对于同一个算子来说,假如之前的并行度为2,那么就会有两个子任务,也就是两个状态,假如改变其并行度为3,那么就把之前的两个状态,给每个并行子任务都发一份,这样每个并行子任务上都有所有的状态,然后由并行子任务去决定使用哪个状态。 Flink 算子状态怎么用

  1. 广播状态(Broadcast state):不同于普通的算子状态,每个并行子任务的状态相同。但是仍然是每个并行子任务访问自己的状态,但是状态都是一样的。 如果一个算子有多项任务,而它的每个并行子任务状态又都相同,那么这种特殊情况最适合应用广播状态。

带有算子广播状态的算子在扩缩容时会把状态拷贝到全部新任务上,这样做的原因是广播状态能确保所有任务的状态相同。在缩容的情况下,由于状态经过复制不会丢失,我们可以简单的停掉多出的任务。

2.算子状态的使用

public class StateTest1_OperatorState {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // socket文本流
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        // 定义一个有状态的map操作,统计当前分区数据个数
        SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());

        resultStream.print();

        env.execute();
    }

    // 自定义MapFunction
    public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer>{
        // 定义一个本地变量,作为算子状态
        private Integer count = 0;

        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }

        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for( Integer num: state )
                count += num;
        }
    }
}
  1. 算子状态的定义和普通的成员变量定义相同,但是对应的算子处理函数要继承对应的接口,例如ListCheckpointed,自定义状态进行快照和恢复的逻辑。

以上是“Flink 算子状态怎么用”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!

推荐阅读:
  1. Flink基础知识点有哪些
  2. Flink的checkpoint与savepoint的区别是什么

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

flink

上一篇:Flink如何读取数据源

下一篇:去除图片人物背景的工具Removebg如何使用

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》