当前位置: 首页 > news >正文

spark数据清洗练习

文章目录

    • 准备工作
    • 删除缺失值 >= 3 的数据
    • 删除星级、评论数、评分中任意字段为空的数据
    • 删除非法数据
    • hotel_data.csv

通过编写Spark程序清洗酒店数据里的缺失数据、非法数据、重复数据

准备工作

  1. 搭建 hadoop 伪分布或 hadoop 完全分布
  2. 上传 hotal_data.csv 文件到 hadoop
  3. idea 配置好 scala 环境

删除缺失值 >= 3 的数据

  1. 读取 /hotel_data.csv
  2. 删除缺失值 >= 3 的数据, 打印剔除的数量
  3. 将清洗后的数据保存为/hotelsparktask1
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo01 {
  
  def main(args: Array[String]): Unit = {
    // System.setProperty("HADOOP_USER_NAME", "root")//解决保存文件权限不够的问题
    val config: SparkConf = new SparkConf().setMaster("local[1]").setAppName("1")
    val sc = new SparkContext(config)
    val hdfsUrl ="hdfs://192.168.226.129:9000"
    val filePath: String = hdfsUrl+"/file3_1/hotel_data.csv"
    val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache()
    val total: Long = data.count()
    val dataDrop: RDD[Array[String]] = data.filter(_.count(_.equals("NULL")) <= 3)
    println("删除的数据条目有: " + (total - dataDrop.count()))
    dataDrop.map(_.mkString(",")).saveAsTextFile(hdfsUrl+ "/hotelsparktask1")
    sc.stop()
  }
}

删除星级、评论数、评分中任意字段为空的数据

  1. 读取 /hotel_data.csv
  2. 将字段{星级、评论数、评分}中任意字段为空的数据删除, 打印剔除的数量
  3. 保存 /hotelsparktask2
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo02 {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val config: SparkConf = new SparkConf().setMaster("local[1]").setAppName("2")
    val sc = new SparkContext(config)
    val hdfsUrl ="hdfs://192.168.226.129:9000"
    val filePath: String = hdfsUrl+"/file3_1/hotel_data.csv"
    val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache()
    val total: Long = data.count()
    val dataDrop: RDD[Array[String]] = data.filter {
      arr: Array[String] =>
        !(arr(6).equals("NULL") || arr(10).equals("NULL") || arr(11).equals("NULL"))
    }
    println("删除的数据条目有: " + (total - dataDrop.count()))
    dataDrop.map(_.mkString(",")).saveAsTextFile(hdfsUrl+ "/hotelsparktask2")
    sc.stop()
  }
}

删除非法数据

  1. 读取第一题的 /hotelsparktask1
  2. 剔除数据集中评分和星级字段的非法数据,合法数据是评分[0,5]的实数,星级是指星级字段内容中包含 NULL、二星、三星、四星、五星的数据
  3. 剔除数据集中的重复数据
  4. 分别打印 删除含有非法评分、星级以及重复的数据条目数
  5. 保存 /hotelsparktask3
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo03 {

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")//解决权限问题
    val config: SparkConf = new SparkConf().setMaster(  "local[1]").setAppName("3")
    val sc = new SparkContext(config)
    val hdfsUrl ="hdfs://192.168.226.129:9000"
    val filePath: String = hdfsUrl+"/hotelsparktask1"
    val lines: RDD[String] = sc.textFile(filePath).cache()
    val data: RDD[Array[String]] = lines.map(_.split(","))

    val total: Long = data.count()
    val dataDrop: RDD[Array[String]] = data.filter {
      arr: Array[String] =>
        try {
          (arr(10).toDouble >= 0) && (arr(10).toDouble <= 5)
        } catch {
          case _: Exception => false
        }
    }

    val lab = Array("NULL", "一星", "二星", "三星", "四星", "五星")
    val dataDrop1: RDD[Array[String]] = data.filter { arr: Array[String] =>
      var flag = false
      for (elem <- lab) {
        if (arr(6).contains(elem)) {
          flag = true
        }
      }
      flag
    }

    val dataDrop2: RDD[String] = lines.distinct

    println("删除的非法评分数据条目有: " + (total - dataDrop.count()))
    println("删除的非法星级数据条目有: " + (total - dataDrop1.count()))
    println("删除重复数据条目有: " + (total - dataDrop2.count()))

    val wordsRdd: RDD[Array[String]] = lines.distinct.map(_.split(",")).filter {
      arr: Array[String] =>
        try {
          (arr(10).toDouble >= 0) && (arr(10).toDouble <= 5)
        } catch {
          case _: Exception => false
        }
    }.filter { arr: Array[String] =>
      var flag = false
      for (elem <- lab) {
        if (arr(6).contains(elem)) {
          flag = true
        }
      }
      flag
    }

    wordsRdd.map(_.mkString(","))
      .saveAsTextFile(hdfsUrl + "/hotelsparktask3")

    sc.stop()

  }
}

hotel_data.csv

下载数据:https://download.csdn.net/download/weixin_44018458/87437211

相关文章:

  • ChatGPT 这个风口,普通人怎么抓住:比如APP集成ChatGPT,公众号集成ChatGPT...
  • 谷粒商城-品牌管理-JSR303数据校验
  • Lesson 6.5 机器学习调参基础理论与网格搜索
  • Servlet实现表白墙
  • java高频面试题(2023最新)
  • chatGPT接入个人微信教程(国内可用)
  • splay
  • 这是从零在独自开开发,将是副业赚钱最好的平台!
  • 基于 oss 框架的音频驱动
  • 基于matlab使用机器学习和深度学习进行雷达目标分类
  • LeetCode - 1109 - 航班预定统计
  • Rust编码的信息窃取恶意软件源代码公布,专家警告已被利用
  • typora每次复制文档都要附带图片文件夹?学会配置gitee图床
  • 离散数学 课时一 命题逻辑的基本概念
  • Typescript - 类型守卫(typeof / in / instanceof / 自定义类型保护的类型谓词)通俗易懂详细教程
  • 【Android -- 每日一问】现在 Android 怎么学?学什么?
  • 蔚来日常实习收获
  • 【C++入门】命名空间,输出输入,缺省参数,函数重载
  • [oeasy]python0078_设置索引颜色_index_color_ansi_控制终端颜色
  • day3——有关java运算符的笔记
  • 对话地铁读书人|来自大学教授的科普:读书日也是版权日
  • 讲座预告|把握可持续信息披露新机遇
  • 上海浦东打造全新开放平台,年内实现基本功能落地运行
  • 青创上海-2025浦东徒步行倒计时1天,明日浦东世博文化公园不见不散
  • 杜志龙任榆林市府谷县委书记
  • 梅宏院士:数实共生将改写社会经济运行规则