当前位置: 首页 > news >正文

在AWS Glue中实现缓慢变化维度(SCD)的三种类型

根据缓慢变化维度(SCD)的三种核心类型(类型1、类型2、类型3),以下是基于AWS Glue的实现设计、步骤及测试用例:


一、AWS Glue实现SCD的设计与步骤

1. SCD类型1(覆盖旧值)

设计目标:直接更新目标表中的记录,不保留历史数据。
技术选型

  • 使用AWS Glue ETL作业(PySpark)
  • 目标存储:S3(Parquet格式)或Amazon Redshift
  • 数据比对方式:基于业务键(如customer_id)匹配新旧记录

实现步骤

  1. 数据源准备

    • 源表(Source):实时更新的客户表(如CSV或数据库表)。
    • 目标表(Target):维度表(如dim_customer)。
  2. Glue作业逻辑

    from pyspark.context import SparkContext
    from awsglue.context import GlueContextsc = SparkContext()
    glueContext = GlueContext(sc)# 加载源数据和目标数据
    source_df = glueContext.create_dynamic_frame.from_catalog(database="source_db", table_name="customer").toDF()
    target_df = glueContext.create_dynamic_frame.from_catalog(database="target_db", table_name="dim_customer").toDF()# 合并逻辑:覆盖旧值
    merged_df = target_df.alias("target").join(source_df.alias("source"),target_df.customer_id == source_df.customer_id,"outer"
    ).selectExpr("coalesce(source.customer_id, target.customer_id) as customer_id","source.name as name",  # 直接覆盖"source.address as address"  # 直接覆盖
    ).distinct()# 写入目标表(覆盖模式)
    glueContext.write_dynamic_frame.from_catalog(frame=DynamicFrame.fromDF(merged_df, glueContext, "merged_df"),database="target_db",table_name="dim_customer",transformation_ctx="write_target"
    )
    
  3. 目标表结构

    CREATE TABLE dim_customer (customer_id INT PRIMARY KEY,name STRING,address STRING
    );
    

2. SCD类型2(创建新记录)

设计目标:插入新记录并标记历史版本。
技术选型

  • 使用Glue的窗口函数(row_number)跟踪最新记录
  • 新增字段:is_current(布尔值)、start_dateend_date
  • 存储格式:S3 + Parquet(支持ACID事务)

实现步骤

  1. 目标表结构

    CREATE TABLE dim_customer_scd2 (customer_sk INT AUTOINCREMENT PRIMARY KEY,  -- 代理键customer_id INT,name STRING,address STRING,is_current BOOLEAN,start_date DATE,end_date DATE
    );
    
  2. Glue作业逻辑

    from pyspark.sql.window import Window
    from pyspark.sql.functions import row_number, lit, current_date, when# 加载源数据和目标数据
    source_df = glueContext.create_dynamic_frame.from_catalog(database="source_db", table_name="customer").toDF()
    target_df = glueContext.create_dynamic_frame.from_catalog(database="target_db", table_name="dim_customer_scd2").toDF()# 标记目标表中的旧记录为失效
    target_updated = target_df.withColumn("end_date",when((target_df.customer_id.isin(source_df.select("customer_id").collect())) &(target_df.is_current == True),current_date()).otherwise(target_df.end_date)
    ).withColumn("is_current",when((target_df.customer_id.isin(source_df.select("customer_id").collect())) &(target_df.is_current == True),False).otherwise(target_df.is_current)
    )# 插入新记录
    new_records = source_df.join(target_updated,["customer_id"],"left_anti"  # 仅选择源中存在但目标中不存在的记录
    ).select("customer_id","name","address",lit(True).alias("is_current"),current_date().alias("start_date"),lit(None).cast("date").alias("end_date")
    )# 合并并写入目标表
    final_df = target_updated.unionByName(new_records)
    glueContext.write_dynamic_frame.from_catalog(frame=DynamicFrame.fromDF(final_df, glueContext, "final_df"),database="target_db",table_name="dim_customer_scd2"
    )
    

3. SCD类型3(添加有效日期)

设计目标:维护当前记录和历史记录的有效时间范围。
技术选型

  • 新增字段:valid_fromvalid_to
  • 使用Glue的coalesce处理时间重叠

实现步骤

  1. 目标表结构

    CREATE TABLE dim_customer_scd3 (customer_sk INT PRIMARY KEY,customer_id INT,name STRING,address STRING,valid_from DATE,valid_to DATE
    );
    
  2. Glue作业逻辑

    # 关闭旧记录的valid_to
    target_updated = target_df.withColumn("valid_to",when((target_df.customer_id.isin(source_df.select("customer_id").collect())) &(target_df.valid_to.isNull()),current_date()).otherwise(target_df.valid_to)
    )# 插入新记录
    new_records = source_df.select("customer_id","name","address",current_date().alias("valid_from"),lit(None).cast("date").alias("valid_to")
    )# 合并并写入
    final_df = target_updated.unionByName(new_records)
    glueContext.write_dynamic_frame.from_catalog(...)
    

二、测试用例

通用测试场景
测试场景预期结果
无变化的记录目标表记录保持不变。
新增记录目标表插入新记录(类型2/3新增代理键,类型1直接插入)。
属性值变化类型1覆盖旧值;类型2插入新记录并标记旧记录失效;类型3关闭旧记录有效期。
多次更新同一记录类型2生成多条历史记录;类型3仅保留当前和上一次状态。
类型2专项测试
  1. 历史版本查询

    SELECT * FROM dim_customer_scd2 
    WHERE customer_id = 100 
    ORDER BY start_date DESC;
    

    预期:返回该客户的所有历史地址记录。

  2. 当前标记验证

    SELECT COUNT(*) FROM dim_customer_scd2 
    WHERE customer_id = 100 AND is_current = True;
    

    预期:仅返回1条记录。

类型3专项测试
  1. 时间范围覆盖
    SELECT * FROM dim_customer_scd3 
    WHERE customer_id = 200 
    AND valid_from <= '2023-10-01' 
    AND valid_to >= '2023-10-01';
    
    预期:返回该时间点有效的记录。

三、性能优化建议

  1. 分区策略:按日期或业务键分区目标表(如valid_from)。
  2. 索引优化:在Redshift中为customer_idis_current列创建排序键。
  3. 增量处理:启用Glue Job Bookmark仅处理新增数据。

相关文章:

  • 深圳市富力达:SAP一体化管理助力精密制造升级 | 工博科技SAP客户案例
  • 织梦dedecms网站如何修改上一篇下一篇的标题字数
  • 【Flutter】Flutter + Unity 插件结构与通信接口封装
  • 光场的相位与偏振
  • 详解 Unreal Engine(虚幻引擎)
  • 开源网络入侵检测与防御系统:Snort
  • Spark SQL开发实战:从IDEA环境搭建到UDF/UDAF自定义函数实现
  • Maven下载aspose依赖失败的解决方法
  • BeeWorks Meet更适合企业内部使用的原因
  • Linux中线程池的简单实现 -- 线程安全的日志模块,策略模式,线程池的封装设计,单例模式,饿汉式单例模式,懒汉式单例模式
  • streamlit实现非原生的按钮触发效果 + flask实现带信息的按钮触发
  • 前端浏览器窗口交互完全指南:从基础操作到高级控制
  • 论文导读 - 基于大规模测量与多任务深度学习的电子鼻系统实现目标识别、浓度预测与状态判断
  • [计算机科学#3]:布尔逻辑 (计算机数学基础)
  • 【中级软件设计师】编译和解释程序的翻译阶段、符号表 (附软考真题)
  • Lua 第10部分 模式匹配
  • 【嵌入式八股22】排序算法与哈希算法
  • 辞九门回忆
  • windows安装docker,发现没有hyper
  • WSL2里手动安装Docker 遇坑
  • 光明网评论员:手机“二次放号”,需要重新确认“你是你”
  • 王文涛会见德国汽车工业协会主席穆勒
  • 一回合摘下“狮心”,张名扬霸气回应观众:再嘘一个我听听
  • 偷拍拷贝某轨道车技术信息后撰写论文发表,工程师被判一年有期徒刑
  • 甘肃张掖至重庆航线开通,串起西北与西南文旅“黄金走廊”
  • 金隅集团:今年拿地将选择核心热门地块,稳健审慎投资