FastAPI WebSocket 聊天应用详细教程
项目简介
这是一个基于 FastAPI 和 WebSocket 实现的实时聊天应用,支持一对一聊天、离线消息存储等功能。
技术栈
- 后端:FastAPI (Python)
- 前端:HTML、JavaScript、CSS
- 通信:WebSocket
- 认证:简单的 token 认证
项目结构
├── main.py # 后端主程序
└── templates/ # 前端模板目录
    └── chat.html # 聊天页面
详细代码实现
1. 后端实现 (main.py)
from fastapi import FastAPI, WebSocket, Depends, WebSocketDisconnect
from typing import Dict
import json
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles

app = FastAPI()
app.mount("/templates", StaticFiles(directory="templates"), name="templates")


class ConnectionManager:
    """Keep one registered WebSocket per user and queue messages for absent users.

    ``active_connections`` maps a user name to that user's registered socket.
    ``offline_messages`` maps a user name to the list of messages that could
    not be delivered because no socket was registered for that user.
    Both structures live in process memory only.
    """

    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        # Messages stored for users who had no registered connection.
        self.offline_messages: Dict[str, list] = {}

    async def connect(self, user: str, websocket: WebSocket):
        """Accept the handshake and register ``websocket`` under ``user``.

        An existing entry for the same user is overwritten.
        """
        await websocket.accept()
        self.active_connections[user] = websocket

    async def disconnect(self, user: str, websocket: WebSocket):
        """Unregister ``user`` if ``websocket`` is still their registered socket.

        The identity check matters when the same user connects twice: the
        second ``connect`` overwrites the entry, and the first socket closing
        afterwards must not remove the newer, still-live connection.
        """
        if self.active_connections.get(user) is websocket:
            del self.active_connections[user]

    async def send_personal_message(self, message: str, to_user: str):
        """Send ``message`` to ``to_user`` or store it when they are not registered.

        Returns ``{"success": True}`` after a send, otherwise
        ``{"success": False, "reason": "offline", "stored": True}``.
        """
        recipient = self.active_connections.get(to_user)
        if recipient is not None:
            await recipient.send_text(message)
            return {"success": True}

        # Store the message for delivery on the recipient's next connection.
        self.offline_messages.setdefault(to_user, []).append(message)
        return {"success": False, "reason": "offline", "stored": True}

    async def get_offline_messages(self, user: str):
        """Return and remove every message stored for ``user`` (``[]`` if none)."""
        return self.offline_messages.pop(user, [])


# Token authentication
async def get_cookie_or_token(token: str = None):
    """Dependency that requires a non-empty ``token`` parameter.

    Returns the token unchanged when it is present. The value is not
    validated beyond being truthy.

    Raises:
        WebSocketException: with close code 1008 (policy violation) when the
            token is missing or empty. This dependency is used by a WebSocket
            route, where FastAPI's documentation recommends
            ``WebSocketException`` rather than ``HTTPException``.
    """
    if not token:
        # Function-scope import, as in the original, so the module's import
        # block does not need to change.
        from fastapi import WebSocketException, status

        raise WebSocketException(
            code=status.WS_1008_POLICY_VIOLATION,
            reason="未授权",
        )
    return token


# Initialize the connection manager
ws_manager = ConnectionManager()


@app.websocket("/ws/{user}")
async def websocket_one_to_one(
    websocket: WebSocket,
    user: str,
    cookie_or_token: str = Depends(get_cookie_or_token),
):
    """Serve one user's chat connection: flush stored messages, then relay.

    Each incoming text frame is expected to be a JSON object with
    ``to_user`` and ``content`` keys and an optional ``timestamp``. The
    message is forwarded to ``to_user`` as a ``{"from", "content",
    "timestamp", "type": "message"}`` JSON frame. The sender receives an
    ``error`` frame for self-addressed or malformed messages, and an ``info``
    frame when the message was stored for an offline recipient.

    NOTE(review): ``cookie_or_token`` is only required to be present; it is
    never compared with ``user`` in this function.
    """
    await ws_manager.connect(user, websocket)
    try:
        # Deliver stored messages inside the try block so that a failure here
        # still reaches the ``finally`` clause and unregisters the socket.
        for msg in await ws_manager.get_offline_messages(user):
            await websocket.send_text(msg)

        while True:
            data = await websocket.receive_text()

            # A malformed frame is reported to the sender instead of
            # terminating the whole connection.
            try:
                message_data = json.loads(data)
                to_user = message_data["to_user"]
                content = message_data["content"]
            except (json.JSONDecodeError, KeyError, TypeError):
                error_msg = {"type": "error", "content": "消息格式错误"}
                await websocket.send_text(json.dumps(error_msg))
                continue

            # Refuse messages addressed to the sender.
            if to_user == user:
                error_msg = {"type": "error", "content": "不能发送消息给自己"}
                await websocket.send_text(json.dumps(error_msg))
                continue

            response_message = {
                "from": user,
                "content": content,
                "timestamp": message_data.get("timestamp"),
                "type": "message",
            }

            # Send the message, or let the manager store it if the recipient is offline.
            result = await ws_manager.send_personal_message(
                message=json.dumps(response_message),
                to_user=to_user,
            )

            # Tell the sender when the message was stored rather than delivered.
            if not result["success"] and result.get("stored"):
                success_msg = {
                    "type": "info",
                    "content": f"消息已保存,将在用户 {to_user} 上线时送达",
                }
                await websocket.send_text(json.dumps(success_msg))
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    except Exception as e:
        print(f"WebSocket 错误: {e}")
    finally:
        await ws_manager.disconnect(user, websocket)


@app.get("/", response_class=FileResponse)
async def root():
    """Return the chat page path; ``response_class=FileResponse`` serves it as a file."""
    return "templates/chat.html"
2. 前端实现 (templates/chat.html)
<!DOCTYPE html>
<html>
<head>
    <!-- Declare the encoding explicitly: the page contains non-ASCII text. -->
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>WebSocket Chat</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }

        #loginSection {
            text-align: center;
            margin-top: 100px;
        }

        /* Hidden until a user is logged in; toggled from the script. */
        #chatSection {
            display: none;
        }

        /* A single padding declaration: the original's earlier "padding: 0"
           was overridden by the later "padding: 10px" in the same rule. */
        #messages {
            list-style-type: none;
            height: 400px;
            overflow-y: auto;
            border: 1px solid #ccc;
            margin-bottom: 20px;
            padding: 10px;
        }

        .message {
            margin: 10px 0;
            padding: 10px;
            border-radius: 5px;
            background-color: #f0f0f0;
        }

        .input-group {
            margin-bottom: 10px;
        }

        input[type="text"] {
            padding: 8px;
            margin-right: 10px;
            width: 200px;
        }

        button {
            padding: 8px 15px;
            background-color: #4CAF50;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
        }

        button:hover {
            background-color: #45a049;
        }

        #logoutBtn {
            background-color: #f44336;
        }

        #logoutBtn:hover {
            background-color: #da190b;
        }

        .error {
            color: red;
            margin: 10px 0;
        }

        .info {
            color: blue;
            margin: 10px 0;
        }
    </style>
</head>
<body>
    <!-- Login section -->
    <div id="loginSection">
        <h1>WebSocket 聊天</h1>
        <div class="input-group">
            <input type="text" id="loginUsername" placeholder="输入用户名" autocomplete="off"/>
            <button onclick="login()">登录</button>
        </div>
        <div id="loginError" class="error"></div>
    </div>

    <!-- Chat section -->
    <div id="chatSection">
        <h1>WebSocket 聊天</h1>
        <div id="currentUser"></div>
        <form action="" onsubmit="sendMessage(event)">
            <div class="input-group">
                <input type="text" id="messageText" placeholder="输入消息" autocomplete="off"/>
                <input type="text" id="username" placeholder="接收者用户名" autocomplete="off"/>
                <button type="submit">发送</button>
            </div>
        </form>
        <button id="logoutBtn" onclick="logout()">退出</button>
        <ul id='messages'></ul>
    </div>

    <script>
        let ws = null;

        // Append one line to the message list and keep the list scrolled to the bottom.
        // `styles` is an optional object of inline style properties to apply.
        function appendMessage(text, styles) {
            const messages = document.getElementById('messages');
            const item = document.createElement('li');
            item.className = 'message';
            Object.assign(item.style, styles || {});
            item.textContent = text;
            messages.appendChild(item);
            messages.scrollTop = messages.scrollHeight;
        }

        // Show the chat view and connect when a token is stored; otherwise show the login view.
        function checkLogin() {
            const token = localStorage.getItem('token');
            if (token) {
                document.getElementById('loginSection').style.display = 'none';
                document.getElementById('chatSection').style.display = 'block';
                document.getElementById('currentUser').textContent = `当前用户: ${token}`;
                connectWebSocket();
            } else {
                document.getElementById('loginSection').style.display = 'block';
                document.getElementById('chatSection').style.display = 'none';
            }
        }

        // Store the entered user name as the token, then switch to the chat view.
        function login() {
            const username = document.getElementById('loginUsername').value.trim();
            if (!username) {
                document.getElementById('loginError').textContent = '请输入用户名';
                return;
            }
            localStorage.setItem('token', username);
            checkLogin();
        }

        // Close the socket, forget the token, and reset the page to the login view.
        function logout() {
            if (ws && ws.readyState === WebSocket.OPEN) {
                ws.close();
            }
            localStorage.removeItem('token');
            document.getElementById('messages').innerHTML = '';
            document.getElementById('messageText').value = '';
            document.getElementById('username').value = '';
            document.getElementById('loginSection').style.display = 'block';
            document.getElementById('chatSection').style.display = 'none';
            document.getElementById('loginUsername').value = '';
            document.getElementById('currentUser').textContent = '';
        }

        // Open the chat socket for the stored token and render incoming frames.
        function connectWebSocket() {
            const token = localStorage.getItem('token');
            // Derive scheme and host from the page so the client also works
            // behind HTTPS or on another host; fall back to the original
            // address when the page has no host (e.g. opened from disk).
            const scheme = location.protocol === 'https:' ? 'wss' : 'ws';
            const host = location.host || 'localhost:8000';
            // The user name is user input: encode it before placing it in the URL.
            const encoded = encodeURIComponent(token);
            ws = new WebSocket(`${scheme}://${host}/ws/${encoded}?token=${encoded}`);

            ws.onmessage = function(event) {
                let data;
                try {
                    data = JSON.parse(event.data);
                } catch (err) {
                    console.error('WebSocket 错误:', err);
                    return;
                }
                if (data.type === 'error') {
                    appendMessage(data.content, {color: 'red'});
                } else if (data.type === 'info') {
                    appendMessage(data.content, {color: 'blue'});
                } else {
                    appendMessage(`${data.from}: ${data.content}`);
                }
            };

            ws.onerror = function(error) {
                console.error('WebSocket 错误:', error);
            };

            ws.onclose = function() {
                console.log('WebSocket 连接已关闭');
            };
        }

        // Send the typed message to the named recipient and echo it locally.
        function sendMessage(event) {
            event.preventDefault();
            const messageInput = document.getElementById("messageText");
            const usernameInput = document.getElementById("username");
            if (!messageInput.value.trim() || !usernameInput.value.trim()) {
                return;
            }
            // ws.send throws unless the socket is open, so check first and
            // do not echo a message that was never sent.
            if (!ws || ws.readyState !== WebSocket.OPEN) {
                appendMessage('连接未建立,无法发送消息', {color: 'red'});
                return;
            }
            const messageData = {
                content: messageInput.value,
                to_user: usernameInput.value,
                timestamp: new Date().toISOString()
            };
            ws.send(JSON.stringify(messageData));
            appendMessage(`我: ${messageInput.value}`, {backgroundColor: '#e3f2fd'});
            messageInput.value = '';
        }

        checkLogin();
    </script>
</body>
</html>
功能特点
- 用户管理
  - 简单的用户名登录系统
  - 用户在线状态管理
  - 用户会话保持
- 消息功能
  - 实时一对一聊天
  - 离线消息存储
  - 上线自动接收离线消息
  - 防止自己给自己发消息
- 界面特性
  - 响应式设计
  - 消息实时显示
  - 错误信息提示
  - 消息状态反馈
- 技术特性
  - WebSocket 实时通信
  - 状态管理
  - 错误处理
  - 会话管理
如何运行
- 安装依赖
pip install fastapi uvicorn
- 启动服务器
uvicorn main:app --reload
- 访问应用
- 打开浏览器访问 http://localhost:8000
- 输入用户名登录
- 开始聊天
使用流程
- 登录
  - 输入用户名
  - 点击登录按钮
- 发送消息
  - 输入接收者用户名
  - 输入消息内容
  - 点击发送
- 接收消息
  - 实时接收其他用户发送的消息
  - 上线时接收离线消息
- 退出
  - 点击退出按钮
  - 清除登录状态