新能源汽车可视化大屏系统毕业设计
以下是一个基于Python和Flask框架的新能源汽车可视化大屏系统后台代码示例。这个系统提供API接口用于前端大屏展示新能源汽车相关数据。
- 主应用文件 (app.py)
python
from flask import Flask, jsonify, request
from flask_cors import CORS
import random
from datetime import datetime, timedelta
import time
import threading
import sqlite3
from collections import defaultdict
app = Flask(name)
CORS(app) # 允许跨域请求
模拟数据库
DATABASE = ‘new_energy_vehicles.db’
def init_db():
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
# 创建车辆信息表
c.execute('''CREATE TABLE IF NOT EXISTS vehicles(id INTEGER PRIMARY KEY AUTOINCREMENT,vin TEXT UNIQUE,brand TEXT,model TEXT,battery_capacity REAL,range INTEGER,production_date TEXT,status TEXT)''')# 创建充电记录表
c.execute('''CREATE TABLE IF NOT EXISTS charging_records(id INTEGER PRIMARY KEY AUTOINCREMENT,vin TEXT,station_id TEXT,start_time TEXT,end_time TEXT,energy_consumed REAL,cost REAL)''')# 创建行驶记录表
c.execute('''CREATE TABLE IF NOT EXISTS driving_records(id INTEGER PRIMARY KEY AUTOINCREMENT,vin TEXT,start_time TEXT,end_time TEXT,distance REAL,energy_consumed REAL,avg_speed REAL)''')# 创建充电站表
c.execute('''CREATE TABLE IF NOT EXISTS charging_stations(id INTEGER PRIMARY KEY AUTOINCREMENT,station_id TEXT UNIQUE,name TEXT,location TEXT,total_ports INTEGER,available_ports INTEGER,status TEXT)''')conn.commit()
conn.close()
初始化数据库
init_db()
模拟数据生成器
class DataGenerator:
@staticmethod
def generate_vehicle_data():
brands = [‘特斯拉’, ‘比亚迪’, ‘蔚来’, ‘小鹏’, ‘理想’, ‘广汽埃安’, ‘极氪’, ‘哪吒’]
models = {
‘特斯拉’: [‘Model 3’, ‘Model Y’, ‘Model S’, ‘Model X’],
‘比亚迪’: [‘汉EV’, ‘唐EV’, ‘秦PLUS EV’, ‘海豚’],
‘蔚来’: [‘ES6’, ‘ES8’, ‘ET7’, ‘ET5’],
‘小鹏’: [‘P7’, ‘G3’, ‘P5’, ‘G9’],
‘理想’: [‘L9’, ‘L8’, ‘L7’, ‘ONE’],
‘广汽埃安’: [‘LX’, ‘V’, ‘Y’, ‘S’],
‘极氪’: [‘001’, ‘009’],
‘哪吒’: [‘U’, ‘V’, ‘S’]
}
brand = random.choice(brands)model = random.choice(models[brand])battery_capacity = round(random.uniform(30, 100), 1)vehicle_range = int(battery_capacity * random.uniform(5, 7))return {'vin': f'LV{random.randint(1000000000000000, 9999999999999999)}','brand': brand,'model': model,'battery_capacity': battery_capacity,'range': vehicle_range,'production_date': f'{random.randint(2018, 2023)}-{random.randint(1, 12):02d}-{random.randint(1, 28):02d}','status': random.choice(['行驶中', '充电中', '空闲', '维修中'])}@staticmethod
def generate_charging_station_data():locations = ['北京', '上海', '广州', '深圳', '杭州', '成都', '重庆', '武汉']prefixes = ['国家电网', '特来电', '星星充电', '南方电网', '特斯拉超充']return {'station_id': f'CS{random.randint(100000, 999999)}','name': f'{random.choice(prefixes)}{random.choice(["中心站", "快充站", "超级充电站"])}','location': f'{random.choice(locations)}{random.choice(["市", ""])}{random.choice(["朝阳区", "浦东新区", "天河区", "南山区", "余杭区", "高新区"])}','total_ports': random.randint(4, 20),'available_ports': random.randint(0, 4),'status': random.choice(['运营中', '建设中', '维护中'])}
初始化一些模拟数据
def init_sample_data():
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
# 检查是否有数据,没有则插入
c.execute("SELECT COUNT(*) FROM vehicles")
if c.fetchone()[0] == 0:for _ in range(100):vehicle = DataGenerator.generate_vehicle_data()c.execute("INSERT INTO vehicles (vin, brand, model, battery_capacity, range, production_date, status) VALUES (?, ?, ?, ?, ?, ?, ?)",(vehicle['vin'], vehicle['brand'], vehicle['model'], vehicle['battery_capacity'], vehicle['range'], vehicle['production_date'], vehicle['status']))c.execute("SELECT COUNT(*) FROM charging_stations")
if c.fetchone()[0] == 0:for _ in range(20):station = DataGenerator.generate_charging_station_data()c.execute("INSERT INTO charging_stations (station_id, name, location, total_ports, available_ports, status) VALUES (?, ?, ?, ?, ?, ?)",(station['station_id'], station['name'], station['location'], station['total_ports'], station['available_ports'], station['status']))conn.commit()
conn.close()
init_sample_data()
实时数据更新线程
class RealTimeDataUpdater(threading.Thread):
def init(self):
super().init()
self.daemon = True
self.running = True
def run(self):while self.running:try:conn = sqlite3.connect(DATABASE)c = conn.cursor()# 更新车辆状态c.execute("SELECT vin FROM vehicles")vins = [row[0] for row in c.fetchall()]for vin in vins:new_status = random.choice(['行驶中', '充电中', '空闲', '维修中'])c.execute("UPDATE vehicles SET status=? WHERE vin=?", (new_status, vin))# 随机生成行驶或充电记录if new_status == '行驶中':start_time = datetime.now() - timedelta(minutes=random.randint(1, 120))end_time = start_time + timedelta(minutes=random.randint(10, 180))distance = round(random.uniform(5, 200), 1)energy_consumed = round(distance * random.uniform(0.12, 0.18), 1)avg_speed = random.randint(30, 90)c.execute("INSERT INTO driving_records (vin, start_time, end_time, distance, energy_consumed, avg_speed) VALUES (?, ?, ?, ?, ?, ?)",(vin, start_time.isoformat(), end_time.isoformat(), distance, energy_consumed, avg_speed))elif new_status == '充电中':c.execute("SELECT station_id FROM charging_stations WHERE status='运营中' AND available_ports>0 ORDER BY RANDOM() LIMIT 1")station = c.fetchone()if station:station_id = station[0]start_time = datetime.now() - timedelta(minutes=random.randint(1, 60))end_time = start_time + timedelta(minutes=random.randint(30, 180))energy_consumed = round(random.uniform(10, 60), 1)cost = round(energy_consumed * random.uniform(1.2, 2.5), 2)c.execute("INSERT INTO charging_records (vin, station_id, start_time, end_time, energy_consumed, cost) VALUES (?, ?, ?, ?, ?, ?)",(vin, station_id, start_time.isoformat(), end_time.isoformat(), energy_consumed, cost))# 减少可用充电端口c.execute("UPDATE charging_stations SET available_ports=available_ports-1 WHERE station_id=?", (station_id,))# 更新充电站可用端口c.execute("SELECT station_id FROM charging_stations WHERE status='运营中'")stations = [row[0] for row in c.fetchall()]for station_id in stations:c.execute("SELECT available_ports, total_ports FROM charging_stations WHERE station_id=?", (station_id,))available, total = c.fetchone()# 随机增加或减少可用端口change = random.randint(-2, 2)new_available = max(0, min(total, available + change))if new_available != available:c.execute("UPDATE charging_stations SET available_ports=? WHERE station_id=?", (new_available, station_id))conn.commit()conn.close()# 每分钟更新一次time.sleep(60)except Exception as e:print(f"Error in real-time data updater: {e}")time.sleep(10)def stop(self):self.running = False
启动实时数据更新线程
data_updater = RealTimeDataUpdater()
data_updater.start()
API路由
@app.route(‘/api/dashboard/summary’, methods=[‘GET’])
def get_dashboard_summary():
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
# 车辆总数
c.execute("SELECT COUNT(*) FROM vehicles")
total_vehicles = c.fetchone()[0]# 按品牌统计
c.execute("SELECT brand, COUNT(*) FROM vehicles GROUP BY brand")
brands = {row[0]: row[1] for row in c.fetchall()}# 按状态统计
c.execute("SELECT status, COUNT(*) FROM vehicles GROUP BY status")
statuses = {row[0]: row[1] for row in c.fetchall()}# 充电站统计
c.execute("SELECT COUNT(*) FROM charging_stations WHERE status='运营中'")
operating_stations = c.fetchone()[0]c.execute("SELECT SUM(total_ports) FROM charging_stations WHERE status='运营中'")
total_ports = c.fetchone()[0] or 0c.execute("SELECT SUM(available_ports) FROM charging_stations WHERE status='运营中'")
available_ports = c.fetchone()[0] or 0# 今日充电量
today = datetime.now().date().isoformat()
c.execute("SELECT SUM(energy_consumed) FROM charging_records WHERE date(start_time)=?", (today,))
today_charging = c.fetchone()[0] or 0# 今日行驶里程
c.execute("SELECT SUM(distance) FROM driving_records WHERE date(start_time)=?", (today,))
today_distance = c.fetchone()[0] or 0conn.close()return jsonify({'total_vehicles': total_vehicles,'brand_distribution': brands,'status_distribution': statuses,'charging_stations': {'total': operating_stations,'ports': {'total': total_ports,'available': available_ports,'utilization': round((total_ports - available_ports) / total_ports * 100, 1) if total_ports > 0 else 0}},'today_stats': {'charging_energy': round(today_charging, 1),'distance': round(today_distance, 1)},'timestamp': datetime.now().isoformat()
})
@app.route(‘/api/vehicles/list’, methods=[‘GET’])
def get_vehicle_list():
page = int(request.args.get(‘page’, 1))
page_size = int(request.args.get(‘page_size’, 10))
offset = (page - 1) * page_size
conn = sqlite3.connect(DATABASE)
c = conn.cursor()c.execute("SELECT * FROM vehicles LIMIT ? OFFSET ?", (page_size, offset))
vehicles = []
for row in c.fetchall():vehicles.append({'id': row[0],'vin': row[1],'brand': row[2],'model': row[3],'battery_capacity': row[4],'range': row[5],'production_date': row[6],'status': row[7]})c.execute("SELECT COUNT(*) FROM vehicles")
total = c.fetchone()[0]conn.close()return jsonify({'data': vehicles,'pagination': {'page': page,'page_size': page_size,'total': total,'total_pages': (total + page_size - 1) // page_size}
})
@app.route(‘/api/charging/stations’, methods=[‘GET’])
def get_charging_stations():
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute("SELECT * FROM charging_stations")
stations = []
for row in c.fetchall():stations.append({'id': row[0],'station_id': row[1],'name': row[2],'location': row[3],'total_ports': row[4],'available_ports': row[5],'status': row[6]})conn.close()return jsonify(stations)
@app.route(‘/api/charging/records’, methods=[‘GET’])
def get_charging_records():
days = int(request.args.get(‘days’, 7))
start_date = (datetime.now() - timedelta(days=days)).date().isoformat()
conn = sqlite3.connect(DATABASE)
c = conn.cursor()# 按天统计充电量
c.execute('''SELECT date(start_time) as day, SUM(energy_consumed), COUNT(*)FROM charging_records WHERE date(start_time) >= ?GROUP BY dayORDER BY day''', (start_date,))daily_stats = []
for row in c.fetchall():daily_stats.append({'date': row[0],'energy': round(row[1], 1),'sessions': row[2]})# 按充电站统计
c.execute('''SELECT s.name, COUNT(*) as sessions, SUM(c.energy_consumed) as energyFROM charging_records cJOIN charging_stations s ON c.station_id = s.station_idWHERE date(c.start_time) >= ?GROUP BY s.nameORDER BY energy DESCLIMIT 10''', (start_date,))station_stats = []
for row in c.fetchall():station_stats.append({'station': row[0],'sessions': row[1],'energy': round(row[2], 1)})conn.close()return jsonify({'daily_stats': daily_stats,'station_stats': station_stats
})
@app.route(‘/api/driving/records’, methods=[‘GET’])
def get_driving_records():
days = int(request.args.get(‘days’, 7))
start_date = (datetime.now() - timedelta(days=days)).date().isoformat()
conn = sqlite3.connect(DATABASE)
c = conn.cursor()# 按天统计行驶里程
c.execute('''SELECT date(start_time) as day, SUM(distance), SUM(energy_consumed), COUNT(*)FROM driving_records WHERE date(start_time) >= ?GROUP BY dayORDER BY day''', (start_date,))daily_stats = []
for row in c.fetchall():daily_stats.append({'date': row[0],'distance': round(row[1], 1),'energy': round(row[2], 1),'trips': row[3],'efficiency': round(row[1] / row[2], 1) if row[2] > 0 else 0})# 按品牌统计平均效率
c.execute('''SELECT v.brand, AVG(d.distance/d.energy_consumed) as avg_efficiencyFROM driving_records dJOIN vehicles v ON d.vin = v.vinWHERE date(d.start_time) >= ?GROUP BY v.brandORDER BY avg_efficiency DESC''', (start_date,))brand_efficiency = []
for row in c.fetchall():brand_efficiency.append({'brand': row[0],'efficiency': round(row[1], 1)})conn.close()return jsonify({'daily_stats': daily_stats,'brand_efficiency': brand_efficiency
})
@app.route(‘/api/map/vehicles’, methods=[‘GET’])
def get_map_vehicles():
# 模拟车辆位置数据
vehicles = []
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute("SELECT vin, brand, model, status FROM vehicles WHERE status IN ('行驶中', '充电中') LIMIT 50")
for row in c.fetchall():# 生成随机位置(中国范围内)lat = round(random.uniform(21.0, 42.0), 6)lng = round(random.uniform(98.0, 122.0), 6)vehicles.append({'vin': row[0],'brand': row[1],'model': row[2],'status': row[3],'location': [lat, lng],'battery': random.randint(10, 90)})conn.close()return jsonify(vehicles)
@app.route(‘/api/map/stations’, methods=[‘GET’])
def get_map_stations():
# 获取充电站位置数据
stations = []
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute("SELECT station_id, name, location, available_ports, total_ports FROM charging_stations WHERE status='运营中'")
for row in c.fetchall():# 从位置信息中提取坐标(模拟)# 实际应用中应该存储真实的经纬度lat = round(random.uniform(21.0, 42.0), 6)lng = round(random.uniform(98.0, 122.0), 6)stations.append({'id': row[0],'name': row[1],'location': row[2],'coordinates': [lat, lng],'available_ports': row[3],'total_ports': row[4],'utilization': round((row[4] - row[3]) / row[4] * 100, 1) if row[4] > 0 else 0})conn.close()return jsonify(stations)
if name == ‘main’:
app.run(host=‘0.0.0.0’, port=5000, debug=True)
2. 数据库初始化脚本 (init_db.py)
python
import sqlite3
from app import DATABASE, DataGenerator
def init_database():
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
# 删除现有表
c.execute("DROP TABLE IF EXISTS vehicles")
c.execute("DROP TABLE IF EXISTS charging_records")
c.execute("DROP TABLE IF EXISTS driving_records")
c.execute("DROP TABLE IF EXISTS charging_stations")# 重新创建表
c.execute('''CREATE TABLE vehicles(id INTEGER PRIMARY KEY AUTOINCREMENT,vin TEXT UNIQUE,brand TEXT,model TEXT,battery_capacity REAL,range INTEGER,production_date TEXT,status TEXT)''')c.execute('''CREATE TABLE charging_records(id INTEGER PRIMARY KEY AUTOINCREMENT,vin TEXT,station_id TEXT,start_time TEXT,end_time TEXT,energy_consumed REAL,cost REAL)''')c.execute('''CREATE TABLE driving_records(id INTEGER PRIMARY KEY AUTOINCREMENT,vin TEXT,start_time TEXT,end_time TEXT,distance REAL,energy_consumed REAL,avg_speed REAL)''')c.execute('''CREATE TABLE charging_stations(id INTEGER PRIMARY KEY AUTOINCREMENT,station_id TEXT UNIQUE,name TEXT,location TEXT,total_ports INTEGER,available_ports INTEGER,status TEXT)''')# 插入模拟数据
for _ in range(100):vehicle = DataGenerator.generate_vehicle_data()c.execute("INSERT INTO vehicles (vin, brand, model, battery_capacity, range, production_date, status) VALUES (?, ?, ?, ?, ?, ?, ?)",(vehicle['vin'], vehicle['brand'], vehicle['model'], vehicle['battery_capacity'], vehicle['range'], vehicle['production_date'], vehicle['status']))for _ in range(20):station = DataGenerator.generate_charging_station_data()c.execute("INSERT INTO charging_stations (station_id, name, location, total_ports, available_ports, status) VALUES (?, ?, ?, ?, ?, ?)",(station['station_id'], station['name'], station['location'], station['total_ports'], station['available_ports'], station['status']))conn.commit()
conn.close()
print("数据库初始化完成!")
if name == ‘main’:
init_database()
3. 配置文件 (config.py)
python
import osbasedir = os.path.abspath(os.path.dirname(__file__))class Config:# 基础配置SECRET_KEY = os.environ.get('SECRET_KEY') or 'dev-key-123'DATABASE = os.path.join(basedir, 'new_energy_vehicles.db')# API配置API_PREFIX = '/api'API_VERSION = 'v1'# 数据更新间隔(秒)DATA_UPDATE_INTERVAL = 60# 日志配置LOG_FILE = 'nevs.log'LOG_LEVEL = 'INFO'@staticmethoddef init_app(app):passclass DevelopmentConfig(Config):DEBUG = TrueLOG_LEVEL = 'DEBUG'class ProductionConfig(Config):DEBUG = Falseconfig = {'development': DevelopmentConfig,'production': ProductionConfig,'default': DevelopmentConfig
}
- 前端API调用示例
javascript
// 使用axios调用API示例// 1. 获取仪表板摘要数据
axios.get('/api/dashboard/summary').then(response => {console.log('仪表板数据:', response.data);// 处理数据...}).catch(error => {console.error('获取仪表板数据失败:', error);});
// 2. 获取车辆列表
axios.get(‘/api/vehicles/list’, {
params: {
page: 1,
page_size: 10
}
})
.then(response => {
console.log(‘车辆列表:’, response.data);
// 处理数据…
});
// 3. 获取充电记录统计
axios.get(‘/api/charging/records’, {
params: {
days: 30
}
})
.then(response => {
console.log(‘充电记录:’, response.data);
// 处理数据…
});
// 4. 获取地图数据
Promise.all([
axios.get(‘/api/map/vehicles’),
axios.get(‘/api/map/stations’)
])
.then(([vehiclesRes, stationsRes]) => {
console.log(‘地图车辆数据:’, vehiclesRes.data);
console.log(‘地图充电站数据:’, stationsRes.data);
// 处理数据…
});
系统功能说明
数据概览仪表板:
显示车辆总数、品牌分布、状态分布
充电站统计信息
今日充电量和行驶里程
车辆管理:
车辆列表分页查询
车辆状态实时更新
充电管理:
充电站信息查询
充电记录统计
充电站利用率分析
行驶数据分析:
行驶里程统计
能耗效率分析
品牌能耗对比
地图可视化:
车辆实时位置显示
充电站位置及状态显示
部署说明
安装依赖:
pip install flask flask-cors sqlite3
初始化数据库:
python init_db.py
启动服务:
python app.py
访问API:
仪表板数据: http://localhost:5000/api/dashboard/summary
车辆列表: http://localhost:5000/api/vehicles/list
充电站地图数据: http://localhost:5000/api/map/stations
这个系统提供了新能源汽车监控和分析所需的基本API接口,前端可以通过这些接口获取数据并实现可视化大屏展示。