详解 Flink 的状态管理

csdn推荐

一、Flink 状态介绍 1. 流处理的无状态和有状态

2. Flink 的状态管理 二、Flink 状态分类 1. 托管状态

Managed State,所有的托管状态都由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现

1.1 算子状态

Operator State,状态作用范围限定为当前的算子任务实例,只对当前的并行子任务实例有效;使用较少

1.1.1 算子状态数据结构 1.1.2 案例

public class TestFlinkOperatorState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //定义一个有状态的map算子,用于统计输入数据个数
        DataStream<Integer> resultStream = dataStream.map(new MyCountMapper());
        
        resultStream.print();
        
        env.execute();
        
    }
    
    //定义有状态的 map 操作
    //实现 ListCheckpointed 接口,泛型为状态数据类型
    public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
        //定义一个本地变量作为状态
        private Integer count = 0;
        
        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }
        
        //对状态做快照
        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }
        
        //容错恢复状态
        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for(Integer num : state) {
                count += num;
            }
        }
        
    }
    
}

1.2 按键分区状态

Keyed State,状态的作用范围以 key 来隔离,是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,即 keyBy 之后才可以使用

1.2.1 按键分区状态数据结构

//按键分区状态的实例化方法:在富函数中,调用 getRuntimeContext() 方法获取到运行时上下文之后
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN,  OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)

列表状态:ListState,将状态表示为一组数据的列表,列表里的元素的数据类型为 T 映射状态:MapState,将状态表示为一组 Key-Value 对 聚合状态:ReducingState 和 AggregatingState,将状态表示为一个用于聚合操作的列表 1.2.2 案例

/**
	按键分区状态的使用步骤:
		1. 在自定义算子Function中声明一个按键分区数据结构,由于声明时需要使用 getRuntimeContext(),因此要使用继承富函数类的方式自定义算子Function
		2. 在自定义算子Function的对应算子方法中进行状态的读写等相关操作
*/
public class TestFlinkKeyedState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        
        /*
        	需求:自定义有状态的map算子,按sensor_id统计个数
        */
        //使用按键分区状态必须先进行keyBy
        DataStream<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());
        
        resultStream.print();
        
        env.execute();
    }
    
    //使用继承富函数类的方式自定义MapFunction
    public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {
        
        //定义一个值状态属性
        private ValueState<Integer> myValueState;
        
        //在open方法中实例化值状态
        @Override
        public void open(Configuration parameters) throws Exception {
            myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("value-state", Integer.class));
        }
        
        @Override
        public Integer map(SensorReading value) throws Exception {
            //获取状态值
            Integer count = myValueState.value();
            if(count == null) {
                count = 0;
            }
            
            count++;
                
            //更新状态值
            myValueState.update(count);
            
            return count;
        }
                
    }
}

2. 原始状态

Raw State,原始状态是自定义的,相当于开辟了一块内存,需要开发者自己管理,实现状态的序列化和故障恢复

三、Flink 状态编程案例

/**
	需求:检测同一个传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警信息
*/
public class FlinkKeyedStateCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //定义一个有状态的 flatMap 操作,若同一个传感器连续两个温度的差值超过 10 度,则输出报警
        //报警信息:sensor_id,前一次温度值,当前温度值
        DataStream<Tuple3<String, Double, Double>> warningStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));
        
        warningStream.print();
        
        env.execute();
    }
    
    //使用继承富函数类的方式自定义FlatMapFunction
    public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {
        //定义温度差阈值属性
        private Double threshold;
        //定义值状态属性,保存上一次的温度值
        private ValueState<Double> lastTempState;
        
        public TempChangeWarning(Double threshold) {
            this.threshold = threshold;
        }
        
        //在open方法中实例化值状态
        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));
        }
        
        //重写flatMap方法
        @Override
        public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
            //获取上一次温度状态值
            Double lastTemp = lastTempState.value();
            
            //如果状态值不为null,则进行差值判断
            if(lastTemp != null) {
                Double diff = Math.abs(lastTemp - value.getTemperature());
                //差值超过阈值,则输出报警信息
                if(diff >= threshold) {
                    out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));
                }
            }
            
            //更新状态值
            lastTempState.update(value.getTemperature());
        }
        
        //在close方法中清空状态
        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}

四、Flink 状态后端

State Backends,一个可插入的决定状态的存储、访问以及维护等工作的组件

1. 介绍

​ 在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backends)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

2. 分类 3. 配置 3.1 配置文件配置 3.2 代码配置

在代码中为每个作业单独配置状态后端

public class TestStatebackend {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //配置状态后端
        //1.MemoryStateBackend
        env.setStateBackend(new MemoryStateBackend());
        
        //2.FsStateBackend
        env.setStateBackend(new FsStateBackend("hdfs://......"));
        
        //3.RocksDBStateBackend,需要先引入依赖
        env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
    
        dataStream.print();
        
        env.execute();
    }
}

文章来源:https://blog.csdn.net/weixin_44480009/article/details/139565222



微信扫描下方的二维码阅读本文

© 版权声明
THE END
喜欢就支持一下吧
点赞12 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容