当前位置: 首页 > news >正文

施磊老师基于muduo网络库的集群聊天服务器(四)

文章目录

  • 实现登录业务
    • 登录业务代码
    • 补全数据库接口:查询,更新状态
    • 注意学习一下里面用到的数据库api
    • 测试与问题
      • **问题1:**
      • **问题2:**
  • 用户连接信息与线程安全
    • 聊天服务器是长连接服务器
    • 如何找到用户B的连接?
    • 在业务层存储用户的连接信息
    • 多线程安全问题
    • 加锁!
  • 处理客户端异常退出
    • 处理情况:-功能不完善
    • 两个任务:
    • 测试
  • 点对点聊天业务(在线)
    • 传什么?
    • 业务处理逻辑
    • 代码结构规划
    • 调试与测试:
  • 离线消息存储业务
    • 设计数据库表:
    • Model 类封装:
    • 操作流程:
    • 代码结构:-单台
    • 问题
    • 测试
  • 服务器异常下线
    • 功能不完整
    • 解决核心
    • 解决步骤
    • 低耦合性
    • 代码结构
    • 测试

实现登录业务

登录业务代码

要考虑细一点: 登陆成功, 登录失败, 哪里失败了, 登录成功更新状态

chatservice.cpp

// 处理登录业务
void ChatService::login(const TcpConnectionPtr &conn, json &js, Timestamp time)
{// LOG_INFO<<"do login service"; //测试用//int id = js["id"];int id = js["id"].get<int>();  // js字符串 转成整型string pwd = js["password"];User user = _usermodel.query(id);if (user.getId() == id && user.getPwd() == pwd) // id默认为-1{// 登陆成功if (user.getState() == "online") // 用户在线, 不允许重复登录{json response;response["msgid"] = LOGIN_MSG_ACK;response["errno"] = 2;response["errmsg"] = "用户已经登录, 不允许重复登录!";conn->send(response.dump());}else{user.setState("online");   //登录成功, 更新状态_usermodel.updateState(user);  // 刷新状态json response;response["msgid"] = LOGIN_MSG_ACK;response["errno"] = 0;response["id"] = user.getId();response["name"] = user.getName();conn->send(response.dump());}}else{// 登录失败if (user.getId() == -1) // 没找到{json response;response["msgid"] = LOGIN_MSG_ACK;response["errno"] = 1;response["errmsg"] = "用户名不存在!";conn->send(response.dump());}else if (user.getPwd() != pwd){json response;response["msgid"] = LOGIN_MSG_ACK;response["errno"] = 3;response["errmsg"] = "密码错误!";conn->send(response.dump());}}
}

补全数据库接口:查询,更新状态

usermodel.hpp

// 更新 用户状态信息bool updateState(User user);

usermodel.cpp

User UserModel::query(int id)
{//1. 组装 sql语句char sql[1024]={0};sprintf(sql, "select * from user where id=%d", id);MySQL mysql;if(mysql.connect()){MYSQL_RES *res = mysql.query(sql);   // 数据库apiif(res!=nullptr){MYSQL_ROW row = mysql_fetch_row(res);if(row != nullptr){User user;user.setId(atoi(row[0]));user.setName(row[1]);user.setPwd(row[2]);user.setState(row[3]);mysql_free_result(res); // 释放一下资源, 否则内存不断泄露return user;}}}return false;}// 更新 用户状态信息
bool UserModel::updateState(User user)
{//1. 组装 sql语句char sql[1024]={0};sprintf(sql, "update user set state='%s' where id=%d",user.getState().c_str(), user.getId());MySQL mysql;if(mysql.connect()){if(mysql.update(sql)){return true;}}return false;
}

注意学习一下里面用到的数据库api

测试与问题

问题1:

reason: [json.exception.type_error.302] type must be number, but is null

检查一下, 本代码 使用 用户id 进行登录, 而不是用户名!

测试的话, 事先查一下 用户id

{"msgid":1,"id":22,"password":"101010"}

问题2:

.get<int>()–js字符串转整型–>错误, 不支持 字符串转int

  • get<T>() 不是设计用来做隐式转换的,而是严格类型检查工具
  • 它的意义在于 强制开发者明确处理类型问题,避免隐藏的运行时错误。
int id = js["id"].get<int>();  // js字符串 转成整型
{"msgid":1,"id":22,"password":"101010"}
//还没有设计下线 状态改变,  自己多试试那几种登录情况

js["id"]int 的几种方式

  1. js["id"].get<int>()
    • "id":2222
    • "id":"22"抛出异常(需 try-catch
  2. int id = js["id"]
    • "id":2222(隐式转换)
    • "id":"22"编译错误
  3. js.value("id", 0)
    • "id":2222
    • "id":"22"返回默认值 0(不抛异常)

用户连接信息与线程安全

聊天服务器是长连接服务器

长连接的特点

  1. 持久连接:客户端与服务器建立连接后保持长时间开放
  2. 双向通信:服务器可以主动向客户端推送消息
  3. 低延迟:省去了频繁建立/断开连接的开销

聊天服务器为何适合长连接

  1. 实时性要求:需要即时推送新消息给在线用户
  2. 频繁交互:聊天场景下用户会持续发送和接收消息
  3. 状态保持:可以维持用户在线状态、未读消息等上下文

如何找到用户B的连接?

服务器必须主动推:当用户A发给用户B时,服务器需要立刻找到用户B的连接,把消息推过去

**解决方案:**维护一个"用户ID ⇨ 连接"的映射表

在业务层存储用户的连接信息

include/server/chatservice.hpp

//存储在线用户的 连接信息unordered_map<int, TcpConnectionPtr> _userConnMap; // 只有登录成功了, 才会存储

src/server/chatservice.cpp

//登陆成功里 添加---具体看自己代码, 哪里是登陆成功
// 存储用户登录信息// _userConnMap.insert({id, conn});_userConnMap.emplace(id,conn);

多线程安全问题

onMessage()—> 在网络库中, 会被多线程调用, 业务层要 考虑 现成安全

在线用户信息 在 运行中, 是不断改变的, 一定要考虑 线程安全!!

加锁!

锁的力度 一定要小!!!

include/server/chatservice.hpp

//定义互斥锁, 保证_userConnMap安全mutex _connMutex;//存储在线用户的 连接信息unordered_map<int, TcpConnectionPtr> _userConnMap; // 只有登录成功了, 才会存储

src/server/chatservice.cpp

使用 作用域, 时加锁力度 尽可能小!!

// 存储用户登录信息// _userConnMap.insert({id, conn});{lock_guard<mutex> lock(_connMutex); // 自动解锁_userConnMap.emplace(id,conn);}

处理客户端异常退出

处理情况:-功能不完善

必须是客户端 断开连接, 服务端不能断开, 就算是 服务端断开连接, 客户端必须先退出, 否则 这个 修改状态 无法完成

客户端只要断开连接, 就需要修改用户的 状态

两个任务:

  1. 删除该用户的 在线用户连接信息
  2. 修改用户在数据库中的状态

include/server/chatservice.hpp

// 客户端异常退出处理void clientCloseException(const TcpConnectionPtr &conn);

src/server/chatservice.cpp

// 客户端异常退出处理void ChatService::clientCloseException(const TcpConnectionPtr &conn)
{User user;{lock_guard<mutex> lock(_connMutex);for (auto it = _userConnMap.begin(); it != _userConnMap.end(); ++it){if (it->second == conn){user.setId(it->first);_userConnMap.erase(it);break;}}}if(user.getId()!=-1){//更新用户状态信息user.setState("offline");_usermodel.updateState(user);}}

src/server/chatserver.cpp

void ChatServer::onConnect(const TcpConnectionPtr& conn)
{// 客户端断开连接 if(!conn->connected()){//客户端异常退出ChatService::instance()->clientCloseException(conn);conn->shutdown();}
}

测试

一定要退出 telnet

完成

点对点聊天业务(在线)

传什么?

msgid: //说明是什么业务
fromid: id
fromname:
to: id
msg: "......"

业务处理逻辑

客户端发送聊天消息 → 服务器收到并解析

首先判断接收者(to)是否在线:

  • **在线:**直接通过 connection 转发消息
  • **离线:**将消息存入离线消息表(后续上线时再发送)

为保证线程安全,访问用户连接表时加锁(用 lock_guard

代码结构规划

include/public.hpp

ONE_CHAT_MSG //一对一聊天

include/server/chatservice.hpp

// 一对一 聊天业务void oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time);

src/server/chatservice.cpp

// 一对一 聊天业务  并进行处理器绑定
void ChatService::oneChat(const TcpConnectionPtr &conn, json &js, Timestamp time)
{int toid = js["to"].get<int>();{lock_guard<mutex> lock(_connMutex);auto it = _userConnMap.find(toid);if(it!=_userConnMap.end()){//在线, 转发消息it->second->send(js.dump());return;}}//不在线, 存储离线消息}

调试与测试:

  • 使用手动构造 JSON 数据模拟客户端行为进行测试
  • 验证用户注册、登录、消息发送与接收是否正常
{"msgid":1,"id":22,"password":"101010"}
{"msgid":3,"name":"hzh1","password":"101010"}
{"msgid":1,"id":23,"password":"101010"}
{"msgid":5,"id":22,"from":"hzh","to":23,"msg":"i am hzh!hello!"}
{"msgid":5,"id":23,"from":"hzh1","to":22,"msg":"我不认识你!"}

离线消息存储业务

设计数据库表:

  • 创建 offline_message 表,包含 user_idmessage 两个字段。
  • 这样设计是为了简单高效地存储每个用户的离线消息。

Model 类封装:

定义了 OfflineMessage 类来封装与离线消息表的交互。这样做的目的是将数据库操作与业务逻辑解耦,符合“分层设计”的原则。通过这一层的封装,业务代码与数据库操作分离,减少了耦合,提高了代码的可维护性。

  • 创建 OfflineMessage 类,提供方法:
    • insert: 存储离线消息。
    • remove: 删除某个用户的所有离线消息(避免用户登录后再收到历史消息)。
    • query: 查询用户的所有离线消息,返回一个 vector 容器。

操作流程:

  • 插入消息:当用户不在线时,将消息存入 offline_message 表。
    • 离线消息的增加会在数据库表 offlinemessage 中插入一条新记录。即使 userid相同,每次插入的消息都会作为一条独立的记录存储在表中,因为没有任何逻辑限制userid的重复。
  • 查询消息:用户登录时,查询该用户的所有离线消息,并返回给用户。
    • 由于会重复, 所以 要用vector, 每条都加进去
  • 删除消息:用户登录后,查询并返回离线消息后,删除该用户的所有消息,确保下次登录时不再重复。

代码结构:-单台

还不考虑 在另一台 上, 需要同步消息

include/server/offlinemessagemodel.hpp

#ifndef OFFLINEMESSAGEMODEL_H
#define OFFLINEMESSAGEMODEL_H#include <string>
#include <vector>
using namespace std;// 提供离线消息表的操作接口方法
class OfflineMessageModel
{
public:// 添加离线消息void insert(int userid, const string &msg);// 删除离线消息void remove(int userid);// 查询离线消息vector<string> query(int userid);
};#endif

src/server/offlinemessagemodel.cpp

#include "offlinemessagemodel.hpp"
#include "db.h"
#include <iostream>// 添加离线消息
void OfflineMessageModel::insert(int userid, const string &msg)
{// 1. 创建数据库连接// 2. 执行 SQL 语句插入离线消息// 3. 关闭数据库连接// 4. 返回插入结果char sql[1024] = {0};sprintf(sql, "insert into offlinemessage (userid, message) values (%d, '%s')", userid, msg.c_str());MySQL mysql;if (mysql.connect()){mysql.update(sql);}}// 删除离线消息
void OfflineMessageModel::remove(int userid)
{// 1. 创建数据库连接// 2. 执行 SQL 语句删除离线消息// 3. 关闭数据库连接// 4. 返回删除结果char sql[1024] = {0};sprintf(sql, "delete from offlinemessage where userid = %d", userid);MySQL mysql;if(mysql.connect()){mysql.update(sql);}}// 查询离线消息
vector<string> OfflineMessageModel::query(int userid)
{// 1. 创建数据库连接// 2. 执行 SQL 语句查询离线消息// 3. 关闭数据库连接// 4. 返回查询结果char sql[1024] = {0};sprintf(sql, "select message from offlinemessage where userid = %d", userid);vector<string> vec;MySQL mysql;if(mysql.connect()){MYSQL_RES *res = mysql.query(sql);   // 数据库apiif(res!=nullptr){MYSQL_ROW row;while((row = mysql_fetch_row(res)) != nullptr){vec.push_back(row[0]); // 将查询到的消息添加到结果向量中}mysql_free_result(res); // 释放一下资源, 否则内存不断泄露}return vec;}return vec;
}

include/server/chatservice.hpp

#include <offlinemessagemodel.hpp>// 离线消息操作对象OfflineMessageModel _offlineMsg;

src/server/chatservice.cpp

//补充 点对点聊天的 离线存储业务    
//不在线, 存储离线消息_offlineMsg.insert(toid, js.dump());
//补充 登录业务, 登陆成功后, 查询是否有离线消息// 查询离线消息
vector<string> vec = _offlineMsg.query(id);
if (!vec.empty())
{response["offlinemsg"] = vec; // 离线消息  js可以直接序列化容器// 离线消息发送完毕, 删除离线消息_offlineMsg.remove(id);       
}

问题

新增了文件, 容易出现 链接错误, 因为找不到

需要把 build里面的全部删了, 重新 cmake

测试

{"msgid":1,"id":22,"password":"101010"}
{"msgid":5,"id":22,"from":"hzh","to":23,"msg":"i am hzh!hello!"}

至此, 数据库 离线消息 表已经存储了, 可以自行查看

{"msgid":1,"id":23,"password":"101010"}

至此, 数据库 离线消息 表已经 删除了

服务器异常下线

功能不完整

之前,客户端异常下线主要导致了用户状态的变化。然而,目前更常遇到的是服务器端异常,例如无法识别字符或格式错误,这些问题通常会导致服务器异常下线。

这种异常 导致 用户 状态 没改变, 所以 需要 进行完善!!

目前 仅处理 服务端 ctrl+c 的异常, 暂不处理 客户端发过来消息 不对导致的 服务器退出问题

解决核心

使用信号捕捉函数: signal------这个实际一般不推荐

(一般使用sigaction()函数)-----老师没用这个

解决步骤

捕获终止信号:使用 signal.h 来捕获 Ctrl+C 终止信号(SIGINT),并在信号处理函数中进行用户状态的重置。

编写重置函数

  • chat service 中添加 reset 方法,用来重置用户的状态。
  • reset 方法调用 user model 中的 update 方法,将所有在线(online)状态的用户状态更新为离线(offline)。

确保正确更新数据库:通过 SQL 语句 UPDATE user SET state = 'offline' WHERE state = 'online' 来批量更新数据库中用户的状态。

测试过程

  • 在正常登录和退出后,测试通过 Ctrl+C 强制结束服务器,确保所有用户的状态都被成功更新为离线(offline)。
  • 在服务器重启后,验证用户状态是否恢复正确。

低耦合性

main.cpp 仅负责信号捕获,ChatService 处理业务逻辑,UserModel 负责数据库操作

代码结构

src/server/main.cpp

使用信号捕捉

#include <signal.h>
#include "chatservice.hpp"void resetHandler(int)
{ChatService::instance()->reset();exit(0);
}//main
signal(SIGINT, resetHandler);

include/server/chatservice.hpp

    // 服务器异常退出处理, 重置用户状态函数void reset();

src/server/chatservice.cpp

// 服务器异常退出处理, 重置用户状态函数
void ChatService::reset()
{// 更新所有用户的状态----把在线用户都设置为离线_usermodel.resetState();
}

include/server/usermodel.hpp

// 重置用户状态void resetState();

src/server/usermodel.cpp

// 重置用户状态
void UserModel::resetState()
{//1. 组装 sql语句char sql[1024]={0};sprintf(sql, "update user set state='offline' where state='online'");MySQL mysql;if(mysql.connect()){mysql.update(sql);}
}

测试

{"msgid":1,"id":22,"password":"101010"}
服务器:  ctrl+c

相关文章:

  • Unitest和pytest使用方法
  • Web网页核心技术解析:从结构到节点操作
  • 如何将当前文件夹及其子文件夹下的所有word提取到一个excel里
  • 常用第三方库:dio网络库使用与封装
  • 超级扩音器手机版:随时随地,大声说话
  • 双指针-11.盛水最多的容器-力扣(LeetCode)
  • UE5 鼠标点击一个物体触发Onclick事件
  • 如何创建Vue3工程
  • 【解决 el-table 树形数据更新后视图不刷新的问题】
  • 【数据结构 · 初阶】- 堆的实现
  • 乐迪电玩发卡查分与控制面板模块逻辑解析
  • 中电金信联合阿里云推出智能陪练Agent
  • 华为S系列交换机CPU占用率高问题排查与解决方案
  • 3、有Bluetooth,LCD,USB,SD卡,PSRAM,FLASH、TP等软硬件驱动开发经验优先考虑
  • PyTorch 分布式 DistributedDataParallel (DDP)
  • Langgraph实战-Agent-ReAct(Reason+Act)概述
  • 扩散模型(Diffusion Models)
  • 客户对质量不满意,如何快速响应?
  • 基于Transformer与随机森林的多变量时间序列预测
  • 商会携手会员单位博阳机械举办DeepSeek大模型技术及应用分享会
  • 美国务卿宣布将对美国务院进行全面重组
  • 夜读丨秦腔里的乡魂
  • 国际金价冲上3500美元,本月已涨超12%!分析人士提醒:警惕短期多头获利了结
  • 新片|真人版《星际宝贝史迪奇》5月23日与北美同步上映
  • 安徽一季度GDP为12265亿元,同比增长6.2%
  • 上海又现昆虫新物种:体长仅1.5毫米,却是凶猛的捕食者