DataStreamAPI实践原理——快速上手
引入
通过编程模型,我们知道Flink的编程模型提供了多层级的抽象,越上层的API,其描述性和可阅读性越强,越下层API,其灵活度高、表达力越强,多数时候上层API能做到的事情,下层API也能做到,反过来未必,不过这些API的底层模型是一致的,可以混合使用。
Flink架构可以处理批和流,Flink 批处理数据需要使用到Flink中的DataSet API,此API主要是支持
Flink针对批数据进行操作,本质上Flink处理批数据也是看成一种特殊的流处理(有界流),所以没有必要分成批和流两套API,从Flink1.12版本往后,Dataset API 已经标记为Legacy(已过时),已被官方软弃用,官方建议使用Table API 或者SQL 来处理批数据,我们也可以使用带有Batch执行模式的DataStream API来处理批数据(DataSet和DataStream API做到了合并),而在后续Flink版本中DataSet API 也被删除。
DataStream API的学习对于理解Flink数据处理流程非常重要,下面我们先从核心API层开始学习,通过基于DataStream API 的编程实践,去学习Flink编程方式,处理数据流程以及转换处理。
现在我们先通过数据处理最经典的WordCount案例,来快速上手Flink的DataStream API开发。
代码编写流程
我们知道Flink编程模型主要有数据源、转换操作和数据输出三个部分,而实际开发编程的时候,则会多两个部分:
- 初始化上下文环境(Environment)
Environment是编写Flink程序的基础,不同层级API编程中创建的Environment环境不同,如:Dataset 编程中需要创建ExecutionEnvironment,DataStream编程中需要创建
StreamExecutionEnvironment,在Table和SQL API中需要创建TableExecutionEnvironment,使用不同语言编程导入的包也不同,在获取到对应的Environment后我们还可以进行外参数的配置,例如:并行度、容错机制设置等。 - 数据源(DataSource)<可以有多个>
DataSource部分主要定义了数据接入功能,主要是将外部数据接入到Flink系统中并转换成DataStream对象供后续的转换使用。 - 转换操作(Transformation)
Transformation部分有各种各样的算子操作可以对DataStream流进行转换操作,最终将转换结果数据通过DataSink写出到外部存储介质中,例如:文件、数据库、Kafka消息系统等。 - 数据输出(DataSink)
经过一系列Transformation转换操作后,最后一定要调用Sink操作,才会形成一个完整的DataFlow拓扑。只有调用了Sink操作,才会产生最终的计算结果,这些数据可以写入到的文件、输出到指定的网络端口、消息中间件、外部的文件系统或者是打印到控制台。 - 程序触发(env.execute())
在DataStream编程中编写完成DataSink代码后并不意味着程序结束,由于Flink是基于事件驱动处理的,有一条数据时就会进行处理,所以最后一定要使用Environment.execute()来触发程序执行。
Flink数据类型
在 Apache Flink 中,为了能够在分布式计算过程中对数据的类型进行管理和判断,引入了 TypeInformation 类来对数据类型进行描述。TypeInformation 是 Flink 类型系统的基石,它允许 Flink 在编译时推断数据类型,从而为数据的序列化、反序列化、内存管理等操作提供必要的类型信息。以下是 Flink 中常见的数据类型及其对应的 TypeInformation 类型:
1. 基本数据类型
Flink 通过 BasicTypeInfo 支持 Java 的基本数据类型(如 int、double、boolean 等)以及它们的包装类(如 Integer、Double、Boolean 等),还支持 String 类型。
2. 数组类型
对于数组类型,Flink 提供了 BasicArrayTypeInfo,支持如 int[]、String[] 等数组数据类型。
3. Tuple 类型
Tuple 是 Flink 中一种常用的数据类型,用于表示固定长度的字段集合。Flink 提供了 TupleTypeInfo 来支持 Tuple 类型的数据。
4. POJO 类型
POJO(Plain Old Java Object)类型是 Flink 中非常重要的数据类型,它允许使用普通的 Java 类来表示数据对象。为了使 Flink 能够正确识别和处理 POJO 类型,需要满足以下条件:
- POJO 类必须是公共类(public)且不能是内部类。
- POJO 类必须包含一个默认的无参构造函数。
- POJO 类的所有字段必须是公共的,或者提供公共的 getter 和 setter 方法。
当满足上述条件时,Flink 会自动识别 POJO 类型,并通过 PojoTypeInfo 来描述该类型。
5. Scala Case Class 类型
对于使用 Scala 编写的 Flink 应用,Flink 提供了 CaseClassTypeInfo 来支持 Scala 的 Case Class 类型。Case Class 是 Scala 中一种特殊的类,通常用于表示不可变的数据对象,非常适合在 Flink 中作为数据类型使用。
在使用Java API开发Flink应用时,通常情况下Flink都能正常进行数据类型推断进而选择合适的serializers以及comparators,但是在定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,Flink就获取不到对应的类型信息,这就需要借助类型提示(Type Hints)来告诉系统函数中传入的参数类型信息和输出类型,进而对数据类型进行推断处理。
在使用Scala API 开发Flink应用时,Scala API通过使用Manifest和类标签在编译器运行时获取类型信息,即使在函数定义中使用了泛型,也不会像Java API出现类型擦除问题,但是在使用到Flink已经通过TypeInformation定义的数据类型时,TypeInformation类不会自动创建,需要使用隐式参数的方式引入:import org.apache.flink.api.scala._,否则在运行代码过程中会出现“could not find implicit value for evidence parameter of type TypeInformation”的错误。
Flink 序列化机制
在两个进程进行远程通信时,它们需要将各种类型的数据以二进制序列的形式在网络上传输,数据发送方需要将对象转换为字节序列,进行序列化,而接收方则将字节序列恢复为各种对象,进行反序列化。对象的序列化有两个主要用途:
- 一是将对象的字节序列永久保存到硬盘上,通常存放在文件中;
- 二是在网络上传输对象的字节序列。序列化的好处包括减少数据在内存和硬盘中的占用空间,减少网络传输开销,精确推算内存使用情况,降低垃圾回收的频率。
序列化和反序列化是分布式计算框架中的关键环节,尤其是在节点之间需要进行数据传输时。Flink 的序列化机制负责将数据对象转换为字节序列以便在网络上传输或在磁盘上存储,并能够在需要时将字节序列恢复为原始对象。Flink 提供了多种序列化器,以满足不同类型的数据序列化需求。高效的序列化和反序列化对于分布式计算框架至关重要,原因如下:
减少数据传输开销:通过将对象转换为紧凑的字节序列,可以减少网络传输的数据量,提高数据传输效率。
降低内存占用:序列化后的数据通常占用更少的内存空间,有助于提高内存利用率,尤其是在处理大规模数据集时。
支持数据持久化:序列化后的数据可以方便地写入磁盘进行持久化存储,便于后续的数据恢复和分析。
Flink序列化机制负责在节点之间传输数据时对数据对象进行序列化和反序列化,确保数据的正确性和一致性。Flink提供了多种序列化器,包括Kryo、Avro和Java序列化器等,大多数情况下,用户不用担心flink的序列化框架,Flink会通过TypeInfomation在数据处理之前推断数据类型,进而使用对应的序列化器,例如:针对标准类型(int,double,long,string)直接由Flink自带的序列化器处理,其他类型默认会交给Kryo处理。
但是对于Kryo仍然无法处理的类型,可以采取以下两种解决方案:
1. 强制使用Avro替代Kryo序列化
//设置flink序列化方式为avro
env.getConfig().enableForceAvro();
2. 自定义注册Kryo序列化
//注册kryo 自定义序列化器
env.getConfig().registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer> serializerClass)
单词统计案例
下面我们通过一个单词统计的案例,快速上手应用Flink,进行流处理。
引入依赖
<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><flink.version>1.16.0</flink.version><slf4j.version>1.7.31</slf4j.version><log4j.version>2.17.1</log4j.version><scala.version>2.12.10</scala.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><!-- Flink批和流开发依赖包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- Scala包 --><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-compiler</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-reflect</artifactId><version>${scala.version}</version></dependency><!-- slf4j&log4j 日志相关包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>${log4j.version}</version></dependency></dependencies>
代码实现
/*** WordCount 类实现了一个简单的 Flink 流式处理程序,用于统计输入文本文件中每个单词的出现次数。*/
public class WordCount {/*** 程序的主入口方法,负责创建 Flink 流式处理环境,读取输入文件,进行单词计数,并输出结果。* * @param args 命令行参数,在本程序中未使用。* @throws Exception 当执行 Flink 任务时可能抛出异常。*/public static void main(String[] args) throws Exception {// 1. 创建流式处理环境,用于配置和执行 Flink 流式计算任务StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2. 从指定的文本文件中读取数据,返回一个 DataStreamSource 对象,其中每个元素是文件中的一行文本DataStreamSource<String> lines = env.readTextFile("./data/words.txt");// 3. 对读取的每行文本进行处理,将其切分为单词,并转换为 <单词, 1> 的键值对形式// flatMap 方法用于将每行文本拆分为多个单词,并为每个单词生成一个键值对// returns 方法用于指定 flatMap 操作返回的数据类型SingleOutputStreamOperator<Tuple2<String, Long>> kvWordsDS =lines.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {// 将每行文本按空格分割成单词数组String[] words = line.split(" ");// 遍历单词数组,为每个单词生成一个 <单词, 1> 的键值对,并收集到 Collector 中for (String word : words) {collector.collect(Tuple2.of(word, 1L));}}).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 对键值对数据进行分组统计,按照单词(键)进行分组,对值(出现次数)进行求和// keyBy 方法用于按照指定的键对数据进行分组// sum 方法用于对分组后的数据的指定字段进行求和操作// print 方法用于将统计结果输出到控制台kvWordsDS.keyBy(tp -> tp.f0).sum(1).print();// 5. 在流式计算中,需要调用 execute 方法来触发任务的执行// 该方法会阻塞当前线程,直到任务执行完成或被中断env.execute();}
}