csdn推荐
一、Flink 状态介绍 1. 流处理的无状态和有状态
2. Flink 的状态管理 二、Flink 状态分类 1. 托管状态
Managed State,所有的托管状态都由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现
1.1 算子状态
Operator State,状态作用范围限定为当前的算子任务实例,只对当前的并行子任务实例有效;使用较少
1.1.1 算子状态数据结构 1.1.2 案例
public class TestFlinkOperatorState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
});
//定义一个有状态的map算子,用于统计输入数据个数
DataStream<Integer> resultStream = dataStream.map(new MyCountMapper());
resultStream.print();
env.execute();
}
//定义有状态的 map 操作
//实现 ListCheckpointed 接口,泛型为状态数据类型
public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
//定义一个本地变量作为状态
private Integer count = 0;
@Override
public Integer map(SensorReading value) throws Exception {
count++;
return count;
}
//对状态做快照
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(count);
}
//容错恢复状态
@Override
public void restoreState(List<Integer> state) throws Exception {
for(Integer num : state) {
count += num;
}
}
}
}
1.2 按键分区状态
Keyed State,状态的作用范围以 key 来隔离,是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,即 keyBy 之后才可以使用
1.2.1 按键分区状态数据结构
//按键分区状态的实例化方法:在富函数中,调用 getRuntimeContext() 方法获取到运行时上下文之后
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
列表状态:ListState,将状态表示为一组数据的列表,列表里的元素的数据类型为 T 映射状态:MapState,将状态表示为一组 Key-Value 对 聚合状态:ReducingState 和 AggregatingState,将状态表示为一个用于聚合操作的列表 1.2.2 案例
/**
按键分区状态的使用步骤:
1. 在自定义算子Function中声明一个按键分区数据结构,由于声明时需要使用 getRuntimeContext(),因此要使用继承富函数类的方式自定义算子Function
2. 在自定义算子Function的对应算子方法中进行状态的读写等相关操作
*/
public class TestFlinkKeyedState {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
});
/*
需求:自定义有状态的map算子,按sensor_id统计个数
*/
//使用按键分区状态必须先进行keyBy
DataStream<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());
resultStream.print();
env.execute();
}
//使用继承富函数类的方式自定义MapFunction
public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {
//定义一个值状态属性
private ValueState<Integer> myValueState;
//在open方法中实例化值状态
@Override
public void open(Configuration parameters) throws Exception {
myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("value-state", Integer.class));
}
@Override
public Integer map(SensorReading value) throws Exception {
//获取状态值
Integer count = myValueState.value();
if(count == null) {
count = 0;
}
count++;
//更新状态值
myValueState.update(count);
return count;
}
}
}
2. 原始状态
Raw State,原始状态是自定义的,相当于开辟了一块内存,需要开发者自己管理,实现状态的序列化和故障恢复
三、Flink 状态编程案例
/**
需求:检测同一个传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警信息
*/
public class FlinkKeyedStateCase {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
//定义一个有状态的 flatMap 操作,若同一个传感器连续两个温度的差值超过 10 度,则输出报警
//报警信息:sensor_id,前一次温度值,当前温度值
DataStream<Tuple3<String, Double, Double>> warningStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));
warningStream.print();
env.execute();
}
//使用继承富函数类的方式自定义FlatMapFunction
public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {
//定义温度差阈值属性
private Double threshold;
//定义值状态属性,保存上一次的温度值
private ValueState<Double> lastTempState;
public TempChangeWarning(Double threshold) {
this.threshold = threshold;
}
//在open方法中实例化值状态
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));
}
//重写flatMap方法
@Override
public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
//获取上一次温度状态值
Double lastTemp = lastTempState.value();
//如果状态值不为null,则进行差值判断
if(lastTemp != null) {
Double diff = Math.abs(lastTemp - value.getTemperature());
//差值超过阈值,则输出报警信息
if(diff >= threshold) {
out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));
}
}
//更新状态值
lastTempState.update(value.getTemperature());
}
//在close方法中清空状态
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
}
四、Flink 状态后端
State Backends,一个可插入的决定状态的存储、访问以及维护等工作的组件
1. 介绍
在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backends)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。
2. 分类 3. 配置 3.1 配置文件配置 3.2 代码配置
在代码中为每个作业单独配置状态后端
public class TestStatebackend {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//配置状态后端
//1.MemoryStateBackend
env.setStateBackend(new MemoryStateBackend());
//2.FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://......"));
//3.RocksDBStateBackend,需要先引入依赖
env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));
DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
});
dataStream.print();
env.execute();
}
}
文章来源:https://blog.csdn.net/weixin_44480009/article/details/139565222
微信扫描下方的二维码阅读本文
暂无评论内容