当前位置: 首页 > news >正文

Apache Sqoop数据采集问题

Sqoop数据采集格式问题

  • 一、Sqoop工作原理
  • 二、Sqoop命令格式
  • 三、Oracle数据采集格式问题
  • 四、Sqoop增量采集方案

Apache Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql…)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,后来为了让使用者能够快速部署,也为了让开发人员能够更快速的迭代开发,Sqoop独立成为一个Apache项目。

一、Sqoop工作原理

  • 数据导入:Sqoop通过MapReduce任务来实现数据的并行导入。首先,它会将关系型数据库中的数据表按照一定的规则进行分区,然后为每个分区启动一个Map任务,同时从数据库中读取相应分区的数据,并将数据写入到HDFS或其他Hadoop存储系统中。这样可以充分利用Hadoop集群的分布式计算能力,提高数据导入的效率。

  • 导出过程:与导入类似,Sqoop也会将数据进行分区处理,然后通过Map任务将Hadoop中的数据读取出来,并按照目标关系型数据库的格式要求,将数据写入到数据库中。

Sqoop通过创建一个数据传输的MR程序,进而实现数据传输。

Sqoop安装:

  1. JAVA环境配置
  2. Hadoop环境配置
  3. 相关数据库驱动包

只要环境满足以上设置,直接解压Sqoop安装包即可安装,修改配置后即可使用。

二、Sqoop命令格式

基础使用语法:

sqoop import | export \
--数据库连接参数
--HDFS或者Hive的连接参数
--配置参数

数据传输常用参数:

选项参数
–connectjdbc:mysql://hostname:3306(数据库连接URL)
–username数据库用户名
–password数据库用户密码
–table指定数据表
–columns指定表列值
–where数据过滤条件
–e/–query自定义SQL语句
–driver指定数据库驱动
–delete-target-dir导入数据时,清空目标目录
–target-dir指定导入数据的目录(通常为HDFS路径)
–export-dir指定导出数据的源目录(通常为HDFS路径)

Sqoop命令的使用方法可以通过sqoop -h命令查看相关使用方法,此处不在赘述了

三、Oracle数据采集格式问题

场景:

  • Step1: 查看业务数据库中 CISS_SERVICE_WORKORDER 表的数据条数。

    select count(1) as cnt from CISS_SERVICE_WORKORDER;  178609
  • Step2: 采集CISS_SERVICE_WORKORDER的数据到HDFS上

    sqoop import \
    --connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \  
    --username ciss \
    --password 123456 \
    --table CISS4.CISS_SERVICE_WORKORDER \
    --delete-target-dir \
    --target-dir /test/full_imp/ciss4.ciss_service_workorder \
    --fields-terminated-by "\001" \   #指定数据分割符
    -m 1  #指定并行度
    
  • Step3: 使用Hive查看导入数据表的行数

    create external table test_text(
    line string # 将导入的数据一行作为表中的一列
    )
    location '/test/full_imp/ciss4.ciss_service_workorder';
    select count(*) from test_text;  195825

问题:
Sqoop采集完数据后,HDFS数据中存储的数据行数跟源数据库的数据量不符合。

原因:

  • sqoop以文本格式导入数据时,默认的换行符是特殊字符。
  • Oracle中的数据列中如果出现了\n、\r、\t等特殊字符,就会被划分为多行

Oracle数据:

idnameage
001zhang\nsan18

Sqoop转换后的数据:

001zhang
san18

Hive表中的数据:

idnameage
001zhang
san18

解决方法:

  • 方案一:
    • 删除或者替换数据中的换行符
    • Sqoop参数 --hive-drop-import-delims 删除换行符
    • Sqoop参数 --hive-delims-replacement char 替换换行符

    不建议使用,破坏原始数据结构,ODS层数据尽量抱持原结构

  • 方案二:
    • 采用特殊的存储格式,AVRO格式

常见的文件格式介绍:

类型介绍
TextFileHive默认的文件格式,最简单的数据格式,便于查看和编辑,耗费存储空间,I/O性能较低
SequenceFile含有键值对的二进制文件,优化磁盘利用率和I/O,并行操作数据,查询效率高,但存储空间消耗最大
AvroFile特殊的二进制文件,设计的主要目标是为了满足schema evolution,Schema和数据保存在一起
OrcFile列式存储,Schema存储在footer中,不支持schema evolution,高度压缩比并包含索引,查询速度非常快
ParquetFile列式存储,与Orc类似,压缩比不如Orc,但是查询性能接近,支持的工具更多,通用性更强

Avro格式特点

  • 优点
    • 二进制数据存储,性能好、效率高
    • 使用JSON描述模式,支持场景更丰富
    • Schema和数据统一存储,消息自描述(将表中的一行数据作为对象存储,并且Schema为元数据)
    • 模式定义允许定义数据的排序
  • 缺点
    • 只支持Avro自己的序列化格式
    • 少量列的读取性能比较差,压缩比较低
  • 场景:基于行的大规模结构化数据写入、列的读取非常多或者Schema变更操作比较频繁的场景

Sqoop使用Avro格式:

  sqoop import \-Dmapreduce.job.user.classpath.first=true \--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \--username ciss \--password 123456 \--table CISS4.CISS_SERVICE_WORKORDER \--delete-target-dir \--target-dir /test/full_imp/ciss4.ciss_service_workorder \--as-avrodatafile \    # 选择文件存储格式为AVRO--fields-terminated-by "\001" \-m 1

Hive建表指定文件的存储格式:

create external table test_avro(
line string
)
stored as avro
location '/test/full_imp/ciss4.ciss_service_workorder';

AVRO 数据以 二进制序列化 存储,字段通过预定义的 模式(Schema) 解析,而非依赖分隔符,即使字段内容包含逗号、换行符等特殊字符,也不会影响数据结构的正确性。
Schema 定义(JSON 格式),明确描述了字段名称、类型、顺序等信息。

四、Sqoop增量采集方案

Sqoop 支持两种增量模式:

  • append 模式:
    适用于 仅追加数据 的表(如日志表),基于 递增列(如自增主键 id)采集新数据。

  • lastmodified 模式:
    适用于 数据会更新 的表(如用户表),基于 时间戳列(如 last_update_time)采集新增或修改的数据。

append模式要求源数据表具备自增列,如建表时设置的自增id
lastmodified模式要求源数据表具有时间戳字段。

Append模式:

要求:必须有一列自增的值,按照自增的int值进行判断

特点:只能导入增加的数据,无法导入更新的数据

场景:数据只会发生新增,不会发生更新的场景

sqoop import \                                   # 执行数据导入操作--connect jdbc:mysql://node3:3306/sqoopTest \  # 连接MySQL数据库(地址:node3,数据库名:sqoopTest)--username root \                             # 数据库用户名:root--password 123456 \                           # 数据库密码:123456--table tb_tohdfs \                           # 要导入的源表:tb_tohdfs--target-dir /sqoop/import/test02 \           # HDFS目标目录(数据将写入此路径)--fields-terminated-by '\t' \                 # 字段分隔符为制表符(\t)--check-column id \                           # 指定增量检查列:id(通常是自增主键)--incremental append \                        # 增量模式为“append”(仅导入新数据)--last-value 0 \                              # 上次导入的id最大值(初始值为0,首次导入id>0的数据)-m 1                                          # 使用1个Map任务(单线程)

appebd模式使用last-value记录上次导入的数据id最大值,初次导入一般为全量导入,即id>0

此处的last_value需要手动填写,因此可以使用Sqoop的job管理进行自动记录。

sqoop job --create my_job -- import ... --incremental append --check-column id --last-value 0
sqoop job --exec my_job  # 自动更新 last-value

lastmodified模式:
要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断

特点:既导入新增的数据也导入更新的数据

场景:表中的记录会新增或更新,且每次更新都会修改 lastmode 时间戳。一般无法满足要求,所以不用。

sqoop import \                                   # 执行数据导入操作--connect jdbc:mysql://node3:3306/sqoopTest \  # 连接MySQL数据库(地址:node3,数据库名:sqoopTest)--username root \                             # 数据库用户名:root--password 123456 \                           # 数据库密码:123456--table tb_lastmode \                         # 要导入的源表:tb_lastmode--target-dir /sqoop/import/test03 \           # HDFS目标目录(数据将写入此路径)--fields-terminated-by '\t' \                 # 字段分隔符为制表符(\t)--incremental lastmodified \                  # 增量模式为“lastmodified”(采集新增或修改的数据)--check-column lastmode \                     # 指定时间戳列:lastmode(记录数据的更新时间)--last-value '2021-06-06 16:09:32' \          # 上次导入的最大时间值(导入此时间之后的新增/修改数据)-m 1                                          # 使用1个Map任务(单线程)

lastmodified模式使用时间戳记载数据的更新线。

若同一条记录被多次更新,且 lastmode 时间超过 --last-value,Sqoop 会多次导入该记录。

解决方案:添加 --merge-key <主键列> 参数,合并新旧数据(基于主键去重):

 --merge-key id  # 假设 id 是主键列

自定义模式:
要求:每次运行的输出目录不能相同

特点:自己实现增量的数据过滤,可以实现新增和更新数据的采集

场景:一般用于自定义增量采集每天的分区数据到Hive

sqoop  import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 1

自定义模式可以根据设置的sql进行数据导入,因此是最常用的场景。

相关文章:

  • 【Spring Boot】Maven中引入 springboot 相关依赖的方式
  • 大模型——Suna集成浏览器操作与数据分析的智能代理
  • [Vulfocus解题系列]Apache HugeGraph JWT Token硬编码导致权限绕过(CVE-2024-43441)
  • Apache Tomcat 漏洞(CVE-2025-24813)导致服务器面临 RCE 风险
  • vue项目页面适配
  • 数据结构【堆和链式结构】
  • PWN基础-利用格式化字符串漏洞泄露canary结合栈溢出getshell
  • 耳机,三段式, 四段式,录音,播放
  • C++修炼:list模拟实现
  • spark学习总结
  • 【Spark入门】Spark简介:分布式计算框架的演进与定位
  • 面试新收获-窗口排序函数
  • 详解最新链路追踪skywalking框架介绍、架构、环境本地部署配置、整合微服务springcloudalibaba 、日志收集、自定义链路追踪、告警等
  • Java学习手册:Java开发常用的内置工具类包
  • Python函数基础:简介,函数的定义,函数的调用和传入参数,函数的返回值
  • C语言学习之调试
  • 测试基础笔记第十三天
  • 第八部分:缓解 RAG 中的幻觉
  • 6.2 内容生成与营销:个性化内容创作与营销策略优化
  • 常见cmd命令
  • 广州海关原党委委员、副关长刘小威被开除党籍
  • 我的科学观|张峥:AI快速迭代,我们更需学会如何与科技共处
  • 加总理:目前没有针对加拿大人的“活跃威胁”
  • 加拿大温哥华一车辆冲撞人群,造成多人伤亡
  • 持续更新丨伊朗官员:港口爆炸事件已致5人死亡
  • 魔都眼·上海车展⑤|被主播包围的新车