
Who still hasn't learned Flink checkpoints?~

1、State vs. Checkpoint

State: the state of a single operator in Flink at a given moment, e.g. the running value behind maxBy/sum. Note that State holds historical data/state and is kept in memory (by default).

Checkpoint: a snapshot of the State of all stateful operators in the job at a given moment, i.e. an archived copy of that state.

In one sentence: a Checkpoint is a snapshot of State.

Purpose: if the job stops, the next run can load the state data from the snapshot and thereby recover the data.

State backend: controls where state is stored.
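As a minimal sketch (assuming Flink 1.13+, where HashMapStateBackend plus a checkpoint storage path replaces the legacy FsStateBackend used in the full example below; the class name is illustrative), the state backend and checkpointing are wired up like this:

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is off by default; snapshot all operator state every 1000 ms
        env.enableCheckpointing(1000);
        // Working state lives on the TaskManager heap
        env.setStateBackend(new HashMapStateBackend());
        // Checkpoint snapshots are written to HDFS
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hdfs-cluster/flink-checkpoint");
        // ... define sources/operators and call env.execute(), as in the full example in section 2
    }
}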

2、Setting up Checkpoints

 

package com.demo.day4;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointDemo03 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //Set up checkpointing
        env.enableCheckpointing(1000);// checkpointing is disabled by default; this enables it and sets the interval between checkpoints (ms)
        env.setStateBackend(new FsStateBackend("hdfs://hdfs-cluster/flink-checkpoint"));// write checkpoint data to HDFS (FsStateBackend is the legacy API)
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
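        // Retain externalized checkpoints on HDFS even when the job is cancelled, so the job can later be restored from them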
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("hadoop11:9092,hadoop12:9092,hadoop13:9092")
                .setTopics("topic1")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setGroupId("g1")
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        DataStream<String> ds = env.fromSource(source, WatermarkStrategy.noWatermarks(),"source");

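        // Parse each Kafka record of the form "key,value" into a (String, Long) tuple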
        DataStream<Tuple2<String, Long>> tupleDS = ds.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String value) throws Exception {
                String[] arr = value.split(",");
                return Tuple2.of(arr[0],Long.parseLong(arr[1]));
            }
        });

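        // Key by the String field and track the running max per key in ValueState; this keyed state is what checkpoints snapshot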
        DataStream<Tuple3<String, Long, Long>> result2 = tupleDS.keyBy(t -> t.f0).map(new RichMapFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>>() {

            private ValueState<Long> maxValueState = null;

            // 2. Initialize the state (runs once, when the operator instance opens)
            @Override
            public void open(Configuration parameters) throws Exception {
                // Create a state descriptor for the value state
                ValueStateDescriptor<Long> stateDescriptor = new ValueStateDescriptor<>("maxValueState", Long.class);
                // Obtain the state handle from the runtime context using the descriptor
                maxValueState = getRuntimeContext().getState(stateDescriptor);
            }

            // 3. Use the state on every record
            @Override
            public Tuple3<String, Long, Long> map(Tuple2<String, Long> value) throws Exception {
                Long currentValue = value.f1;          // current value from the incoming record
                Long maxValue = maxValueState.value(); // historical max read from state
                if (maxValue == null || currentValue > maxValue) {
                    maxValue = currentValue;           // the current value becomes the new max
                    // 4. Update the state
                    maxValueState.update(maxValue);
                }
                return Tuple3.of(value.f0, currentValue, maxValue);
            }

        });

        result2.print();

        env.execute();
    }
}
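For a quick test, each Kafka record is expected in the form key,value (for example car1,50); the job prints (key, currentValue, maxSoFar), so sending car1,50 and then car1,30 should yield (car1,50,50) followed by (car1,30,50).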

Upload the jar to HDFS:

Run:

tijiao.sh:

#! /bin/bash

flink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-c com.demo.day4.CheckpointDemo03  \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

Run result:

Test:

 

Stop the job:

Enter more test data:

Run again, restoring from the retained checkpoint /flink-checkpoint/4435e2599d6269a0b7095979a959d97d/chk-416 (passed via -s in the script below):

 

 tijiao.sh:

#! /bin/bash

flink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-s hdfs://hdfs-cluster/flink-checkpoint/4435e2599d6269a0b7095979a959d97d/chk-416  \
-c com.demo.day4.CheckpointDemo03  \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

Run succeeded:

Check the output (the data entered after the job was stopped is displayed successfully):

Test again:

 

 

3、Setting up a Savepoint

Checkpoints take state snapshots automatically; a savepoint is a snapshot that you trigger manually.
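Optionally, the job itself can be given a default savepoint directory so that the CLI commands below can omit the explicit path (a minimal sketch, assuming Flink 1.12+; state.savepoints.dir is the standard config key, the class name is illustrative, and the HDFS path matches the one used below):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SavepointDirSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Default directory used when "flink stop" / "flink savepoint" is run without an explicit path
        conf.setString("state.savepoints.dir", "hdfs://hdfs-cluster/flink-savepoint");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the same pipeline as CheckpointDemo03 and call env.execute()
    }
}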

As with checkpoints, upload the jar to HDFS.

Run the job:

#! /bin/bash

flink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-c com.demo.day4.CheckpointDemo03  \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

Note the current YARN application id (application_1725272814044_0002) and the Flink job id (f28c53d5ccf838c8d5e7ca7b8c45e5b0):

 

Test:

 

Trigger the savepoint (two ways):

Way 1: stop the Flink job and trigger a savepoint at the same time:
flink stop --savepointPath hdfs://hdfs-cluster/flink-savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 -yid application_1725272814044_0002

Way 2: take a savepoint without stopping the job.
Syntax: flink savepoint <jobId> <savepointPath> -yid <yarnApplicationId>

flink savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 hdfs://hdfs-cluster/flink-savepoint -yid application_1725272814044_0002

Run way 1:

flink stop --savepointPath hdfs://hdfs-cluster/flink-savepoint f28c53d5ccf838c8d5e7ca7b8c45e5b0 -yid application_1725272814044_0002

With the job stopped, enter test data again:

Look up the savepoint written for the job that just finished and restore from that snapshot:

Restart the Flink job from the savepoint path recorded above:

#! /bin/bash

flink run-application \
-t yarn-application   \
-Dyarn.provided.lib.dirs="hdfs://hdfs-cluster/flink-dist"     \
-c com.demo.day4.CheckpointDemo03  \
-s hdfs://hdfs-cluster/flink-savepoint/savepoint-f28c53-23e87194a6b2 \
hdfs://hdfs-cluster/my-flinkjars/flink-test-1.0-SNAPSHOT.jar

Check again (the data entered during the interruption shows up):

Enter more data:

 

4、Checkpointing in Dinky

set 'execution.checkpointing.interval'='2sec';
set 'state.checkpoints.dir'='hdfs://hdfs-cluster/flink-checkpoint';
set 'execution.checkpointing.externalized-checkpoint-retention'='RETAIN_ON_CANCELLATION';
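The same three options can also be set from a Java Table API program instead of Dinky's SET statements (a minimal sketch; the class and variable names are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DinkyCheckpointSettingsSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // Same options as the SET statements above
        tEnv.getConfig().getConfiguration().setString("execution.checkpointing.interval", "2s");
        tEnv.getConfig().getConfiguration().setString("state.checkpoints.dir", "hdfs://hdfs-cluster/flink-checkpoint");
        tEnv.getConfig().getConfiguration().setString(
                "execution.checkpointing.externalized-checkpoint-retention", "RETAIN_ON_CANCELLATION");
        // ... register table1/table2 and run the INSERT INTO statement shown below
    }
}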

CREATE TABLE table1 (
  actionTime  BIGINT,
  monitorId   STRING,
  cameraId    STRING,
  car         STRING,
  speed       DOUBLE,
  roadId      STRING,
  areaId      STRING,
  event_time  AS PROCTIME()   -- computed column (processing-time attribute, TIMESTAMP type)
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic1',
  'properties.bootstrap.servers' = 'hadoop11:9092,hadoop12:9092,hadoop13:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE table2 (
  id          INT,
  start_time  TIMESTAMP,
  end_time    TIMESTAMP,
  monitor_id  STRING,
  avg_speed   DOUBLE,
  car_count   BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop11:3306/cars',
  'table-name' = 't_average_speed',
  'username' = 'root',
  'password' = '123456'
);

INSERT INTO table2
SELECT CAST(null AS INT) AS id,
       window_start AS start_time,
       window_end   AS end_time,
       monitorId    AS monitor_id,
       AVG(speed)   AS avg_speed,
       COUNT(DISTINCT car) AS car_count
FROM TABLE(HOP(TABLE table1, DESCRIPTOR(event_time), INTERVAL '30' SECONDS, INTERVAL '60' SECONDS))  -- 60-second windows sliding every 30 seconds
GROUP BY monitorId, window_start, window_end;

 
