Who still hasn't learned Flink checkpoints?~
1、State vs. Checkpoint
State: the state of a single operator in Flink at a given point in time, e.g. the running value behind maxBy/sum. Note that State holds historical data/values and, by default, lives in memory.
Checkpoint: a snapshot, i.e. the State of all stateful operators in the job captured at the same point in time.
In one sentence:
a Checkpoint is a snapshot of State.
Purpose: if the job stops, the next start can load the state from the snapshot and pick up where it left off.
State backend: controls where state is stored.
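The example in the next section uses the legacy FsStateBackend API. On Flink 1.13+ the same intent is usually expressed by choosing a state backend plus a checkpoint storage location; a minimal sketch for reference (the class name is made up for illustration, and the RocksDB option assumes the flink-statebackend-rocksdb dependency is on the classpath):
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // working state on the TaskManager heap (fine for small state)
        env.setStateBackend(new HashMapStateBackend());
        // or: working state in local RocksDB (better for large state)
        // env.setStateBackend(new EmbeddedRocksDBStateBackend());
        // checkpoints (the snapshots) go to durable storage, here the HDFS path used later in this post
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hdfs-cluster/flink-checkpoint");
    }
}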
2、Setting up a Checkpoint
package com.demo.day4;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CheckpointDemo03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // checkpoint settings: checkpointing is off by default; this enables it
        // and sets the interval between checkpoints to 1000 ms
        env.enableCheckpointing(1000);
        // write checkpoint data to HDFS (legacy FsStateBackend API)
        env.setStateBackend(new FsStateBackend("hdfs://hdfs-cluster/flink-checkpoint"));
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // keep the externalized checkpoint when the job is cancelled, so it can be restored from later
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop11:9092,hadoop12:9092,hadoop13:9092")
                .setTopics("topic1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setGroupId("g1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(), "source");

        // parse "key,value" records into (key, value) tuples
        DataStream<Tuple2<String, Long>> tupleDS = ds.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return Tuple2.of(arr[0], Long.parseLong(arr[1]));
            }
        });

        // keep the maximum value seen so far per key in keyed ValueState
        DataStream<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0)
                .map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    // 1. declare the keyed state
                    private ValueState<Long> maxValueState = null;

                    // 2. initialize the state (runs once per task)
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // create the state descriptor
                        ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("maxValueState", Long.class);
                        // obtain the state handle from the descriptor
                        maxValueState = getRuntimeContext().getState(stateDescriptor);
                    }

                    // 3. use the state
                    @Override
                    public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                        Long currentValue = value.f1;          // current value
                        Long maxValue = maxValueState.value(); // historical maximum
                        if (maxValue == null || currentValue > maxValue) {
                            maxValue = currentValue;           // current value becomes the new maximum
                            // 4. update the state
                            maxValueState.update(maxValue);
                        }
                        return Tuple3.of(value.f0, currentValue, maxValue);
                    }
                });
        result2.print();
        env.execute();
    }
}
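Beyond the interval and externalized-checkpoint retention used above, CheckpointConfig exposes a few more commonly tuned knobs. A sketch for reference (the class name is made up and the values are illustrative, not recommendations):
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTuningSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);                             // checkpoint every second
        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);   // the default guarantee
        cc.setMinPauseBetweenCheckpoints(500);                     // at least 500 ms between two checkpoints
        cc.setCheckpointTimeout(60000);                            // abort a checkpoint that takes longer than 60 s
        cc.setMaxConcurrentCheckpoints(1);                         // only one checkpoint in flight at a time
        cc.setTolerableCheckpointFailureNumber(3);                 // fail the job after 3 consecutive checkpoint failures
        // a real job would define sources/sinks here and call env.execute()
    }
}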
Upload the jar to HDFS:
Run it:
tijiao.sh:
#! /bin/bash
flink run-application \
-t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
-c com.demo.day4.CheckpointDemo03 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
Result:
Test:
Stop the job:
Send some more test data:
Run it again, this time restoring from the checkpoint /flink-checkpoint/4435e2599d6269a0b7095979a959d97d/chk-416:
tijiao.sh:
#! /bin/bash
flink run-application \
-t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
-s hdfs://hdfs-cluster/flink-checkpoint/4435e2599d6269a0b7095979a959d97d/chk-416 \
-c com.demo.day4.CheckpointDemo03 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
It runs successfully:
Check the output (the data sent while the job was stopped shows up correctly):
Test again:
3、Triggering a Savepoint
A checkpoint snapshots state automatically; a savepoint is the same kind of snapshot, triggered manually.
As with the checkpoint example, upload the jar to HDFS.
Submit the job:
#! /bin/bash
flink run-application \
-t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
-c com.demo.day4.CheckpointDemo03 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
Note the current YARN application id (application_1725272814044_0002) and the Flink job id (f28c53d5ccf838c8d5e7ca7b8c45e5b0).
Test:
Trigger the savepoint (two approaches):
Approach 1: stop the Flink job and trigger a savepoint in the same command:
flink stop --savepointPath hdfs://hdfs-cluster/flink-savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 -yid application_1725272814044_0002
Approach 2: trigger a savepoint without stopping the job.
Syntax: flink savepoint <jobId> <savepointPath> -yid <yarnApplicationId>
flink savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 hdfs://hdfs-cluster/flink-savepoint -yid application_1725272814044_0002
Run approach 1:
flink stop --savepointPath hdfs://hdfs-cluster/flink-savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 -yid application_1725272814044_0002
With the job stopped, send some more test data:
Find the savepoint written by the job that was just stopped and restore from it:
Restart the Flink job from that savepoint path:
#! /bin/bash
flink run-application \
-t yarn-application \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist" \
-c com.demo.day4.CheckpointDemo03 \
-s hdfs://hdfs-cluster/flink-savepoint/savepoint-f28c53-23e87194a6b2 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar
Check again (the data sent during the interruption is there):
Send more data:
4、Checkpoints in Dinky
set 'execution.checkpointing.interval'='2sec';
set 'state.checkpoints.dir'='hdfs://hdfs-cluster/flink-checkpoint';
set 'execution.checkpointing.externalized-checkpoint-retention'='RETAIN_ON_CANCELLATION';
CREATE TABLE table1 (
  actionTime BIGINT,
  monitorId STRING,
  cameraId STRING,
  car STRING,
  speed DOUBLE,
  roadId STRING,
  areaId STRING,
  event_time AS PROCTIME() -- computed column: processing-time attribute
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'properties.bootstrap.servers' = 'hadoop11:9092,hadoop12:9092,hadoop13:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
CREATE TABLE table2 (
  id INT,
  start_time TIMESTAMP,
  end_time TIMESTAMP,
  monitor_id STRING,
  avg_speed DOUBLE,
  car_count BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop11:3306/cars',
  'table-name' = 't_average_speed',
  'username' = 'root',
  'password' = '123456'
);
INSERT INTO table2
SELECT CAST(NULL AS INT) AS id,
       window_start AS start_time,
       window_end AS end_time,
       monitorId AS monitor_id,
       AVG(speed) AS avg_speed,
       COUNT(DISTINCT car) AS car_count
FROM TABLE(HOP(TABLE table1, DESCRIPTOR(event_time), INTERVAL '30' SECONDS, INTERVAL '60' SECONDS))
GROUP BY monitorId, window_start, window_end;
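If you are not going through Dinky, the same three settings can be applied programmatically on a TableEnvironment before running the DDL and INSERT above. A minimal sketch (the class name is made up; the keys and values are exactly those from the "set" statements at the top of the Dinky script):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DinkyCheckpointEquivalentSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // same keys/values as the set statements at the top of the Dinky script
        tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "2sec");
        tEnv.getConfig().getConfiguration().setString("state.checkpoints.dir", "hdfs://hdfs-cluster/flink-checkpoint");
        tEnv.getConfig().getConfiguration().setString(
                "execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION");
        // the CREATE TABLE and INSERT statements above would then be executed with tEnv.executeSql(...)
    }
}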