当前位置: 首页 > news >正文

Python项目--基于机器学习的股票预测分析系统

1. 项目介绍

在当今数字化时代,金融市场的数据分析和预测已经成为投资决策的重要依据。本文将详细介绍一个基于Python的股票预测分析系统,该系统利用机器学习算法对历史股票数据进行分析,并预测未来股票价格走势,为投资者提供决策支持。

1.1 项目背景

股票市场充满不确定性,传统的技术分析和基本面分析方法往往依赖于人为判断,存在主观性强、效率低等问题。随着机器学习技术的发展,利用算法对海量历史数据进行分析,挖掘其中的规律和模式,已经成为可能。本项目旨在构建一个完整的股票预测分析系统,集成数据采集、预处理、特征工程、模型训练与评估、预测可视化等功能,为投资决策提供科学依据。

1.2 项目目标

  1. 构建一个完整的股票数据采集与预处理流程
  2. 实现多种机器学习模型用于股票价格预测
  3. 提供直观的数据可视化和分析工具
  4. 开发用户友好的接口,便于投资者使用
  5. 评估不同模型的预测性能,提供最优预测结果

1.3 技术栈

  • 编程语言:Python 3.8+
  • 数据处理:Pandas, NumPy
  • 机器学习框架:Scikit-learn, TensorFlow, Keras
  • 深度学习模型:LSTM, GRU, Transformer
  • 数据可视化:Matplotlib, Seaborn, Plotly
  • Web接口:Flask, Streamlit
  • 数据存储:SQLite, MongoDB
  • API调用:yfinance, alpha_vantage

2. 系统架构

本系统采用模块化设计,包含以下核心组件:

2.1 系统架构图

+------------------------+    +------------------------+    +------------------------+
|                        |    |                        |    |                        |
|    数据采集模块        |    |    数据预处理模块      |    |    特征工程模块        |
|                        |    |                        |    |                        |
+------------------------+    +------------------------+    +------------------------+|                             |                             |v                             v                             v
+------------------------+    +------------------------+    +------------------------+
|                        |    |                        |    |                        |
|    模型训练模块        | <- |    特征选择模块        | <- |    数据存储模块        |
|                        |    |                        |    |                        |
+------------------------+    +------------------------+    +------------------------+|                             ^                             ^v                             |                             |
+------------------------+    +------------------------+    +------------------------+
|                        |    |                        |    |                        |
|    预测评估模块        | -> |    结果可视化模块      | -> |    用户接口模块        |
|                        |    |                        |    |                        |
+------------------------+    +------------------------+    +------------------------+

2.2 模块功能说明

  1. 数据采集模块:负责从各种数据源获取股票历史数据,包括价格、交易量、财务指标等
  2. 数据预处理模块:对原始数据进行清洗、标准化、去噪等处理
  3. 特征工程模块:构建预测模型所需的特征,包括技术指标、统计特征等
  4. 数据存储模块:将处理后的数据存储到数据库中,便于后续分析
  5. 特征选择模块:从众多特征中选择最具预测能力的特征子集
  6. 模型训练模块:实现多种机器学习算法,训练预测模型
  7. 预测评估模块:评估模型性能,生成预测结果
  8. 结果可视化模块:将预测结果以图表形式展示
  9. 用户接口模块:提供友好的用户界面,便于用户操作和查看结果

3. 数据采集与预处理

3.1 数据来源

本系统支持多种数据来源,主要包括:

  1. 公开API

    • Yahoo Finance (yfinance)
    • Alpha Vantage
    • Quandl
    • Tushare (针对中国股市)
  2. CSV文件导入:支持用户上传自定义格式的CSV文件

  3. 数据库导入:支持从SQLite、MongoDB等数据库导入数据

3.2 数据采集实现

以下是使用yfinance库获取股票数据的示例代码:

import yfinance as yf
import pandas as pd
from datetime import datetime, timedeltaclass StockDataCollector:def __init__(self):self.data = Nonedef collect_data(self, ticker, start_date, end_date=None, interval='1d'):"""从Yahoo Finance获取股票历史数据参数:ticker (str): 股票代码,如'AAPL'、'MSFT'start_date (str): 起始日期,格式'YYYY-MM-DD'end_date (str): 结束日期,格式'YYYY-MM-DD',默认为当前日期interval (str): 数据间隔,可选'1d'(日),'1wk'(周),'1mo'(月)返回:pandas.DataFrame: 包含股票历史数据的DataFrame"""if end_date is None:end_date = datetime.now().strftime('%Y-%m-%d')try:stock = yf.Ticker(ticker)self.data = stock.history(start=start_date, end=end_date, interval=interval)print(f"成功获取{ticker}{start_date}{end_date}的历史数据")return self.dataexcept Exception as e:print(f"获取数据时出错: {e}")return Nonedef save_to_csv(self, file_path):"""将数据保存为CSV文件"""if self.data is not None:self.data.to_csv(file_path)print(f"数据已保存至{file_path}")else:print("没有数据可保存")def get_stock_info(self, ticker):"""获取股票基本信息"""try:stock = yf.Ticker(ticker)info = stock.inforeturn infoexcept Exception as e:print(f"获取股票信息时出错: {e}")return None

3.3 数据预处理

原始股票数据通常包含缺失值、异常值等问题,需要进行预处理:

class StockDataPreprocessor:def __init__(self, data=None):self.data = datadef load_data(self, data):"""加载数据"""self.data = datareturn selfdef handle_missing_values(self, method='ffill'):"""处理缺失值"""if self.data is None:print("没有数据可处理")return selfif method == 'ffill':self.data = self.data.fillna(method='ffill')elif method == 'bfill':self.data = self.data.fillna(method='bfill')elif method == 'drop':self.data = self.data.dropna()elif method == 'mean':self.data = self.data.fillna(self.data.mean())return selfdef remove_outliers(self, columns, method='zscore', threshold=3):"""移除异常值"""if self.data is None:print("没有数据可处理")return selfif method == 'zscore':for col in columns:if col in self.data.columns:mean = self.data[col].mean()std = self.data[col].std()self.data = self.data[(self.data[col] - mean).abs() <= threshold * std]return selfdef normalize_data(self, columns, method='minmax'):"""数据标准化"""if self.data is None:print("没有数据可处理")return selfif method == 'minmax':for col in columns:if col in self.data.columns:min_val = self.data[col].min()max_val = self.data[col].max()self.data[col] = (self.data[col] - min_val) / (max_val - min_val)elif method == 'zscore':for col in columns:if col in self.data.columns:mean = self.data[col].mean()std = self.data[col].std()self.data[col] = (self.data[col] - mean) / stdreturn selfdef get_processed_data(self):"""获取处理后的数据"""return self.data## 4. 特征工程特征工程是机器学习模型性能的关键决定因素。在股票预测中,我们需要从原始价格数据中提取有价值的特征。### 4.1 技术指标计算技术指标是股票分析中常用的工具,可以揭示价格趋势、动量和波动性等信息:```python
import numpy as np
import pandas as pd
import talibclass TechnicalIndicators:def __init__(self, data=None):self.data = datadef load_data(self, data):"""加载数据"""self.data = datareturn selfdef add_moving_averages(self, periods=[5, 10, 20, 50, 200]):"""添加移动平均线"""if self.data is None or 'Close' not in self.data.columns:print("数据不包含收盘价")return selffor period in periods:self.data[f'MA_{period}'] = self.data['Close'].rolling(window=period).mean()return selfdef add_exponential_moving_averages(self, periods=[5, 10, 20, 50, 200]):"""添加指数移动平均线"""if self.data is None or 'Close' not in self.data.columns:print("数据不包含收盘价")return selffor period in periods:self.data[f'EMA_{period}'] = self.data['Close'].ewm(span=period, adjust=False).mean()return selfdef add_rsi(self, periods=[14]):"""添加相对强弱指标(RSI)"""if self.data is None or 'Close' not in self.data.columns:print("数据不包含收盘价")return selffor period in periods:delta = self.data['Close'].diff()gain = delta.where(delta > 0, 0)loss = -delta.where(delta < 0, 0)avg_gain = gain.rolling(window=period).mean()avg_loss = loss.rolling(window=period).mean()rs = avg_gain / avg_lossself.data[f'RSI_{period}'] = 100 - (100 / (1 + rs))return selfdef add_macd(self, fast_period=12, slow_period=26, signal_period=9):"""添加MACD指标"""if self.data is None or 'Close' not in self.data.columns:print("数据不包含收盘价")return selfema_fast = self.data['Close'].ewm(span=fast_period, adjust=False).mean()ema_slow = self.data['Close'].ewm(span=slow_period, adjust=False).mean()self.data['MACD'] = ema_fast - ema_slowself.data['MACD_Signal'] = self.data['MACD'].ewm(span=signal_period, adjust=False).mean()self.data['MACD_Hist'] = self.data['MACD'] - self.data['MACD_Signal']return selfdef add_bollinger_bands(self, period=20, std_dev=2):"""添加布林带指标"""if self.data is None or 'Close' not in self.data.columns:print("数据不包含收盘价")return selfself.data[f'BB_Middle_{period}'] = self.data['Close'].rolling(window=period).mean()self.data[f'BB_Std_{period}'] = self.data['Close'].rolling(window=period).std()self.data[f'BB_Upper_{period}'] = self.data[f'BB_Middle_{period}'] + std_dev * self.data[f'BB_Std_{period}']self.data[f'BB_Lower_{period}'] = self.data[f'BB_Middle_{period}'] - std_dev * self.data[f'BB_Std_{period}']return selfdef add_atr(self, period=14):"""添加平均真实范围(ATR)指标"""if self.data is None or not all(col in self.data.columns for col in ['High', 'Low', 'Close']):print("数据不包含必要的价格列")return selfhigh_low = self.data['High'] - self.data['Low']high_close = (self.data['High'] - self.data['Close'].shift()).abs()low_close = (self.data['Low'] - self.data['Close'].shift()).abs()ranges = pd.concat([high_low, high_close, low_close], axis=1)true_range = ranges.max(axis=1)self.data[f'ATR_{period}'] = true_range.rolling(window=period).mean()return selfdef add_stochastic_oscillator(self, k_period=14, d_period=3):"""添加随机指标"""if self.data is None or not all(col in self.data.columns for col in ['High', 'Low', 'Close']):print("数据不包含必要的价格列")return selflow_min = self.data['Low'].rolling(window=k_period).min()high_max = self.data['High'].rolling(window=k_period).max()self.data['%K'] = 100 * ((self.data['Close'] - low_min) / (high_max - low_min))self.data['%D'] = self.data['%K'].rolling(window=d_period).mean()return selfdef add_obv(self):"""添加能量潮(OBV)指标"""if self.data is None or not all(col in self.data.columns for col in ['Close', 'Volume']):print("数据不包含必要的价格和成交量列")return selfobv = [0]for i in range(1, len(self.data)):if self.data['Close'].iloc[i] > self.data['Close'].iloc[i-1]:obv.append(obv[-1] + self.data['Volume'].iloc[i])elif self.data['Close'].iloc[i] < self.data['Close'].iloc[i-1]:obv.append(obv[-1] - self.data['Volume'].iloc[i])else:obv.append(obv[-1])self.data['OBV'] = obvreturn selfdef get_data_with_indicators(self):"""获取添加了技术指标的数据"""return self.data

4.2 特征选择

股票数据可能包含大量特征,但并非所有特征都对预测有帮助。特征选择可以提高模型性能并减少过拟合:

from sklearn.feature_selection import SelectKBest, f_regression, RFE
from sklearn.ensemble import RandomForestRegressorclass FeatureSelector:def __init__(self, data=None):self.data = dataself.selected_features = Nonedef load_data(self, data):"""加载数据"""self.data = datareturn selfdef prepare_data(self, target_col='Close', lag_periods=[1, 2, 3, 5, 10]):"""准备特征和目标变量,创建滞后特征"""if self.data is None:print("没有数据可处理")return None, None# 创建目标变量(下一天的收盘价)self.data['Target'] = self.data[target_col].shift(-1)# 创建滞后特征for lag in lag_periods:for col in self.data.columns:if col != 'Target':self.data[f'{col}_Lag_{lag}'] = self.data[col].shift(lag)# 删除包含NaN的行self.data = self.data.dropna()# 分离特征和目标X = self.data.drop(['Target'], axis=1)y = self.data['Target']return X, ydef select_k_best(self, X, y, k=10):"""使用F值统计量选择最佳特征"""selector = SelectKBest(score_func=f_regression, k=k)selector.fit(X, y)# 获取选中的特征cols = selector.get_support(indices=True)self.selected_features = X.columns[cols].tolist()return X[self.selected_features], self.selected_featuresdef select_with_rfe(self, X, y, n_features=10):"""使用递归特征消除法选择特征"""estimator = RandomForestRegressor(n_estimators=100, random_state=42)selector = RFE(estimator, n_features_to_select=n_features)selector.fit(X, y)# 获取选中的特征cols = selector.get_support(indices=True)self.selected_features = X.columns[cols].tolist()return X[self.selected_features], self.selected_featuresdef select_with_random_forest(self, X, y, threshold=0.01):"""使用随机森林特征重要性选择特征"""rf = RandomForestRegressor(n_estimators=100, random_state=42)rf.fit(X, y)# 获取特征重要性importances = rf.feature_importances_indices = np.argsort(importances)[::-1]# 选择重要性大于阈值的特征self.selected_features = [X.columns[i] for i in indices if importances[i] > threshold]return X[self.selected_features], self.selected_features

5. 模型实现

本系统实现了多种机器学习模型用于股票价格预测,包括传统机器学习模型和深度学习模型。

5.1 数据准备

在训练模型前,需要将数据分为训练集和测试集:

from sklearn.model_selection import train_test_split
import numpy as npclass DataPreparation:def __init__(self, X=None, y=None):self.X = Xself.y = yself.X_train = Noneself.X_test = Noneself.y_train = Noneself.y_test = Nonedef load_data(self, X, y):"""加载特征和目标数据"""self.X = Xself.y = yreturn selfdef train_test_split(self, test_size=0.2, random_state=42):"""划分训练集和测试集"""if self.X is None or self.y is None:print("没有数据可划分")return selfself.X_train, self.X_test, self.y_train, self.y_test = train_test_split(self.X, self.y, test_size=test_size, random_state=random_state, shuffle=False)return selfdef time_series_split(self, test_size=0.2):"""按时间顺序划分训练集和测试集"""if self.X is None or self.y is None:print("没有数据可划分")return self# 计算测试集大小test_index = int(len(self.X) * (1 - test_size))# 按时间顺序划分self.X_train = self.X.iloc[:test_index]self.X_test = self.X.iloc[test_index:]self.y_train = self.y.iloc[:test_index]self.y_test = self.y.iloc[test_index:]return selfdef prepare_lstm_data(self, time_steps=60):"""准备LSTM模型所需的时间序列数据"""if self.X is None or self.y is None:print("没有数据可处理")return None, None, None, None# 将数据转换为numpy数组X_values = self.X.valuesy_values = self.y.valuesX_lstm, y_lstm = [], []for i in range(time_steps, len(X_values)):X_lstm.append(X_values[i-time_steps:i])y_lstm.append(y_values[i])X_lstm, y_lstm = np.array(X_lstm), np.array(y_lstm)# 划分训练集和测试集train_size = int(len(X_lstm) * 0.8)X_train = X_lstm[:train_size]X_test = X_lstm[train_size:]y_train = y_lstm[:train_size]y_test = y_lstm[train_size:]return X_train, X_test, y_train, y_testdef get_train_test_data(self):"""获取划分后的训练集和测试集"""return self.X_train, self.X_test, self.y_train, self.y_test

5.2 传统机器学习模型

实现多种传统机器学习模型用于股票价格预测:

from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import joblibclass TraditionalModels:def __init__(self):self.models = {}self.best_model = Noneself.best_score = float('inf')def train_linear_regression(self, X_train, y_train):"""训练线性回归模型"""model = LinearRegression()model.fit(X_train, y_train)self.models['LinearRegression'] = modelreturn modeldef train_ridge_regression(self, X_train, y_train, alpha=1.0):"""训练岭回归模型"""model = Ridge(alpha=alpha)model.fit(X_train, y_train)self.models['Ridge'] = modelreturn modeldef train_lasso_regression(self, X_train, y_train, alpha=0.1):"""训练Lasso回归模型"""model = Lasso(alpha=alpha)model.fit(X_train, y_train)self.models['Lasso'] = modelreturn modeldef train_random_forest(self, X_train, y_train, n_estimators=100, max_depth=None):"""训练随机森林模型"""model = RandomForestRegressor(n_estimators=n_estimators, max_depth=max_depth, random_state=42)model.fit(X_train, y_train)self.models['RandomForest'] = modelreturn modeldef train_gradient_boosting(self, X_train, y_train, n_estimators=100, learning_rate=0.1):"""训练梯度提升树模型"""model = GradientBoostingRegressor(n_estimators=n_estimators, learning_rate=learning_rate, random_state=42)model.fit(X_train, y_train)self.models['GradientBoosting'] = modelreturn modeldef train_svr(self, X_train, y_train, kernel='rbf', C=1.0, epsilon=0.1):"""训练支持向量回归模型"""model = SVR(kernel=kernel, C=C, epsilon=epsilon)model.fit(X_train, y_train)self.models['SVR'] = modelreturn modeldef train_all_models(self, X_train, y_train):"""训练所有模型"""self.train_linear_regression(X_train, y_train)self.train_ridge_regression(X_train, y_train)self.train_lasso_regression(X_train, y_train)self.train_random_forest(X_train, y_train)self.train_gradient_boosting(X_train, y_train)self.train_svr(X_train, y_train)return self.modelsdef evaluate_model(self, model, X_test, y_test):"""评估模型性能"""y_pred = model.predict(X_test)mse = mean_squared_error(y_test, y_pred)rmse = np.sqrt(mse)mae = mean_absolute_error(y_test, y_pred)r2 = r2_score(y_test, y_pred)return {'MSE': mse,'RMSE': rmse,'MAE': mae,'R2': r2}def evaluate_all_models(self, X_test, y_test):"""评估所有模型性能"""results = {}for name, model in self.models.items():results[name] = self.evaluate_model(model, X_test, y_test)# 更新最佳模型if results[name]['RMSE'] < self.best_score:self.best_score = results[name]['RMSE']self.best_model = namereturn resultsdef save_model(self, model_name, file_path):"""保存模型"""if model_name in self.models:joblib.dump(self.models[model_name], file_path)print(f"模型已保存至{file_path}")else:print(f"模型{model_name}不存在")def load_model(self, model_name, file_path):"""加载模型"""try:model = joblib.load(file_path)self.models[model_name] = modelprint(f"模型已从{file_path}加载")return modelexcept Exception as e:print(f"加载模型时出错: {e}")return Nonedef get_best_model(self):"""获取性能最佳的模型"""if self.best_model is None:print("尚未评估模型性能")return Nonereturn self.models[self.best_model], self.best_model

5.3 深度学习模型

对于时间序列数据,深度学习模型尤其是LSTM和GRU等循环神经网络具有显著优势:

import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential, load_model, Model
from tensorflow.keras.layers import Dense, LSTM, Dropout, GRU, Input, Bidirectional, Concatenate
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as pltclass DeepLearningModels:def __init__(self):self.models = {}self.best_model = Noneself.best_score = float('inf')self.scalers = {}def preprocess_data(self, X_train, X_test, y_train, y_test, feature_range=(0, 1)):"""数据预处理,对每个特征进行标准化"""# 对特征进行标准化X_scaler = MinMaxScaler(feature_range=feature_range)X_train_scaled = X_scaler.fit_transform(X_train)X_test_scaled = X_scaler.transform(X_test)# 对目标变量进行标准化y_scaler = MinMaxScaler(feature_range=feature_range)y_train_scaled = y_scaler.fit_transform(y_train.values.reshape(-1, 1))y_test_scaled = y_scaler.transform(y_test.values.reshape(-1, 1))# 保存缩放器供后续使用self.scalers['X'] = X_scalerself.scalers['y'] = y_scalerreturn X_train_scaled, X_test_scaled, y_train_scaled, y_test_scaleddef reshape_data_for_lstm(self, X_train, X_test):"""将数据重塑为LSTM所需的形状 [samples, time_steps, features]"""# 假设每个样本只有一个时间步X_train_reshaped = X_train.reshape(X_train.shape[0], 1, X_train.shape[1])X_test_reshaped = X_test.reshape(X_test.shape[0], 1, X_test.shape[1])return X_train_reshaped, X_test_reshapeddef build_lstm_model(self, input_shape, units=50, dropout=0.2):"""构建LSTM模型"""model = Sequential()model.add(LSTM(units=units, return_sequences=True, input_shape=input_shape))model.add(Dropout(dropout))model.add(LSTM(units=units, return_sequences=False))model.add(Dropout(dropout))model.add(Dense(units=25))model.add(Dense(units=1))model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')return modeldef build_gru_model(self, input_shape, units=50, dropout=0.2):"""构建GRU模型"""model = Sequential()model.add(GRU(units=units, return_sequences=True, input_shape=input_shape))model.add(Dropout(dropout))model.add(GRU(units=units, return_sequences=False))model.add(Dropout(dropout))model.add(Dense(units=25))model.add(Dense(units=1))model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')return modeldef build_bidirectional_lstm_model(self, input_shape, units=50, dropout=0.2):"""构建双向LSTM模型"""model = Sequential()model.add(Bidirectional(LSTM(units=units, return_sequences=True), input_shape=input_shape))model.add(Dropout(dropout))model.add(Bidirectional(LSTM(units=units, return_sequences=False)))model.add(Dropout(dropout))model.add(Dense(units=25))model.add(Dense(units=1))model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')return modeldef train_model(self, model, X_train, y_train, X_val=None, y_val=None, epochs=100, batch_size=32, model_name=None):"""训练深度学习模型"""callbacks = [EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)]if model_name:callbacks.append(ModelCheckpoint(f'{model_name}.h5', save_best_only=True))# 如果没有提供验证集,使用训练集的20%作为验证集if X_val is None or y_val is None:validation_split = 0.2validation_data = Noneelse:validation_split = 0.0validation_data = (X_val, y_val)history = model.fit(X_train, y_train,epochs=epochs,batch_size=batch_size,validation_split=validation_split,validation_data=validation_data,callbacks=callbacks,verbose=1)if model_name:self.models[model_name] = modelreturn model, historydef evaluate_model(self, model, X_test, y_test):"""评估深度学习模型性能"""# 预测y_pred = model.predict(X_test)# 如果数据经过了标准化,需要还原if 'y' in self.scalers:y_test = self.scalers['y'].inverse_transform(y_test)y_pred = self.scalers['y'].inverse_transform(y_pred)# 计算评估指标mse = np.mean(np.square(y_test - y_pred))rmse = np.sqrt(mse)mae = np.mean(np.abs(y_test - y_pred))# 计算R方ss_tot = np.sum(np.square(y_test - np.mean(y_test)))ss_res = np.sum(np.square(y_test - y_pred))r2 = 1 - (ss_res / ss_tot)return {'MSE': mse,'RMSE': rmse,'MAE': mae,'R2': r2}def predict_future(self, model, last_sequence, n_steps=30, scaler=None):"""预测未来n天的股票价格"""predictions = []current_sequence = last_sequence.copy()for _ in range(n_steps):# 预测下一个值current_pred = model.predict(current_sequence)[0][0]predictions.append(current_pred)# 更新序列用于下一次预测current_sequence = np.roll(current_sequence, -1, axis=1)current_sequence[0, -1, 0] = current_pred# 如果有缩放器,需要还原数据if scaler is not None:predictions = scaler.inverse_transform(np.array(predictions).reshape(-1, 1))return predictionsdef save_model(self, model_name, file_path):"""保存模型"""if model_name in self.models:self.models[model_name].save(file_path)print(f"模型已保存至{file_path}")else:print(f"模型{model_name}不存在")def load_model(self, model_name, file_path):"""加载模型"""try:model = load_model(file_path)self.models[model_name] = modelprint(f"模型已从{file_path}加载")return modelexcept Exception as e:print(f"加载模型时出错: {e}")return Nonedef plot_training_history(self, history, title="模型训练历史"):"""绘制训练过程中的损失曲线"""plt.figure(figsize=(12, 6))plt.plot(history.history['loss'], label='训练集损失')plt.plot(history.history['val_loss'], label='验证集损失')plt.title(title)plt.xlabel('迭代次数')plt.ylabel('损失')plt.legend()plt.grid(True)plt.show()

5.4 集成模型

通过集成多个模型的预测结果,可以进一步提高预测的准确性:

import numpy as np
from sklearn.ensemble import VotingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_scoreclass EnsembleModel:def __init__(self):self.models = {}self.ensemble_model = Nonedef add_model(self, name, model):"""添加模型到集成中"""self.models[name] = modelreturn selfdef create_voting_ensemble(self, weights=None):"""创建投票集成模型"""if not self.models:print("没有模型可以集成")return Noneestimators = [(name, model) for name, model in self.models.items()]self.ensemble_model = VotingRegressor(estimators=estimators, weights=weights)return self.ensemble_modeldef train_ensemble(self, X_train, y_train):"""训练集成模型"""if self.ensemble_model is None:print("请先创建集成模型")return Noneself.ensemble_model.fit(X_train, y_train)return self.ensemble_modeldef weighted_average_prediction(self, X, weights=None):"""使用加权平均方式集成预测结果"""if not self.models:print("没有模型可以集成")return Nonepredictions = []for name, model in self.models.items():pred = model.predict(X)predictions.append(pred)# 将预测结果转换为数组predictions = np.array(predictions)# 如果没有提供权重,使用平均值if weights is None:weights = np.ones(len(self.models)) / len(self.models)else:# 强制权重和为1weights = np.array(weights) / np.sum(weights)# 计算加权平均预测weighted_pred = np.sum(predictions * weights.reshape(-1, 1), axis=0)return weighted_preddef evaluate_ensemble(self, X_test, y_test):"""评估集成模型性能"""if self.ensemble_model is None:print("请先创建集成模型")return Noney_pred = self.ensemble_model.predict(X_test)mse = mean_squared_error(y_test, y_pred)rmse = np.sqrt(mse)mae = mean_absolute_error(y_test, y_pred)r2 = r2_score(y_test, y_pred)return {'MSE': mse,'RMSE': rmse,'MAE': mae,'R2': r2}def evaluate_weighted_ensemble(self, X_test, y_test, weights=None):"""评估加权集成模型性能"""y_pred = self.weighted_average_prediction(X_test, weights)mse = mean_squared_error(y_test, y_pred)rmse = np.sqrt(mse)mae = mean_absolute_error(y_test, y_pred)r2 = r2_score(y_test, y_pred)return {'MSE': mse,'RMSE': rmse,'MAE': mae,'R2': r2}

6. 数据可视化

数据可视化是股票预测分析系统的重要组成部分,可以直观地展示原始数据、技术指标和预测结果。

6.1 原始数据可视化

使用Matplotlib和Plotly等库可视化股票原始数据:

import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np
from datetime import datetime, timedeltaclass StockDataVisualizer:def __init__(self, data=None):self.data = datadef load_data(self, data):"""加载数据"""self.data = datareturn selfdef plot_stock_price(self, title="股票价格趋势", figsize=(12, 6)):"""使用Matplotlib绘制股票价格趋势图"""if self.data is None or 'Close' not in self.data.columns:print("数据不包含收盘价")return Noneplt.figure(figsize=figsize)plt.plot(self.data.index, self.data['Close'], label='收盘价')# 设置日期格式plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))plt.gca().xaxis.set_major_locator(mdates.MonthLocator())plt.title(title)plt.xlabel('日期')plt.ylabel('价格')plt.legend()plt.grid(True)plt.xticks(rotation=45)plt.tight_layout()return pltdef plot_ohlc(self, title="股票OHLC图", figsize=(12, 6)):"""使用Matplotlib绘制OHLC图"""if self.data is None or not all(col in self.data.columns for col in ['Open', 'High', 'Low', 'Close']):print("数据不包含必要的价格列")return None# 创建图形fig, ax = plt.subplots(figsize=figsize)# 计算柱形图的宽度width = 0.6# 绘制价格柱形图up = self.data[self.data['Close'] >= self.data['Open']]down = self.data[self.data['Close'] < self.data['Open']]# 绘制上涨柱形图(绿色)ax.bar(up.index, up['Close'] - up['Open'], width, bottom=up['Open'], color='g')ax.bar(up.index, up['High'] - up['Close'], width/5, bottom=up['Close'], color='g')ax.bar(up.index, up['Open'] - up['Low'], width/5, bottom=up['Low'], color='g')# 绘制下跌柱形图(红色)ax.bar(down.index, down['Open'] - down['Close'], width, bottom=down['Close'], color='r')ax.bar(down.index, down['High'] - down['Open'], width/5, bottom=down['Open'], color='r')ax.bar(down.index, down['Close'] - down['Low'], width/5, bottom=down['Low'], color='r')# 设置日期格式ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))ax.xaxis.set_major_locator(mdates.MonthLocator())plt.title(title)plt.xlabel('日期')plt.ylabel('价格')plt.grid(True)plt.xticks(rotation=45)plt.tight_layout()return pltdef plot_candlestick_plotly(self, title="股票K线图"):"""使用Plotly绘制交互式K线图"""if self.data is None or not all(col in self.data.columns for col in ['Open', 'High', 'Low', 'Close']):print("数据不包含必要的价格列")return None# 创建K线图fig = go.Figure(data=[go.Candlestick(x=self.data.index,open=self.data['Open'],high=self.data['High'],low=self.data['Low'],close=self.data['Close'],name='K线')])# 添加5日和20日移动平均线if len(self.data) >= 20:fig.add_trace(go.Scatter(x=self.data.index,y=self.data['Close'].rolling(window=5).mean(),line=dict(color='blue', width=1),name='5日移动平均线'))fig.add_trace(go.Scatter(x=self.data.index,y=self.data['Close'].rolling(window=20).mean(),line=dict(color='orange', width=1),name='20日移动平均线'))# 更新布局fig.update_layout(title=title,xaxis_title='日期',yaxis_title='价格',xaxis_rangeslider_visible=False,template='plotly_white')return figdef plot_volume(self, title="成交量分析", figsize=(12, 6)):"""绘制成交量图"""if self.data is None or 'Volume' not in self.data.columns:print("数据不包含成交量")return Noneplt.figure(figsize=figsize)# 根据价格变化给成交量柱形图着色if 'Close' in self.data.columns:colors = ['g' if close_price > open_price else 'r' for close_price, open_price in zip(self.data['Close'], self.data['Close'].shift(1))]else:colors = 'b'plt.bar(self.data.index, self.data['Volume'], color=colors, alpha=0.8)# 添加移动平均线if len(self.data) >= 20:plt.plot(self.data.index, self.data['Volume'].rolling(window=20).mean(), color='orange', label='20日平均成交量')# 设置日期格式plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))plt.gca().xaxis.set_major_locator(mdates.MonthLocator())plt.title(title)plt.xlabel('日期')plt.ylabel('成交量')plt.legend()plt.grid(True)plt.xticks(rotation=45)plt.tight_layout()return pltdef plot_technical_indicators(self, indicators, title="技术指标分析", figsize=(12, 8)):"""绘制技术指标图"""if self.data is None:print("没有数据可绘制")return None# 检查指标是否存在for indicator in indicators:if indicator not in self.data.columns:print(f"指标{indicator}不存在")return None# 创建图形fig, ax = plt.subplots(figsize=figsize)# 绘制收盘价if 'Close' in self.data.columns:ax.plot(self.data.index, self.data['Close'], label='收盘价', color='black')# 绘制指标colors = ['blue', 'green', 'red', 'purple', 'orange', 'brown', 'pink', 'gray', 'olive', 'cyan']for i, indicator in enumerate(indicators):ax.plot(self.data.index, self.data[indicator], label=indicator, color=colors[i % len(colors)])# 设置日期格式ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))ax.xaxis.set_major_locator(mdates.MonthLocator())plt.title(title)plt.xlabel('日期')plt.ylabel('值')plt.legend()plt.grid(True)plt.xticks(rotation=45)plt.tight_layout()return plt

6.2 预测结果可视化

将模型预测结果进行可视化,直观展示预测效果:

class PredictionVisualizer:def __init__(self, actual_data=None, predicted_data=None):self.actual_data = actual_dataself.predicted_data = predicted_datadef load_data(self, actual_data, predicted_data):"""加载实际数据和预测数据"""self.actual_data = actual_dataself.predicted_data = predicted_datareturn selfdef plot_predictions(self, title="股票价格预测结果", figsize=(12, 6)):"""绘制预测结果与实际值对比图"""if self.actual_data is None or self.predicted_data is None:print("数据不完整")return Noneplt.figure(figsize=figsize)# 绘制实际值plt.plot(self.actual_data.index, self.actual_data, label='实际值', color='blue')# 绘制预测值if isinstance(self.predicted_data, pd.Series) and self.predicted_data.index.equals(self.actual_data.index):plt.plot(self.predicted_data.index, self.predicted_data, label='预测值', color='red', linestyle='--')else:plt.plot(self.actual_data.index, self.predicted_data, label='预测值', color='red', linestyle='--')# 设置日期格式plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))plt.gca().xaxis.set_major_locator(mdates.MonthLocator())plt.title(title)plt.xlabel('日期')plt.ylabel('价格')plt.legend()plt.grid(True)plt.xticks(rotation=45)plt.tight_layout()return pltdef plot_future_predictions(self, historical_data, future_predictions, prediction_dates=None, title="未来股票价格预测", figsize=(12, 6)):"""绘制历史数据和未来预测结果"""if historical_data is None or future_predictions is None:print("数据不完整")return Noneplt.figure(figsize=figsize)# 绘制历史数据plt.plot(historical_data.index, historical_data, label='历史数据', color='blue')# 生成预测日期(如果没有提供)if prediction_dates is None:last_date = historical_data.index[-1]if isinstance(last_date, pd.Timestamp):prediction_dates = [last_date + timedelta(days=i+1) for i in range(len(future_predictions))]else:prediction_dates = range(len(historical_data), len(historical_data) + len(future_predictions))# 绘制预测数据plt.plot(prediction_dates, future_predictions, label='未来预测', color='red', linestyle='--')# 添加分隔线plt.axvline(x=historical_data.index[-1], color='green', linestyle='-', label='当前日期')# 设置日期格式(如果是日期类型)if isinstance(historical_data.index[0], pd.Timestamp):plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))plt.gca().xaxis.set_major_locator(mdates.MonthLocator())plt.title(title)plt.xlabel('日期')plt.ylabel('价格')plt.legend()plt.grid(True)plt.xticks(rotation=45)plt.tight_layout()return pltdef plot_model_comparison(self, actual_data, predictions_dict, title="模型预测效果对比", figsize=(12, 6)):"""绘制多个模型的预测结果对比图"""if actual_data is None or not predictions_dict:print("数据不完整")return Noneplt.figure(figsize=figsize)# 绘制实际值plt.plot(actual_data.index, actual_data, label='实际值', color='black', linewidth=2)# 绘制各模型预测值colors = ['red', 'blue', 'green', 'purple', 'orange', 'brown', 'pink', 'gray']for i, (model_name, predictions) in enumerate(predictions_dict.items()):plt.plot(actual_data.index, predictions, label=f'{model_name}预测', color=colors[i % len(colors)], linestyle='--')# 设置日期格式plt.gca().xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))plt.gca().xaxis.set_major_locator(mdates.MonthLocator())plt.title(title)plt.xlabel('日期')plt.ylabel('价格')plt.legend()plt.grid(True)plt.xticks(rotation=45)plt.tight_layout()return pltdef plot_error_distribution(self, actual_data, predicted_data, title="预测误差分布", figsize=(12, 6)):"""绘制预测误差分布图"""if actual_data is None or predicted_data is None:print("数据不完整")return None# 计算误差errors = actual_data - predicted_dataplt.figure(figsize=figsize)# 绘制误差直方图plt.hist(errors, bins=30, alpha=0.7, color='blue')plt.title(title)plt.xlabel('预测误差')plt.ylabel('频次')plt.grid(True)plt.tight_layout()return plt

相关文章:

  • 鸿蒙语言基础
  • c#开发大冲锋游戏登录器
  • OpenCV 中的分水岭算法的原理及其应用---图像分割的利器
  • day31和day32图像处理OpenCV
  • 《数据结构之美--链表oj练习》
  • 【最后203篇系列】028 FastAPI的后台任务处理
  • 第R3周:RNN-心脏病预测
  • 【硬件系统架构】冯·诺依曼架构
  • flutter app实现分辨率自适应的图片资源加载
  • 实时直播弹幕系统设计
  • AI Agent系列(十) -Data Agent(数据分析智能体)开源资源汇总
  • LabVIEW技巧——获取文件版本信息
  • Flutter异常Couldn‘t find dynamic library in default locations
  • 深入解析 HTML5 Web IndexedDB 数据库:构建高效离线应用的基石
  • Ubuntu22.04安装QT、px4安装环境
  • Git 进阶之路:高效协作之分支管理
  • DeepSeek 操作 MySQL 数据库:使用 MCP 实现数据库查询
  • 【MySQL】Ubuntu下C++连接MySQL
  • AWS上构建基于自然语言的数值和符号计算系统
  • (51单片机)LCD显示数据存储(DS1302时钟模块教学)(LCD1602教程)(独立按键教程)(延时函数教程)(I2C总线认识)(AT24C02认识)
  • 高璞任中国一汽党委常委、副总经理
  • 六部门:进一步优化离境退税政策扩大入境消费
  • 第152次中老缅泰湄公河联合巡逻执法行动圆满结束
  • 龚正会见巴基斯坦卡拉奇市市长穆尔塔扎·瓦哈卜、巴西圣保罗市市长里卡多·努内斯
  • 韩国京畿道骊州市市长率团访问菏泽:想和菏泽一起办牡丹节
  • 谷歌一季度利润增超四成:云业务利润率上升,宏观环境可能影响广告业务