当前位置: 首页 > news >正文

关于flink两阶段提交高并发下程序卡住问题

先抛出代码

package com.dpf.flink;import com.dpf.flink.sink.MysqlSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Properties;public class MysqlTwoPhaseCommit {//topicprivate static final String topic_ExactlyOnce = "TwoPhaseCommit";public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度,为了方便测试,查看消息的顺序,这里设置为1,可以更改为多并行度env.setParallelism(1);//checkpoint的设置//每隔10s进行启动一个检查点【设置checkpoint的周期】env.enableCheckpointing(30000);//设置模式为:exactly_one,仅一次语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//确保检查点之间有1s的时间间隔【checkpoint最小间隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);//检查点必须在10s之内完成,或者被丢弃【checkpoint超时时间】env.getCheckpointConfig().setCheckpointTimeout(10000);//同一时间只允许进行一次检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地env.setStateBackend(new FsStateBackend("file:///Users/david.dong/tmp/flink/checkpoint"));//设置kafka消费参数Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, topic_ExactlyOnce);/*SimpleStringSchema可以获取到kafka消息,JSONKeyValueDeserializationSchema可以获取都消息的key,value,metadata:topic,partition,offset等信息*/FlinkKafkaConsumer<String> kafkaConsumer011 = new FlinkKafkaConsumer<>(topic_ExactlyOnce,new SimpleStringSchema(),properties);//加入kafka数据源DataStreamSource<String> streamSource = env.addSource(kafkaConsumer011);SingleOutputStreamOperator<Tuple2<String, Integer>> tupleStream = streamSource.map(str -> Tuple2.of(str, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));tupleStream.print();//数据传输到下游tupleStream.addSink(new MysqlSink()).name("MySqlTwoPhaseCommitSink");//触发执行env.execute("StreamDemoKafka2Mysql");}
}
package com.dpf.flink.sink;import com.dpf.flink.utils.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.PreparedStatement;public class MysqlSink extends TwoPhaseCommitSinkFunction<Tuple2<String,Integer>, Connection,Void> {private static final Logger log = LoggerFactory.getLogger(MysqlSink.class);public MysqlSink() {super(new KryoSerializer<>(Connection.class,new ExecutionConfig()), VoidSerializer.INSTANCE);}/*** 执行数据库入库操作  task初始化的时候调用* @param connection* @param tuple* @param context* @throws Exception*/@Overrideprotected void invoke(Connection connection, Tuple2<String, Integer> tuple, Context context) throws Exception {log.info("start invoke...");String value = tuple.f0;Integer total = tuple.f1;String sql = "update student set name = 'aaa' where id = 1";log.info("====执行SQL:{}===",sql);PreparedStatement ps = connection.prepareStatement(sql);ps.setString(1, value);ps.setInt(2, total);ps.setLong(3, System.currentTimeMillis());log.info("要插入的数据:{}----{}",value,total);if (ps != null) {String sqlStr = ps.toString().substring(ps.toString().indexOf(":")+2);log.error("执行的SQL语句:{}",sqlStr);}//执行insert语句ps.execute();}/*** 获取连接,开启手动提交事物(getConnection方法中)* @return* @throws Exception*/@Overrideprotected Connection beginTransaction() throws Exception {log.info("start beginTransaction.......");String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";Connection connection = DBConnectUtil.getConnection(url, "root", "12345678");return connection;}/***预提交,这里预提交的逻辑在invoke方法中* @param connection* @throws Exception*/@Overrideprotected void preCommit(Connection connection) throws Exception {log.info("start preCommit...");}/*** 如果invoke方法执行正常,则提交事务* @param connection*/@Overrideprotected void commit(Connection connection) {log.info("start commit...");DBConnectUtil.commit(connection);}/*** 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行* @param connection*/@Overrideprotected void abort(Connection connection) {log.info("start abort rollback...");DBConnectUtil.rollback(connection);}
}
package com.dpf.flink.utils;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;/*** 数据库连接工具类*/
public class DBConnectUtil {private static final Logger log = LoggerFactory.getLogger(DBConnectUtil.class);/*** 获取连接** @param url* @param user* @param password* @return* @throws SQLException*/public static Connection getConnection(String url, String user, String password) throws SQLException {Connection conn = null;try {Class.forName("com.mysql.jdbc.Driver");} catch (ClassNotFoundException e) {log.error("获取mysql.jdbc.Driver失败");e.printStackTrace();}try {conn = DriverManager.getConnection(url, user, password);log.info("获取连接:{" + conn + "} 成功...");} catch (Exception e) {log.error("获取连接失败,url:" + url + ",user:" + user);}//设置手动提交conn.setAutoCommit(false);return conn;}/*** 提交事务*/public static void commit(Connection conn) {if (conn != null) {try {conn.commit();} catch (SQLException e) {log.error("提交事务失败,Connection:" + conn);e.printStackTrace();} finally {close(conn);}}}/*** 事务回滚** @param conn*/public static void rollback(Connection conn) {if (conn != null) {try {conn.rollback();} catch (SQLException e) {log.error("事务回滚失败,Connection:" + conn);e.printStackTrace();} finally {close(conn);}}}/*** 关闭连接** @param conn*/public static void close(Connection conn) {if (conn != null) {try {conn.close();} catch (SQLException e) {log.error("关闭连接失败,Connection:" + conn);e.printStackTrace();}}}
}

这部分代码网上抄的,但是大致不差

前提:

1.source消息密集,全据并行度设置1

2.sink就执行update操作,并且就update同一条数据,为了更好验证问题

结果:

这边我尝试了很多次,中间有时候能顺利执行,但是有时候程序在sink这里卡住了,过一段时间就报错socket interrupt异常。

我的分析:

1.首先设置ck的间隔是10秒一次,那么当ck barrier到达sink算子的时候,就会进行预提交,并且立刻开启一个新事物用来处理后续的消息。那么这里就会出现多事务同时存在的情况,比如预提交的事务A,和新开启的事务B

2.新事物B开启后立刻就可以继续处理后续到来的消息。

3.那么此时如果事务A预提交后,他需要等待来自JobManager的complete指令,到代码层面也就是调用notifyComplete方法来进行commit。那么加入在这个期间,就是还没有收到complete指令的时候,事务B已经执行到了  ps.execute();这里,此时事务B就会卡住,因为他们都是操作同一条数据,那么问题来了,此时程序已经卡住了,也就是线程卡住了,那么此时就算JobManager发送complete指令了,然后调用notifyComplete方法,但是,此时没有线程执行这个方法!(因为主线程卡在了(ps.execute();这里)所以整个程序就彻底卡住了。

然后flnik dag上看到的是sink红了,前面的节点都黑了,就是背压瞬间就很严重了

以上是我对flink两阶段提交存在的问题的分析,我的source消息大概100多万,我分析是这个原因,如果分析的不对,还请大佬帮我分析下我哪里理解不对?或者为什么会卡住?

相关文章:

  • 【C++11】Lambda表达式
  • WPF大数据展示与分析性能优化方向及代码示例
  • 导览项目KD-Tree最近地点搜索优化
  • 用高德API提取广州地铁线路(shp、excel)
  • 【优选算法 | 滑动窗口】滑动窗口算法:高效处理子数组和子串问题
  • WPF核心技术解析与使用示例
  • WPF框架中异步、多线程、高性能、零拷贝技术的应用示例
  • 二、信息时代社会结构的转变
  • 我爱学算法之—— 二分查找(上)
  • 力扣HOT100——102.二叉树层序遍历
  • 解构与重构:“整体部分”视角下的软件开发思维范式
  • File,IO流,字符集
  • 25【干货】在Arcgis中根据字段属性重新排序并自动编号的方法(二)
  • 基于Tcp协议的应用层协议定制
  • Flask + ajax上传文件(三)--图片上传与OCR识别
  • 安服实习面试面经总结(也适合hvv蓝初)
  • 坚果派已适配的鸿蒙版flutter库【持续更新】
  • 什么是Lua模块?你会如何使用NGINX的Lua模块来定制请求处理流程?
  • 从“拼凑”到“构建”:大语言模型系统设计指南!
  • 【开源】基于51单片机的温湿度检测报警系统
  • 俄联邦安全局:俄军高级官员汽车爆炸案嫌疑人已被捕
  • 经济日报金观平:充分发挥增量政策的经济牵引力
  • 巴黎奥运后红土首秀落败,郑钦文止步马德里站次轮
  • 美称中美贸易谈判仍在进行中,外交部:美方不要混淆视听
  • 政治局会议:根据形势变化及时推出增量储备政策,加强超常规逆周期调节
  • 李良生已任应急管理部党委委员、政治部主任