施磊老师基于muduo网络库的集群聊天服务器(四)
文章目录
- 实现登录业务
- 登录业务代码
- 补全数据库接口:查询,更新状态
- 注意学习一下里面用到的数据库api
- 测试与问题
- **问题1:**
- **问题2:**
- 用户连接信息与线程安全
- 聊天服务器是长连接服务器
- 如何找到用户B的连接?
- 在业务层存储用户的连接信息
- 多线程安全问题
- 加锁!
- 处理客户端异常退出
- 处理情况:-功能不完善
- 两个任务:
- 测试
- 点对点聊天业务(在线)
- 传什么?
- 业务处理逻辑
- 代码结构规划
- 调试与测试:
- 离线消息存储业务
- 设计数据库表:
- Model 类封装:
- 操作流程:
- 代码结构:-单台
- 问题
- 测试
- 服务器异常下线
- 功能不完整
- 解决核心
- 解决步骤
- 低耦合性
- 代码结构
- 测试
实现登录业务
登录业务代码
要考虑细一点: 登陆成功, 登录失败, 哪里失败了, 登录成功更新状态
chatservice.cpp
// 处理登录业务
void ChatService::login(const TcpConnectionPtr &conn, json &js, Timestamp time)
{// LOG_INFO<<"do login service"; //测试用//int id = js["id"];int id = js["id"].get<int>(); // js字符串 转成整型string pwd = js["password"];User user = _usermodel.query(id);if (user.getId() == id && user.getPwd() == pwd) // id默认为-1{// 登陆成功if (user.getState() == "online") // 用户在线, 不允许重复登录{json response;response["msgid"] = LOGIN_MSG_ACK;response["errno"] = 2;response["errmsg"] = "用户已经登录, 不允许重复登录!";conn->send(response.dump());}else{user.setState("online"); //登录成功, 更新状态_usermodel.updateState(user); // 刷新状态json response;response["msgid"] = LOGIN_MSG_ACK;response["errno"] = 0;response["id"] = user.getId();response["name"] = user.getName();conn->send(response.dump());}}else{// 登录失败if (user.getId() == -1) // 没找到{json response;response["msgid"] = LOGIN_MSG_ACK;response["errno"] = 1;response["errmsg"] = "用户名不存在!";conn->send(response.dump());}else if (user.getPwd() != pwd){json response;response["msgid"] = LOGIN_MSG_ACK;response["errno"] = 3;response["errmsg"] = "密码错误!";conn->send(response.dump());}}
}
补全数据库接口:查询,更新状态
usermodel.hpp
// 更新 用户状态信息bool updateState(User user);
usermodel.cpp
User UserModel::query(int id)
{//1. 组装 sql语句char sql[1024]={0};sprintf(sql, "select * from user where id=%d", id);MySQL mysql;if(mysql.connect()){MYSQL_RES *res = mysql.query(sql); // 数据库apiif(res!=nullptr){MYSQL_ROW row = mysql_fetch_row(res);if(row != nullptr){User user;user.setId(atoi(row[0]));user.setName(row[1]);user.setPwd(row[2]);user.setState(row[3]);mysql_free_result(res); // 释放一下资源, 否则内存不断泄露return user;}}}return false;}// 更新 用户状态信息
bool UserModel::updateState(User user)
{//1. 组装 sql语句char sql[1024]={0};sprintf(sql, "update user set state='%s' where id=%d",user.getState().c_str(), user.getId());MySQL mysql;if(mysql.connect()){if(mysql.update(sql)){return true;}}return false;
}
注意学习一下里面用到的数据库api
测试与问题
问题1:
reason: [json.exception.type_error.302] type must be number, but is null
检查一下, 本代码 使用 用户id 进行登录, 而不是用户名!
测试的话, 事先查一下 用户id
{"msgid":1,"id":22,"password":"101010"}
问题2:
.get<int>()
–js字符串转整型–>错误, 不支持 字符串转int
get<T>()
不是设计用来做隐式转换的,而是严格类型检查工具。- 它的意义在于 强制开发者明确处理类型问题,避免隐藏的运行时错误。
int id = js["id"].get<int>(); // js字符串 转成整型
{"msgid":1,"id":22,"password":"101010"}
//还没有设计下线 状态改变, 自己多试试那几种登录情况
js["id"]
转 int
的几种方式
js["id"].get<int>()
- ✅
"id":22
→22
- ❌
"id":"22"
→ 抛出异常(需try-catch
)
- ✅
int id = js["id"]
- ✅
"id":22
→22
(隐式转换) - ❌
"id":"22"
→ 编译错误
- ✅
js.value("id", 0)
- ✅
"id":22
→22
- ✅
"id":"22"
→ 返回默认值0
(不抛异常)
- ✅
用户连接信息与线程安全
聊天服务器是长连接服务器
长连接的特点
- 持久连接:客户端与服务器建立连接后保持长时间开放
- 双向通信:服务器可以主动向客户端推送消息
- 低延迟:省去了频繁建立/断开连接的开销
聊天服务器为何适合长连接
- 实时性要求:需要即时推送新消息给在线用户
- 频繁交互:聊天场景下用户会持续发送和接收消息
- 状态保持:可以维持用户在线状态、未读消息等上下文
如何找到用户B的连接?
服务器必须主动推:当用户A发给用户B时,服务器需要立刻找到用户B的连接,把消息推过去
**解决方案:**维护一个"用户ID ⇨ 连接"的映射表
在业务层存储用户的连接信息
include/server/chatservice.hpp
//存储在线用户的 连接信息unordered_map<int, TcpConnectionPtr> _userConnMap; // 只有登录成功了, 才会存储
src/server/chatservice.cpp
//登陆成功里 添加---具体看自己代码, 哪里是登陆成功
// 存储用户登录信息// _userConnMap.insert({id, conn});_userConnMap.emplace(id,conn);
多线程安全问题
onMessage()—> 在网络库中, 会被多线程调用, 业务层要 考虑 现成安全
在线用户信息 在 运行中, 是不断改变的, 一定要考虑 线程安全!!
加锁!
锁的力度 一定要小!!!
include/server/chatservice.hpp
//定义互斥锁, 保证_userConnMap安全mutex _connMutex;//存储在线用户的 连接信息unordered_map<int, TcpConnectionPtr> _userConnMap; // 只有登录成功了, 才会存储
src/server/chatservice.cpp
使用 作用域, 时加锁力度 尽可能小!!
// 存储用户登录信息// _userConnMap.insert({id, conn});{lock_guard<mutex> lock(_connMutex); // 自动解锁_userConnMap.emplace(id,conn);}
处理客户端异常退出
处理情况:-功能不完善
必须是客户端 断开连接, 服务端不能断开, 就算是 服务端断开连接, 客户端必须先退出, 否则 这个 修改状态 无法完成
客户端只要断开连接, 就需要修改用户的 状态
两个任务:
- 删除该用户的 在线用户连接信息
- 修改用户在数据库中的状态
include/server/chatservice.hpp
// 客户端异常退出处理void clientCloseException(const TcpConnectionPtr &conn);
src/server/chatservice.cpp
// 客户端异常退出处理void ChatService::clientCloseException(const TcpConnectionPtr &conn)
{User user;{lock_guard<mutex> lock(_connMutex);for (auto it = _userConnMap.begin(); it != _userConnMap.end(); ++it){if (it->second == conn){user.setId(it->first);_userConnMap.erase(it);break;}}}if(user.getId()!=-1){//更新用户状态信息user.setState("offline");_usermodel.updateState(user);}}
src/server/chatserver.cpp
void ChatServer::onConnect(const TcpConnectionPtr& conn)
{// 客户端断开连接 if(!conn->connected()){//客户端异常退出ChatService::instance()->clientCloseException(conn);conn->shutdown();}
}
测试
一定要退出 telnet
完成
点对点聊天业务(在线)
传什么?
msgid: //说明是什么业务
fromid: id
fromname:
to: id
msg: "......"
业务处理逻辑
客户端发送聊天消息 → 服务器收到并解析
首先判断接收者(to
)是否在线:
- **在线:**直接通过 connection 转发消息
- **离线:**将消息存入离线消息表(后续上线时再发送)
为保证线程安全,访问用户连接表时加锁(用 lock_guard
)
代码结构规划
include/public.hpp
ONE_CHAT_MSG //一对一聊天
include/server/chatservice.hpp
// 一对一 聊天业务void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time);
src/server/chatservice.cpp
// 一对一 聊天业务 并进行处理器绑定
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{int toid = js["to"].get<int>();{lock_guard<mutex> lock(_connMutex);auto it = _userConnMap.find(toid);if(it!=_userConnMap.end()){//在线, 转发消息it->second->send(js.dump());return;}}//不在线, 存储离线消息}
调试与测试:
- 使用手动构造 JSON 数据模拟客户端行为进行测试
- 验证用户注册、登录、消息发送与接收是否正常
{"msgid":1,"id":22,"password":"101010"}
{"msgid":3,"name":"hzh1","password":"101010"}
{"msgid":1,"id":23,"password":"101010"}
{"msgid":5,"id":22,"from":"hzh","to":23,"msg":"i am hzh!hello!"}
{"msgid":5,"id":23,"from":"hzh1","to":22,"msg":"我不认识你!"}
离线消息存储业务
设计数据库表:
- 创建
offline_message
表,包含user_id
和message
两个字段。 - 这样设计是为了简单高效地存储每个用户的离线消息。
Model 类封装:
定义了
OfflineMessage
类来封装与离线消息表的交互。这样做的目的是将数据库操作与业务逻辑解耦,符合“分层设计”的原则。通过这一层的封装,业务代码与数据库操作分离,减少了耦合,提高了代码的可维护性。
- 创建
OfflineMessage
类,提供方法:insert
: 存储离线消息。remove
: 删除某个用户的所有离线消息(避免用户登录后再收到历史消息)。query
: 查询用户的所有离线消息,返回一个vector
容器。
操作流程:
- 插入消息:当用户不在线时,将消息存入
offline_message
表。- 离线消息的增加会在数据库表
offlinemessage
中插入一条新记录。即使userid
相同,每次插入的消息都会作为一条独立的记录存储在表中,因为没有任何逻辑限制userid
的重复。
- 离线消息的增加会在数据库表
- 查询消息:用户登录时,查询该用户的所有离线消息,并返回给用户。
- 由于会重复, 所以 要用
vector
, 每条都加进去
- 由于会重复, 所以 要用
- 删除消息:用户登录后,查询并返回离线消息后,删除该用户的所有消息,确保下次登录时不再重复。
代码结构:-单台
还不考虑 在另一台 上, 需要同步消息
include/server/offlinemessagemodel.hpp
#ifndef OFFLINEMESSAGEMODEL_H
#define OFFLINEMESSAGEMODEL_H#include <string>
#include <vector>
using namespace std;// 提供离线消息表的操作接口方法
class OfflineMessageModel
{
public:// 添加离线消息void insert(int userid, const string &msg);// 删除离线消息void remove(int userid);// 查询离线消息vector<string> query(int userid);
};#endif
src/server/offlinemessagemodel.cpp
#include "offlinemessagemodel.hpp"
#include "db.h"
#include <iostream>// 添加离线消息
void OfflineMessageModel::insert(int userid, const string &msg)
{// 1. 创建数据库连接// 2. 执行 SQL 语句插入离线消息// 3. 关闭数据库连接// 4. 返回插入结果char sql[1024] = {0};sprintf(sql, "insert into offlinemessage (userid, message) values (%d, '%s')", userid, msg.c_str());MySQL mysql;if (mysql.connect()){mysql.update(sql);}}// 删除离线消息
void OfflineMessageModel::remove(int userid)
{// 1. 创建数据库连接// 2. 执行 SQL 语句删除离线消息// 3. 关闭数据库连接// 4. 返回删除结果char sql[1024] = {0};sprintf(sql, "delete from offlinemessage where userid = %d", userid);MySQL mysql;if(mysql.connect()){mysql.update(sql);}}// 查询离线消息
vector<string> OfflineMessageModel::query(int userid)
{// 1. 创建数据库连接// 2. 执行 SQL 语句查询离线消息// 3. 关闭数据库连接// 4. 返回查询结果char sql[1024] = {0};sprintf(sql, "select message from offlinemessage where userid = %d", userid);vector<string> vec;MySQL mysql;if(mysql.connect()){MYSQL_RES *res = mysql.query(sql); // 数据库apiif(res!=nullptr){MYSQL_ROW row;while((row = mysql_fetch_row(res)) != nullptr){vec.push_back(row[0]); // 将查询到的消息添加到结果向量中}mysql_free_result(res); // 释放一下资源, 否则内存不断泄露}return vec;}return vec;
}
include/server/chatservice.hpp
#include <offlinemessagemodel.hpp>// 离线消息操作对象OfflineMessageModel _offlineMsg;
src/server/chatservice.cpp
//补充 点对点聊天的 离线存储业务
//不在线, 存储离线消息_offlineMsg.insert(toid, js.dump());
//补充 登录业务, 登陆成功后, 查询是否有离线消息// 查询离线消息
vector<string> vec = _offlineMsg.query(id);
if (!vec.empty())
{response["offlinemsg"] = vec; // 离线消息 js可以直接序列化容器// 离线消息发送完毕, 删除离线消息_offlineMsg.remove(id);
}
问题
新增了文件, 容易出现 链接错误, 因为找不到
需要把 build里面的全部删了, 重新 cmake
测试
{"msgid":1,"id":22,"password":"101010"}
{"msgid":5,"id":22,"from":"hzh","to":23,"msg":"i am hzh!hello!"}
至此, 数据库 离线消息 表已经存储了, 可以自行查看
{"msgid":1,"id":23,"password":"101010"}
至此, 数据库 离线消息 表已经 删除了
服务器异常下线
功能不完整
之前,客户端异常下线主要导致了用户状态的变化。然而,目前更常遇到的是服务器端异常,例如无法识别字符或格式错误,这些问题通常会导致服务器异常下线。
这种异常 导致 用户 状态 没改变, 所以 需要 进行完善!!
目前 仅处理 服务端 ctrl+c 的异常, 暂不处理 客户端发过来消息 不对导致的 服务器退出问题
解决核心
使用信号捕捉函数: signal------这个实际一般不推荐
(一般使用sigaction()函数)-----老师没用这个
解决步骤
捕获终止信号:使用 signal.h
来捕获 Ctrl+C 终止信号(SIGINT),并在信号处理函数中进行用户状态的重置。
编写重置函数:
- 在
chat service
中添加reset
方法,用来重置用户的状态。 reset
方法调用user model
中的update
方法,将所有在线(online)状态的用户状态更新为离线(offline)。
确保正确更新数据库:通过 SQL 语句 UPDATE user SET state = 'offline' WHERE state = 'online'
来批量更新数据库中用户的状态。
测试过程:
- 在正常登录和退出后,测试通过 Ctrl+C 强制结束服务器,确保所有用户的状态都被成功更新为离线(offline)。
- 在服务器重启后,验证用户状态是否恢复正确。
低耦合性
main.cpp
仅负责信号捕获,ChatService
处理业务逻辑,UserModel
负责数据库操作
代码结构
src/server/main.cpp
使用信号捕捉
#include <signal.h>
#include "chatservice.hpp"void resetHandler(int)
{ChatService::instance()->reset();exit(0);
}//main
signal(SIGINT, resetHandler);
include/server/chatservice.hpp
// 服务器异常退出处理, 重置用户状态函数void reset();
src/server/chatservice.cpp
// 服务器异常退出处理, 重置用户状态函数
void ChatService::reset()
{// 更新所有用户的状态----把在线用户都设置为离线_usermodel.resetState();
}
include/server/usermodel.hpp
// 重置用户状态void resetState();
src/server/usermodel.cpp
// 重置用户状态
void UserModel::resetState()
{//1. 组装 sql语句char sql[1024]={0};sprintf(sql, "update user set state='offline' where state='online'");MySQL mysql;if(mysql.connect()){mysql.update(sql);}
}
测试
{"msgid":1,"id":22,"password":"101010"}
服务器: ctrl+c