当前位置: 首页 > news >正文

【Project】基于spark-App端口懂车帝数据采集与可视化

文章目录

      • hadoop完全分布式部署
        • hdfs-site.xml
        • core-site.xml
        • marpred-site.xml
        • yarn-site.xml
      • spark集群部署
        • spark-env.sh
      • mongodb分片模式部署
        • config 服务器
          • 初始化config 副本集
        • shard 服务器
          • 初始化shard 副本集
        • mongos服务器
          • 添加shard
          • 设置chunk大小
        • 启动分片
          • 为集合 user 创建索引并进行分片
      • fillder抓包+数据采集
      • spark清洗数据
      • Flask+echarts进行可视化
      • 效果图

hadoop完全分布式部署

hdfs-site.xml
<configuration><property><name>dfs.namenode.name.dir</name><value>file:/usr/local/src/hadoop/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>file:/usr/local/src/hadoop/dfs/data</value></property><property><name>dfs.replication</name><value>3</value></property></property><!-- Secondary NameNode 所在主机的ip和端口  --><property><name>dfs.namenode.secondary.http-address</name><value>master02:9890</value></property>
</configuration>
core-site.xml
<configuration><!--   --><!-- 设置hadoop的文件系统,由URI指定  --><property><!-- 指定namenode地址节点所在机器  --><name>fs.defaultFS</name><value>hdfs://master01:9000</value></property><!-- 配置hadoop临时目录,默认/tmp/hadoop-${user.name}  --><property><name>hadoop.tmp.dir</name><value>/usr/local/src/hadoop/tmp/</value></property></configuration>
marpred-site.xml
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobhistory.address</name><value>master01:10020</value></property>
</configuration>
yarn-site.xml
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>mapreduce.jobhistory.address</name><value>master01:10020</value></property>
</configuration>

spark集群部署

spark-env.sh
export JAVA_HOME=/usr/local/src/jdk1.8.0_152
export HADOOP_CONF_DIR=/usr/local/src/hadoop-3.2.2/etc/hadoop/
export SPARK_MASTER_IP=master01
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_MEMORY=512m
export SPARK_WORKER_CORES=1
export SPARK_EXECUTOR_MEMORY=512m
export SPARK_EXECUTOR_CORES=1
export SPARK_WORKER_INSTANCES=1

mongodb分片模式部署

服务器名称IP地址Shard1 (端口/角色)Shard2 (端口/角色)Shard3 (端口/角色)mongos (端口)Config Server (端口/角色)
master01192.168.121.13427018 (主节点)27020 (仲裁节点)27019 (副节点)2702127022 (主节点)
node01192.168.121.13527019 (副节点)27018 (主节点)27020 (仲裁节点)2702127022 (副节点)
node02192.168.121.13627020 (仲裁节点)27019 (副节点)27018 (主节点)-27022 (副节点)
config 服务器
dbpath=/usr/local/src/mongodb_demo/shardcluster/configServer/data
logpath=/usr/local/src/mongodb_demo/shardcluster/configServer/logs/config_server.log
port=27022
bind_ip=master01
logappend=true
fork=trues
maxConns=5000
replSet=configs
configsvr=true
初始化config 副本集
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/configServer/configFile/mongodb_config.conf$ ./mongo --host master01 --port 27022
> rs.initiate()
configs:SECONDARY> rs.add('node01:27022')
configs:PRIMARY> rs.add('node02:27022')
shard 服务器
dbpath=/usr/local/src/mongodb_demo/shardcluster/shard/shard1_data
logpath=/usr/local/src/mongodb_demo/shardcluster/shard/logs/shard1.log
port=27018
bind_ip=master01
logappend=true
fork=true
maxConns=5000
replSet=shard1
shardsvr=true

类似地,编辑 shard2.conf 和 shard3.conf,分别修改 dbpath, logpath, port, bind_ip, 和 replSet 参数。

初始化shard 副本集
# 所有节点都启动
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard1.conf
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard2.conf
$ ./mongod -f /usr/local/src/mongodb_demo/shardcluster/shard/configFile/mongodb_shard3.conf# master01节点
$ ./mongo --host master01 --port 27018
> rs.initiate()
configs:SECONDARY> rs.add('node01:27019')
configs:PRIMARY> rs.add('node02:27020')# node01节点
$ ./mongo --host node01--port 27018
> rs.initiate()
configs:SECONDARY> rs.add('node02:27019')
configs:PRIMARY> rs.add('master01 :27020')# node02节点
$ ./mongo --host node02 --port 27018
> rs.initiate()
configs:SECONDARY> rs.add('master01 :27019')
configs:PRIMARY> rs.add('node01:27020')
mongos服务器
logpath=/usr/local/src/mongodb_demo/shardcluster/mongos/logs/mongos.log
logappend=true
port=27021
bind_ip=master01
fork=true
configdb=configs/master01:27022,node01:27022,node02:27022
maxConns=20000

类似地,在node01节点配置

添加shard

$ ./mongo --host master01--port 27021mongos> use gateway
mongos> sh.addShard("shard1/master01:27018,node01:27019,node02:27020")
mongos> sh.addShard("shard2/master01:27020,node01:27018,node02:27019")
mongos> sh.addShard("shard3/master01:27019,node01:27020,node02:27018")
设置chunk大小
use config;
db.settings.insertOne({ _id: "chunksize", value: 64 });
启动分片
mongos> use gateway
mongos> sh.enableSharding("xxxdatabase")
为集合 user 创建索引并进行分片
mongos> use xxxdatabase
mongos> db.user.createIndex({"id":1})
mongos> sh.shardCollection("xxxdatabase.xxxcollection",{"id":1})

fillder抓包+数据采集

在这里插入图片描述

    def data_SalesVolume(self,allcity):'''获取全国和各城市车辆销售量排名数据结果存储到本地和hdfs:param allcity: 城市名称-->list:return:'''data_list = []for city in allcity:city_quote = quote(city)newurl = self.url+f'method=jsb.app.fetch&rank_data_type=116&month=202412&energy_type&price=0%2C-1&manufacturer&rank_city_name={city_quote}&market_time=0&offset=0&count=500&scm_version=1.0.0.2209&iid=3991681621300419&device_id=3991681621296323&ac=wifi&channel=home&aid=36&app_name=automobile&version_code=748&version_name=7.4.8&device_platform=android&os=android&ab_client=a1%2Cc2%2Ce1%2Cf2%2Cg2%2Cf7&ab_group=3167590%2C3577236&ssmix=a&device_type=PCLM10&device_brand=OPPO&language=zh&os_api=28&os_version=9&manifest_version_code=748&resolution=720*1280&dpi=320&update_version_code=7483&_rticket=1738335686380&cdid=637e94d4-1e98-49a9-9b9e-7f567951c149&rom_version=coloros__pq3b.190801.10161630+release-keys&longi_lati_type=0&longi_lati_time=0&content_sort_mode=0&total_memory=3.85&cpu_name=Qualcomm+Technologies%2C+Inc+MSM8998&overall_score=8.6995&cpu_score=7.9848&host_abi=armeabi-v7a'print(f'准备获取=={city}==的数据---{newurl}---')citynames = self.session.get(url=newurl, headers=self.headers).json()city_sales = {city:citynames}data_list.append(city_sales)# print(citynames)print('----------获取销量数据完成-----------')data_json = {'SalesValue':data_list}with open(f'data/data_salesvolume.json','w',encoding='utf-8') as f:json.dump(data_json,f,ensure_ascii=False,indent=4)hdfs_path = "/data_doncar/data_carkm.json"  # HDFS 目标路径self.hdfs_client.create(hdfs_path, data_json, overwrite=True)  # 上传到 HDFS

spark清洗数据

创建对象

package org.exampleimport org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._object DonCar_spark {def main(args: Array[String]): Unit = {Logger.getLogger("org").setLevel(Level.ERROR)Logger.getLogger("akka").setLevel(Level.ERROR)val spark = SparkSession.builder().appName("car_analysis").master("local[*]").config("spark.mongodb.read.connection.uri", "mongodb://root:123456@192.168.121.134:27017").getOrCreate()

连接mongodb数据库

    def readmongocollection(db:String,collection:String) = {spark.read.format("mongodb").option("database",db).option("collection",collection).load()}

创建临时表

val car_info = readmongocollection("doncar_top","car_info")val citys_sv = readmongocollection("doncar_top","citys_sv")val nationwide_sv = readmongocollection("doncar_top","nationwide_sv")citys_sv.createOrReplaceTempView("citys_table")car_info.createOrReplaceTempView("car_table")nationwide_sv.createOrReplaceTempView("nationwide_table")

数据分析 并且把数据存到HDFS


//    各城市汽车销量val citys_sales = spark.sql("select city,sales from citys_table").groupBy("city").sum("sales")//    电车续航排名val brand_max_km = spark.sql("select brand,range_km from car_table").groupBy("brand").max()brand_max_km.limit(7).show()//    油电销量val car_type_sales =spark.sql("select sales,car_type from nationwide_table ").withColumn("category",when(col("car_type") === 0,0).otherwise(1)).groupBy("category").sum("sales")car_type_sales.show()// 能源占比
//    0 纯油
//    1 纯电
//    2 混动
//    3-4 增程val car_type = spark.sql("select car_type,sales from nationwide_table").groupBy("car_type").sum("sales")//    car_type.printSchema()val car_type2 = car_type.withColumn("sum(sales)",when(col("car_type") === 3,col("sum(sales)")+car_type.filter(col("car_type") === 4).select("sum(sales)").first().getLong(0)).otherwise(col("sum(sales)")))car_type2.show()//销量——价格关系val price_sales = spark.sql("SELECT min_price,max_price,sales from nationwide_table").withColumn("average_price",(col("min_price") + col("max_price")) / 2).drop("min_price","max_price")price_sales.show()//   品牌销量val brand_sales = spark.sql("SELECT brand_name,model,sales FROM nationwide_table").groupBy("brand_name").sum("sales").sort(col("sum(sales)").desc)brand_sales.show()//    城市销量排名val city_top = citys_sales.sort(col("sum(sales)").desc)city_top.show()//    车辆销售排名val car_top = spark.sql("SELECT model,sales FROM nationwide_table").sort(col("sales").desc)car_top.show()}
}

在这里插入图片描述

Flask+echarts进行可视化

from flask import Flask, render_template, redirect, url_for, request, flash, session,jsonify
from SparkAnalysis import analysis
from SparkAnalysis import Get_HdfsData
import pymysqlapp = Flask(__name__)#重定向路由
@app.route('/')
def default():return redirect(url_for('login'))@app.route('/login', methods=['GET', 'POST'])
def login():if request.method == 'POST':username = request.form['username']password = request.form['password']conn = pymysql.connect(host='localhost',  # 数据库主机地址user='root',  # 数据库用户名password='123456',  # 数据库密码database='users'  # 数据库名称)with conn.cursor() as cursor:cursor.execute("SELECT * FROM account WHERE username = %s AND password = %s", (username, password))account = cursor.fetchone()  # 获取第一行记录conn.close()if account:return redirect(url_for('index'))else:return redirect(url_for('login'))return render_template('login.html')@app.route('/register', methods=['GET', 'POST'])
def register():if request.method == 'POST':username = request.form['username']password = request.form['password']conn = pymysql.connect(host='localhost',user='root',password='123456',database='users')with conn.cursor() as cursor:cursor.execute("SELECT * FROM account WHERE username = %s", (username,))if cursor.fetchone():return redirect(url_for('register'))# 如果用户名不存在,插入新用户cursor.execute("INSERT INTO account (username, password) VALUES (%s, %s)", (username, password))conn.commit()conn.close()return redirect(url_for('login'))return render_template('register.html')@app.route('/index')
def index():return render_template('index.html',map_data=Get_HdfsData.get_map_data(),bar1_x = analysis.get_bar1_data()['brand'].values.tolist(),# bar1_y=Get_HdfsData.get_bar1_data()['range_km'].values.tolist(),bar1_y=[float(num) for num in analysis.get_bar1_data()['range_km'].values.tolist()],type_0 = Get_HdfsData.get_num_data()[0][2:],type_n0 = Get_HdfsData.get_num_data()[1][2:],pie_data = Get_HdfsData.get_pie_data(),scatter_data = Get_HdfsData.get_scatter_data(),bar2_x=analysis.get_bar2_data()['brand_name'].values.tolist()[:5],bar2_y=analysis.get_bar2_data()['sales'].values.tolist()[:5],line1_x_top=Get_HdfsData.get_line1_data()['city'].values.tolist()[:12],line1_y_top=Get_HdfsData.get_line1_data()['value'].values.tolist()[:12],line1_x_last=analysis.get_line1_data()['city'].values.tolist()[-12:],line1_y_last=analysis.get_line1_data()['value'].values.tolist()[-12:],line2_x_top=Get_HdfsData.get_line2_data()['model'].values.tolist()[:10],line2_y_top=Get_HdfsData.get_line2_data()['sales'].values.tolist()[:10],)if __name__ == '__main__':app.run(host='127.0.0.1', port=5001)

echarts部分代码


(function() {var myChart = echarts.init(document.querySelector(".map .chart"));echarts.registerMap('china', chinaData);option = {tooltip: {trigger: 'item',formatter: '{b}<br/>{c} (辆)'},visualMap: {min: 1,max: 40000,text: ['High', 'Low'],realtime: false,calculable: true,inRange: {color: ['lightskyblue', 'yellow', 'orangered']},textStyle: {color: 'red' // 设置字体颜色为红色}},series: [{name: 'city_map',type: 'map',map: 'china',data: map_data,roam: true // 在这里添加 roam 属性}]};myChart.setOption(option);// 监听浏览器缩放,图表对象调用缩放resize函数window.addEventListener("resize", function() {myChart.resize();});}
)();

效果图

在这里插入图片描述

相关文章:

  • Vue 3中如何封装API请求:提升开发效率的最佳实践
  • Geek强大的电脑卸载软件工具,免费下载
  • Winform实现条码打印
  • Vue生命周期详细解析
  • AI语音助手自定义角色百度大模型 【全新AI开发套件掌上AI+4w字教程+零基础上手】
  • Android SDK 下载及配置 --- app笔记
  • 【分布式锁通关指南 09】源码剖析redisson之公平锁的实现
  • [KVM] KVM挂起状态恢复失败与KVM存储池迁移
  • Spring JDBC 的开发步骤(注解方式)
  • 私有知识库 Coco AI 实战(三):摄入 Elasticsearch 官方文档
  • Go语言学习笔记(一)
  • 【论文阅读】Dual-branch Cross-Patch Attention Learning for Group Affect Recognition
  • 代理模式:控制对象访问的中间层设计
  • 论文阅读 | 大模型工具调用控制的策略优化
  • Spark与Hadoop之间的联系与区别
  • 使用nodeJs的express+axios+cors做代理
  • 配置MambaIRv2: Attentive State Space Restoration的环境
  • Sql刷题日志(day5)
  • 说一下Redis的发布订阅模型和PipeLine
  • OpenBayes 一周速览|EasyControl 高效控制 DiT 架构,助力吉卜力风图像一键生成;TripoSG 单图秒变高保真 3D 模型
  • 云南洱源县4.8级地震:房屋受损442户,无人员伤亡报告
  • 洛阳白马寺的墓主人是狄仁杰?其实这个误解从北宋就开始了
  • 游客大理古城买瓜起争执:170克手机称出340克
  • 印控克什米尔地区发生针对游客枪击事件,造成至少25人丧生
  • 根据学习教育安排,上海市委中心组专题学习总书记力戒形式主义官僚主义重要论述
  • 罗马教皇方济各去世