零基础学Python——第八章:实战项目(1-3)
第八章:实战项目
8.1 命令行应用开发
8.1.1 命令行应用基础
-
命令行应用的特点与优势
命令行应用是不需要图形界面的程序,通过文本命令与用户交互。它们启动快速、资源占用少,适合自动化任务和系统管理。
# 命令行应用的简单示例 import sys# 获取命令行参数 if len(sys.argv) > 1:print(f"你好,{sys.argv[1]}!") else:print("请提供你的名字作为参数")# 运行方式:python script.py 张三 # 输出:你好,张三!
-
命令行参数处理
命令行参数是用户在启动程序时提供的额外信息,可以影响程序的行为。Python提供了多种处理命令行参数的方法。
import argparse# 创建参数解析器 parser = argparse.ArgumentParser(description="一个简单的计算器程序")# 添加参数 parser.add_argument("operation", choices=["add", "subtract", "multiply", "divide"], help="要执行的运算") parser.add_argument("x", type=float, help="第一个数字") parser.add_argument("y", type=float, help="第二个数字")# 解析参数 args = parser.parse_args()# 根据参数执行操作 if args.operation == "add":result = args.x + args.y elif args.operation == "subtract":result = args.x - args.y elif args.operation == "multiply":result = args.x * args.y elif args.operation == "divide":if args.y == 0:print("错误:除数不能为零")sys.exit(1)result = args.x / args.yprint(f"结果: {result}")# 运行方式:python calculator.py add 5 3 # 输出:结果: 8.0
-
用户交互设计
良好的命令行应用应该提供清晰的提示和反馈,让用户知道程序在做什么,以及如何正确使用。
def get_user_input(prompt, validator=None):"""获取用户输入并验证"""while True:user_input = input(prompt)if validator is None or validator(user_input):return user_inputprint("输入无效,请重试。")# 验证函数示例 def is_number(value):try:float(value)return Trueexcept ValueError:return False# 使用示例 name = get_user_input("请输入你的名字: ") age = get_user_input("请输入你的年龄: ", is_number)print(f"你好,{name}!你今年{age}岁。")
8.1.2 个人任务管理器项目
-
项目需求分析
我们将开发一个简单的命令行任务管理器,允许用户添加、查看、完成和删除任务。任务数据将保存在文本文件中。
# 任务管理器的核心功能 # 1. 添加新任务 # 2. 查看所有任务 # 3. 标记任务为已完成 # 4. 删除任务 # 5. 保存任务到文件 # 6. 从文件加载任务
-
数据结构设计
我们需要设计一个合适的数据结构来表示任务及其状态。
class Task:def __init__(self, id, description, completed=False):self.id = id # 任务IDself.description = description # 任务描述self.completed = completed # 任务状态def __str__(self):status = "[完成]" if self.completed else "[待办]"return f"{self.id}. {status} {self.description}"
-
功能实现
下面是任务管理器的核心功能实现。
import os import jsonclass TaskManager:def __init__(self, file_path="tasks.json"):self.tasks = [] # 任务列表self.file_path = file_path # 任务存储文件路径self.load_tasks() # 加载任务def add_task(self, description):"""添加新任务"""# 生成新任务IDtask_id = 1 if not self.tasks else max(task.id for task in self.tasks) + 1# 创建新任务并添加到列表task = Task(task_id, description)self.tasks.append(task)self.save_tasks()return taskdef list_tasks(self):"""列出所有任务"""if not self.tasks:print("没有任务。")returnfor task in self.tasks:print(task)def complete_task(self, task_id):"""将任务标记为已完成"""for task in self.tasks:if task.id == task_id:task.completed = Trueself.save_tasks()return Truereturn Falsedef delete_task(self, task_id):"""删除任务"""for i, task in enumerate(self.tasks):if task.id == task_id:del self.tasks[i]self.save_tasks()return Truereturn Falsedef save_tasks(self):"""保存任务到文件"""# 将任务对象转换为字典tasks_data = [{"id": task.id, "description": task.description, "completed": task.completed}for task in self.tasks]# 写入JSON文件with open(self.file_path, "w", encoding="utf-8") as f:json.dump(tasks_data, f, ensure_ascii=False, indent=2)def load_tasks(self):"""从文件加载任务"""if not os.path.exists(self.file_path):returntry:with open(self.file_path, "r", encoding="utf-8") as f:tasks_data = json.load(f)# 将字典转换为任务对象self.tasks = [Task(item["id"], item["description"], item["completed"])for item in tasks_data]except (json.JSONDecodeError, KeyError):print("加载任务时出错,将使用空任务列表。")self.tasks = []
-
主程序与用户界面
最后,我们需要创建主程序和用户界面,将所有功能整合起来。
def main():task_manager = TaskManager()while True:print("\n===== 个人任务管理器 =====")print("1. 添加任务")print("2. 查看所有任务")print("3. 标记任务为已完成")print("4. 删除任务")print("0. 退出程序")choice = input("\n请选择操作 [0-4]: ")if choice == "1":description = input("请输入任务描述: ")task = task_manager.add_task(description)print(f"已添加任务: {task}")elif choice == "2":print("\n所有任务:")task_manager.list_tasks()elif choice == "3":task_id = int(input("请输入要完成的任务ID: "))if task_manager.complete_task(task_id):print(f"任务 {task_id} 已标记为完成")else:print(f"未找到ID为 {task_id} 的任务")elif choice == "4":task_id = int(input("请输入要删除的任务ID: "))if task_manager.delete_task(task_id):print(f"任务 {task_id} 已删除")else:print(f"未找到ID为 {task_id} 的任务")elif choice == "0":print("感谢使用个人任务管理器,再见!")breakelse:print("无效的选择,请重试。")if __name__ == "__main__":main()
-
项目扩展思路
这个简单的任务管理器还可以进一步扩展,例如:
- 添加任务优先级
- 添加任务截止日期
- 添加任务分类或标签
- 实现任务搜索功能
- 添加任务提醒功能
8.2 简单网页爬虫
8.2.1 网页爬虫基础
-
网页爬虫的概念与原理
网页爬虫是一种自动获取网页内容的程序,它模拟人类浏览网页的行为,但速度更快、更自动化。爬虫的基本原理是发送HTTP请求,获取网页内容,然后解析提取所需信息。
# 网页爬虫的基本流程 # 1. 发送HTTP请求获取网页 # 2. 解析网页内容 # 3. 提取所需数据 # 4. 存储数据 # 5. (可选)继续爬取其他相关网页
-
网页结构与HTML解析
网页主要由HTML构成,要提取网页中的信息,需要了解HTML结构并使用合适的工具解析它。
import requests from bs4 import BeautifulSoup# 获取网页内容 url = "https://example.com" response = requests.get(url) html_content = response.text# 解析HTML soup = BeautifulSoup(html_content, "html.parser")# 提取标题 title = soup.title.text print(f"网页标题: {title}")# 提取所有段落文本 paragraphs = soup.find_all("p") for i, p in enumerate(paragraphs):print(f"段落 {i+1}: {p.text.strip()}")
-
网络请求与响应
爬虫需要发送网络请求并处理服务器的响应,Python的requests库使这一过程变得简单。
import requests# 发送GET请求 response = requests.get("https://api.github.com/users/python")# 检查响应状态 if response.status_code == 200:# 解析JSON响应data = response.json()print(f"用户名: {data['login']}")print(f"简介: {data['bio']}")print(f"关注者: {data['followers']}") else:print(f"请求失败,状态码: {response.status_code}")
-
爬虫伦理与法律问题
开发爬虫时,需要遵守网站的robots.txt规则,控制爬取速度,并尊重版权和隐私。
import requests from urllib.robotparser import RobotFileParser import timedef is_crawling_allowed(url, user_agent="*"):"""检查是否允许爬取指定URL"""rp = RobotFileParser()robots_url = f"{url.split('//', 1)[0]}//{url.split('//', 1)[1].split('/', 1)[0]}/robots.txt"rp.set_url(robots_url)rp.read()return rp.can_fetch(user_agent, url)# 使用示例 url = "https://example.com/page" if is_crawling_allowed(url):print("允许爬取该网页")response = requests.get(url)# 处理响应...# 控制爬取速度time.sleep(1) # 每次请求间隔1秒 else:print("robots.txt不允许爬取该网页")
8.2.2 热门电影信息爬虫项目
-
项目需求分析
我们将开发一个爬虫,从电影评分网站获取热门电影的信息,包括标题、评分、导演、主演和简介等。
# 电影信息爬虫的核心功能 # 1. 获取热门电影列表页面 # 2. 提取电影基本信息 # 3. 获取每部电影的详情页面 # 4. 提取详细信息 # 5. 保存电影数据
-
数据抓取实现
下面是电影信息爬虫的核心实现。注意:实际使用时需要根据目标网站的具体结构调整选择器。
import requests from bs4 import BeautifulSoup import csv import time import randomclass MovieScraper:def __init__(self, base_url):self.base_url = base_urlself.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"}def get_page(self, url):"""获取网页内容"""try:response = requests.get(url, headers=self.headers)response.raise_for_status() # 如果请求失败,抛出异常return response.textexcept requests.exceptions.RequestException as e:print(f"获取页面失败: {e}")return Nonedef parse_movie_list(self, html):"""解析电影列表页面,提取电影基本信息和详情页链接"""soup = BeautifulSoup(html, "html.parser")movies = []# 注意:以下选择器需要根据实际网站结构调整movie_items = soup.select(".movie-item") # 假设每部电影在一个class为movie-item的div中for item in movie_items:movie = {"title": item.select_one(".title").text.strip(),"rating": item.select_one(".rating").text.strip(),"detail_url": self.base_url + item.select_one("a")["href"]}movies.append(movie)return moviesdef parse_movie_detail(self, html):"""解析电影详情页面,提取详细信息"""soup = BeautifulSoup(html, "html.parser")# 注意:以下选择器需要根据实际网站结构调整detail = {"director": soup.select_one(".director").text.strip(),"actors": [actor.text.strip() for actor in soup.select(".actors .actor")],"genres": [genre.text.strip() for genre in soup.select(".genres .genre")],"release_date": soup.select_one(".release-date").text.strip(),"duration": soup.select_one(".duration").text.strip(),"summary": soup.select_one(".summary").text.strip()}return detaildef scrape_movies(self, pages=1):"""爬取指定页数的电影信息"""all_movies = []for page in range(1, pages + 1):print(f"正在爬取第 {page} 页...")list_url = f"{self.base_url}/movies?page={page}"html = self.get_page(list_url)if not html:continuemovies = self.parse_movie_list(html)for movie in movies:print(f"正在爬取电影 '{movie['title']}' 的详细信息...")detail_html = self.get_page(movie["detail_url"])if detail_html:details = self.parse_movie_detail(detail_html)movie.update(details)all_movies.append(movie)# 添加随机延迟,避免请求过于频繁time.sleep(random.uniform(1, 3))return all_moviesdef save_to_csv(self, movies, filename="movies.csv"):"""将电影信息保存到CSV文件"""if not movies:print("没有电影数据可保存")return# 获取所有可能的字段fields = set()for movie in movies:fields.update(movie.keys())with open(filename, "w", newline="", encoding="utf-8") as f:writer = csv.DictWriter(f, fieldnames=sorted(fields))writer.writeheader()writer.writerows(movies)print(f"已将 {len(movies)} 部电影信息保存到 {filename}")
-
主程序与用户界面
下面是电影爬虫的主程序,提供简单的命令行界面。
def main():print("===== 热门电影信息爬虫 =====\n")# 注意:请替换为实际要爬取的网站URLbase_url = input("请输入电影网站基础URL (例如 https://example.com): ")pages = int(input("请输入要爬取的页数: "))output_file = input("请输入保存文件名 (默认为 movies.csv): ") or "movies.csv"scraper = MovieScraper(base_url)print("\n开始爬取电影信息...\n")movies = scraper.scrape_movies(pages)if movies:scraper.save_to_csv(movies, output_file)print(f"\n爬取完成!共获取 {len(movies)} 部电影的信息。")else:print("\n未能获取任何电影信息。")if __name__ == "__main__":main()
-
项目扩展思路
这个电影爬虫项目还可以进一步扩展:
- 添加多线程或异步爬取,提高效率
- 实现增量爬取,只获取新上映的电影
- 添加数据库存储,而不仅仅是CSV文件
- 开发简单的Web界面展示爬取结果
- 添加电影海报图片下载功能
8.3 数据分析项目
8.3.1 数据分析基础
-
数据分析的流程与方法
数据分析通常包括数据收集、清洗、探索、建模和可视化等步骤,目的是从数据中提取有用的信息和洞察。
# 数据分析的基本流程 # 1. 数据收集:获取原始数据 # 2. 数据清洗:处理缺失值、异常值等 # 3. 数据探索:了解数据分布和特征 # 4. 数据分析:应用统计方法提取信息 # 5. 数据可视化:直观展示分析结果
-
常用数据分析库介绍
Python有丰富的数据分析库,如NumPy、Pandas、Matplotlib等,它们提供了强大的数据处理和可视化功能。
import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns# NumPy示例:创建数组并进行基本统计 data = np.random.normal(0, 1, 1000) # 生成1000个正态分布随机数 print(f"均值: {np.mean(data):.4f}") print(f"标准差: {np.std(data):.4f}") print(f"最小值: {np.min(data):.4f}") print(f"最大值: {np.max(data):.4f}")# Pandas示例:创建数据框并进行基本操作 df = pd.DataFrame({"A": np.random.randint(0, 10, 10),"B": np.random.normal(0, 1, 10),"C": pd.date_range("2023-01-01", periods=10) }) print("\nPandas数据框:") print(df.head()) print("\n数据描述:") print(df.describe())# Matplotlib示例:绘制简单图表 plt.figure(figsize=(10, 4))plt.subplot(1, 2, 1) plt.hist(data, bins=30) plt.title("正态分布直方图")plt.subplot(1, 2, 2) plt.boxplot(data) plt.title("箱线图")plt.tight_layout() plt.show()
-
数据可视化技巧
数据可视化是数据分析的重要组成部分,好的可视化可以直观地展示数据特征和分析结果。
import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as sns# 设置样式 sns.set(style="whitegrid")# 创建示例数据 categories = ["A", "B", "C", "D", "E"] values = [23, 45, 56, 78, 32]# 创建简单的条形图 plt.figure(figsize=(10, 6))# 使用Seaborn绘制条形图 ax = sns.barplot(x=categories, y=values, palette="viridis")# 添加数据标签 for i, v in enumerate(values):ax.text(i, v + 1, str(v), ha="center")# 添加标题和标签 plt.title("各类别数值比较", fontsize=15) plt.xlabel("类别", fontsize=12) plt.ylabel("数值", fontsize=12)# 调整布局并显示 plt.tight_layout() plt.show()
8.3.2 销售数据分析项目
-
项目需求分析
我们将开发一个销售数据分析项目,分析一家零售商的销售数据,包括销售趋势、产品表现、客户行为等方面。
# 销售数据分析的核心内容 # 1. 销售趋势分析:按时间查看销售变化 # 2. 产品分析:热销产品、利润率等 # 3. 客户分析:客户分布、购买行为等 # 4. 地区分析:不同地区的销售情况
-
数据准备与清洗
首先,我们需要加载销售数据并进行必要的清洗和预处理。
import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns# 设置可视化样式 sns.set(style="whitegrid") plt.rcParams["font.sans-serif"] = ["SimHei"] # 用来正常显示中文 plt.rcParams["axes.unicode_minus"] = False # 用来正常显示负号def load_and_clean_data(file_path):"""加载并清洗销售数据"""# 加载数据print(f"正在加载数据: {file_path}")df = pd.read_csv(file_path)# 显示基本信息print(f"\n数据集形状: {df.shape}")print("\n数据集前5行:")print(df.head())# 检查缺失值missing = df.isnull().sum()if missing.sum() > 0:print("\n缺失值统计:")print(missing[missing > 0])else:print("\n数据集没有缺失值")# 数据类型转换if "订单日期" in df.columns:df["订单日期"] = pd.to_datetime(df["订单日期"])df["年"] = df["订单日期"].dt.yeardf["月"] = df["订单日期"].dt.monthdf["日"] = df["订单日期"].dt.daydf["星期"] = df["订单日期"].dt.day_name()# 处理缺失值if missing.sum() > 0:# 数值型列用中位数填充numeric_cols = df.select_dtypes(include=[np.number]).columnsfor col in numeric_cols:if df[col].isnull().sum() > 0:df[col].fillna(df[col].median(), inplace=True)# 分类型列用众数填充categorical_cols = df.select_dtypes(include=['object']).columnsfor col in categorical_cols:if df[col].isnull().sum() > 0:df[col].fillna(df[col].mode()[0], inplace=True)return df
-
销售趋势分析
接下来,我们分析销售数据的时间趋势,了解销售额的变化规律。
def analyze_sales_trend(df):"""分析销售趋势"""print("\n===== 销售趋势分析 =====")# 按月汇总销售额monthly_sales = df.groupby(["年", "月"])["销售额"].sum().reset_index()# 创建时间索引monthly_sales["日期"] = pd.to_datetime(monthly_sales["年"].astype(str) + "-" + monthly_sales["月"].astype(str) + "-1")monthly_sales.sort_values("日期", inplace=True)# 绘制月度销售趋势图plt.figure(figsize=(12, 6))plt.plot(monthly_sales["日期"], monthly_sales["销售额"], marker="o", linestyle="-")plt.title("月度销售额趋势", fontsize=15)plt.xlabel("日期", fontsize=12)plt.ylabel("销售额(元)", fontsize=12)plt.grid(True, linestyle="--", alpha=0.7)plt.xticks(rotation=45)plt.tight_layout()# 计算同比增长率if len(monthly_sales["年"].unique()) > 1:print("\n年度销售额比较:")yearly_sales = df.groupby("年")["销售额"].sum()for i in range(1, len(yearly_sales)):current_year = yearly_sales.index[i]prev_year = yearly_sales.index[i-1]growth_rate = (yearly_sales[current_year] - yearly_sales[prev_year]) / yearly_sales[prev_year] * 100print(f"{current_year}年销售额: {yearly_sales[current_year]:.2f}元, 同比增长: {growth_rate:.2f}%")# 分析季节性模式seasonal_sales = df.groupby("月")["销售额"].mean().reset_index()plt.figure(figsize=(10, 5))sns.barplot(x="月", y="销售额", data=seasonal_sales, palette="viridis")plt.title("月均销售额分布(季节性分析)", fontsize=15)plt.xlabel("月份", fontsize=12)plt.ylabel("平均销售额(元)", fontsize=12)plt.tight_layout()return monthly_sales
-
产品分析
分析不同产品的销售情况,找出热销产品和高利润产品。
def analyze_products(df):"""分析产品销售情况"""print("\n===== 产品分析 =====")# 确保有产品相关列if "产品名称" not in df.columns or "销售额" not in df.columns:print("数据中缺少产品名称或销售额列,无法进行产品分析")return# 按产品汇总销售数据product_sales = df.groupby("产品名称").agg({"销售额": "sum","销售数量": "sum","订单ID": "count" # 假设订单ID列表示订单数}).reset_index()# 计算每个产品的平均单价product_sales["平均单价"] = product_sales["销售额"] / product_sales["销售数量"]# 按销售额排序product_sales.sort_values("销售额", ascending=False, inplace=True)# 显示销售额最高的前10个产品print("\n销售额最高的前10个产品:")print(product_sales.head(10)[["产品名称", "销售额", "销售数量", "平均单价"]])# 可视化产品销售额分布plt.figure(figsize=(12, 6))top_products = product_sales.head(10)sns.barplot(x="销售额", y="产品名称", data=top_products, palette="viridis")plt.title("销售额最高的10个产品", fontsize=15)plt.xlabel("销售额(元)", fontsize=12)plt.ylabel("产品名称", fontsize=12)plt.tight_layout()# 如果有利润数据,分析利润率if "利润" in df.columns:product_sales["利润"] = df.groupby("产品名称")["利润"].sum().valuesproduct_sales["利润率"] = product_sales["利润"] / product_sales["销售额"] * 100# 按利润率排序product_sales.sort_values("利润率", ascending=False, inplace=True)print("\n利润率最高的前10个产品:")print(product_sales.head(10)[["产品名称", "销售额", "利润", "利润率"]])# 可视化利润率分布plt.figure(figsize=(12, 6))top_profit_products = product_sales.head(10)sns.barplot(x="利润率", y="产品名称", data=top_profit_products, palette="viridis")plt.title("利润率最高的10个产品", fontsize=15)plt.xlabel("利润率(%)", fontsize=12)plt.ylabel("产品名称", fontsize=12)plt.tight_layout()return product_sales
-
客户分析
分析客户购买行为,找出高价值客户和客户分布特征。
def analyze_customers(df):"""分析客户购买行为"""print("\n===== 客户分析 =====")# 确保有客户相关列if "客户ID" not in df.columns or "销售额" not in df.columns:print("数据中缺少客户ID或销售额列,无法进行客户分析")return# 按客户汇总销售数据customer_sales = df.groupby("客户ID").agg({"销售额": "sum","订单ID": pd.Series.nunique, # 计算不同订单数"订单日期": "count" # 计算购买次数}).reset_index()# 计算客户平均订单金额customer_sales["平均订单金额"] = customer_sales["销售额"] / customer_sales["订单ID"]# 按销售额排序customer_sales.sort_values("销售额", ascending=False, inplace=True)# 显示销售额最高的前10个客户print("\n消费最高的前10个客户:")print(customer_sales.head(10)[["客户ID", "销售额", "订单ID", "平均订单金额"]])# 客户分层分析(RFM模型简化版)if "订单日期" in df.columns:# 计算最近一次购买日期latest_date = df["订单日期"].max()customer_rfm = df.groupby("客户ID").agg({"订单日期": lambda x: (latest_date - x.max()).days, # 最近购买时间(天数)"订单ID": pd.Series.nunique, # 购买频率"销售额": "sum" # 消费金额}).reset_index()customer_rfm.columns = ["客户ID", "最近购买时间", "购买频率", "消费金额"]# 简单的客户分层customer_rfm["价值分数"] = 0# 最近购买时间得分(越小越好)customer_rfm.loc[customer_rfm["最近购买时间"] <= 30, "价值分数"] += 3customer_rfm.loc[(customer_rfm["最近购买时间"] > 30) & (customer_rfm["最近购买时间"] <= 90), "价值分数"] += 2customer_rfm.loc[customer_rfm["最近购买时间"] > 90, "价值分数"] += 1# 购买频率得分customer_rfm.loc[customer_rfm["购买频率"] >= 5, "价值分数"] += 3customer_rfm.loc[(customer_rfm["购买频率"] >= 2) & (customer_rfm["购买频率"] < 5), "价值分数"] += 2customer_rfm.loc[customer_rfm["购买频率"] < 2, "价值分数"] += 1# 消费金额得分threshold_high = customer_rfm["消费金额"].quantile(0.75)threshold_low = customer_rfm["消费金额"].quantile(0.25)customer_rfm.loc[customer_rfm["消费金额"] >= threshold_high, "价值分数"] += 3customer_rfm.loc[(customer_rfm["消费金额"] >= threshold_low) & (customer_rfm["消费金额"] < threshold_high), "价值分数"] += 2customer_rfm.loc[customer_rfm["消费金额"] < threshold_low, "价值分数"] += 1# 客户分层customer_rfm["客户类型"] = "一般客户"customer_rfm.loc[customer_rfm["价值分数"] >= 8, "客户类型"] = "高价值客户"customer_rfm.loc[customer_rfm["价值分数"] <= 4, "客户类型"] = "低活跃客户"# 统计各类客户数量customer_type_counts = customer_rfm["客户类型"].value_counts()print("\n客户分层统计:")print(customer_type_counts)# 可视化客户分层plt.figure(figsize=(10, 6))customer_type_counts.plot(kind="pie", autopct="%1.1f%%", colors=sns.color_palette("viridis"))plt.title("客户类型分布", fontsize=15)plt.ylabel("")plt.tight_layout()return customer_sales
-
地区分析
分析不同地区的销售情况,找出销售热点区域。
def analyze_regions(df):"""分析地区销售情况"""print("\n===== 地区分析 =====")# 确保有地区相关列if "地区" not in df.columns and "省份" not in df.columns and "城市" not in df.columns:print("数据中缺少地区相关列,无法进行地区分析")return# 确定使用哪个地区列region_col = Nonefor col in ["省份", "地区", "城市"]:if col in df.columns:region_col = colbreakif region_col is None:return# 按地区汇总销售数据region_sales = df.groupby(region_col).agg({"销售额": "sum","订单ID": pd.Series.nunique,"客户ID": pd.Series.nunique}).reset_index()# 计算每个地区的平均订单金额region_sales["平均订单金额"] = region_sales["销售额"] / region_sales["订单ID"]# 按销售额排序region_sales.sort_values("销售额", ascending=False, inplace=True)# 显示销售额最高的前10个地区print(f"\n销售额最高的前10个{region_col}:")print(region_sales.head(10)[[region_col, "销售额", "订单ID", "客户ID", "平均订单金额"]])# 可视化地区销售额分布plt.figure(figsize=(12, 6))top_regions = region_sales.head(10)sns.barplot(x="销售额", y=region_col, data=top_regions, palette="viridis")plt.title(f"销售额最高的10个{region_col}", fontsize=15)plt.xlabel("销售额(元)", fontsize=12)plt.ylabel(region_col, fontsize=12)plt.tight_layout()# 如果有利润数据,分析地区利润率if "利润" in df.columns:region_sales["利润"] = df.groupby(region_col)["利润"].sum().valuesregion_sales["利润率"] = region_sales["利润"] / region_sales["销售额"] * 100# 按利润率排序region_sales.sort_values("利润率", ascending=False, inplace=True)print(f"\n利润率最高的前10个{region_col}:")print(region_sales.head(10)[[region_col, "销售额", "利润", "利润率"]])return region_sales
-
主程序与报告生成
最后,我们创建主程序,整合所有分析功能,并生成完整的分析报告。
def main():print("===== 销售数据分析系统 =====\n")# 获取数据文件路径file_path = input("请输入销售数据CSV文件路径: ")try:# 加载并清洗数据df = load_and_clean_data(file_path)# 执行各项分析monthly_sales = analyze_sales_trend(df)product_sales = analyze_products(df)customer_sales = analyze_customers(df)region_sales = analyze_regions(df)# 保存分析结果save_results = input("\n是否保存分析结果? (y/n): ").lower()if save_results == 'y':# 创建结果目录import osresults_dir = "sales_analysis_results"if not os.path.exists(results_dir):os.makedirs(results_dir)# 保存图表plt.figure(figsize=(10, 8))plt.text(0.5, 0.5, "销售数据分析报告\n\n" + \f"分析日期: {pd.Timestamp.now().strftime('%Y-%m-%d')}\n" + \f"数据记录数: {len(df)}\n" + \f"分析时段: {df['订单日期'].min().strftime('%Y-%m-%d')} 至 {df['订单日期'].max().strftime('%Y-%m-%d')}\n\n" + \"主要发现:\n" + \"1. 销售趋势分析显示...\n" + \"2. 产品分析显示...\n" + \"3. 客户分析显示...\n" + \"4. 地区分析显示...",ha='center', va='center', fontsize=12)plt.axis('off')plt.savefig(f"{results_dir}/分析报告封面.png", dpi=300, bbox_inches='tight')# 保存数据表if monthly_sales is not None:monthly_sales.to_csv(f"{results_dir}/月度销售趋势.csv", index=False, encoding="utf-8-sig")if product_sales is not None:product_sales.to_csv(f"{results_dir}/产品销售分析.csv", index=False, encoding="utf-8-sig")if customer_sales is not None:customer_sales.to_csv(f"{results_dir}/客户销售分析.csv", index=False, encoding="utf-8-sig")if region_sales is not None:region_sales.to_csv(f"{results_dir}/地区销售分析.csv", index=False, encoding="utf-8-sig")print(f"\n分析结果已保存到 {results_dir} 目录")print("\n分析完成!")except Exception as e:print(f"分析过程中出错: {e}")if __name__ == "__main__":main()
-
项目扩展思路
这个销售数据分析项目还可以进一步扩展:
- 添加更复杂的统计分析,如相关性分析、回归分析等
- 实现销售预测功能,预测未来销售趋势
- 添加交互式数据可视化,使用Plotly或Dash
- 开发Web界面,让用户上传数据并查看分析结果
- 添加自动化报告生成功能,生成PDF或HTML报告