当前位置: 首页 > news >正文

【项目(一)】-仿mudou库one thread oneloop式并发服务器实现

1、模型框架

客户端处理思想:事件驱动模式

事件驱动处理模式:谁触发了我就去处理谁。

( 如何知道触发了)技术支撑点:I/O的多路复用 (多路转接技术)

1、单Reactor单线程在单个线程中进行事件驱动并处理
对所有客户端进行IO事件监控、哪个客户端触发了事件,就去处理谁
处理:接收它的请求,进行业务处理,进行响应。
优点:单线程操作,操作都是串行化的,思想简单,(不需要考虑进程或者线程间的通信问题,以及安全问题)
缺点:所有的事件监控和业务处理都是在一个线程中完成的,因此很容易造成性能瓶颈
适用场景:客户端数量较少,且业务处理快速简单的场景
2、 单Reactor多线程:一个Reactor线程 + 业务线程池
对所有客户端进行IO事件监控、哪个客户端触发了事件,就去处理谁
(Reactor线程)处理:仅仅进行IO操作
然后将事件进行派发给业务线程
优点:充分利用cpu多核资源,处理效率可以更高,降低了代码的耦合度(IO操作和业务处理进行分离)
缺点:在单个的Reactor线程中,包含了对所有客户端的事件监控,以及所有客户端的IO操作,不利于高并发场景(即每一个时刻都有很多客户端连接请求,我还在处理上一个client的IO操作的话就来不及进行新的client的连接处理)
3、多Reactor多线程:基于单Reator多线程的缺点考虑,如果IO的时候,有连接到来无法处理,因此将连接单独拎出来。
因此让一个Reactor线程仅仅进行新连接处理,让其他的Reactor线程进行IO处理,IO Reactor线程拿到数据分发给业务线程池进行处理。因此,多Reactor多线程模式,也叫主从Reactor模型
主Reactor线程:进行新连接事件监控
从属Reactor线程:进行IO事件监控
业务线程池:进行业务处理
优点:充分利用CPU多核资源,并且可以进行合理分配
但是:执行流并不是越多越好,因为执行流多了,反而会增加cpu切换调度成本。(所以在有些多Reactor多线程模式中从属Reactor线程也会充当业务处理函数。

2、 功能模块划分:

SERVER模块:实现Reactor模型的TCP服务器;
协议模块:对当前的Reactor模型服务器提供应⽤层协议支持
2.1、server模块:  对所有的连接以及线程进⾏管理
⽽具体的管理也分为三个⽅⾯:
监听连接管理:对监听连接进⾏管理。
有监听套接字来获取新连接。所以,首先要有一个监听套接字来获取新连接。
通信连接管理:对通信连接进⾏管理。
获取新连接之后就有了通信套接字。 然后对不同的事件进行处理
超时连接管理:对超时连接进⾏管理
对于超时的连接进行释放来归还资源。
连接模块:
Buffer模块:Buffer模块是⼀个缓冲区模块,⽤于实现通信中⽤⼾态的接收缓冲区和发送缓冲区功能
Socket模块: Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作
Channel模块: Channel模块是对⼀个描述符需要进⾏的IO事件管理的模块,实现对描述符可读,可写,错误...事件的 管理操作,以及Poller模块对描述符进⾏IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能
Connection模块:
Connection模块是对Buffer模块,Socket模块,Channel模块的⼀个整体封装,实现了对⼀个通信套 接字的整体的管理,每⼀个进⾏数据通信的套接字(也就是accept获取到的新连接)都会使⽤Connection进⾏管理。
Acceptor模块:
Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管理。
TimerQueue模块:
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管理器中添加⼀个任务,任务将在固定时间后被执⾏,同时也可以通过刷新定时任务来延迟任务的执⾏。
连接监控模块:
Poller模块:
Poller模块是对epoll进⾏封装的⼀个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。


bind函数

bind作⽤也可以简单理解为给⼀个函数绑定好参数,然后返回⼀个参数已经设定好或者预留好
的函数,
想要基于print函数,适配⽣成⼀个新的函数,这个函数固定第1个参数传递hello变量,
第⼆个参数预留出来,在调⽤的时候进⾏设置
#includ <iostream>
#include <string>
#include <functional>void print(const std::string &str,int num)
{std::cout << str <<  num << std::endl;
}int main()
{//print("hello");auto func = std::bind(print, "hello",std::placeholders::_1);func(10);//----打印结果 hello 10auto func = std::bind(print, "hello",std::placeholders::_1, std::placeholders::_2);func(10,20);// ---打印结果 hello 10 20即传入的参数是传给第二个、第三个以及之后的参数//func();//直接调用func()就相当于调用print和传入hello参数//std::placeholders::_1, std::placeholders::_2  预留一个参数 预留两个参数return 0;
}

bind函数作用:当我们设计线程池或者任务池的时候,比如要设置一个任务队列,这个任务队列里面要包含两个信息,任务要处理的数据以及这个数据要如何被处理(处理数据的方法) 所以我们要给任务池中添加函数进去、再添加一个数据进去

#includ <iostream>
#include <string>
#include <functional>
#include <vector>void print(const std::string &str,int num)
{std::cout << str <<  num << std::endl;
}int main()
{//using定义类型别名 Task 代表std::function<void()>类型//std::function 是 C++ 标准库 <functional> 头文件里的一个模板类,//它属于通用的多态函数包装器。//其作用是存储、复制和调用任何可调用对象//std::function<void()> 是 std::function 的一个具体实例化using Task = std::function<void()>;std::vector<Task> arry; //一个任务数组  arry.push_back(std::bind(print, "hello",10)); //任务组中放入的是数据和对数据的处理方法//bind它的作用是创建一个新的可调用对象,这个新对象会绑定指定的函数和参数。arry.push_back(std::bind(print, "nihao",20));arry.push_back(std::bind(print, "hhhhh",30));for(auto &f:arry){f();  //f() 调用存储在 f 中的可调用对象,也就是执行之前绑定的 print 函数。}return 0;
}

定时器:

定时去销毁不活跃的连接

1、int  timerfd_create(int  clockid,  int  flags)

创建一个定时器 (linux下一切皆文件)所有定时器的操作也是当作文件去操作的

clockid:CLOCK_REALTIME----以系统时间作为计时基准值(如果系统时间发生改变就会出问题)(一般不用)

CLOCK_MONOTONIC---以系统启动时间进行递增的一个基准值(定时器不会随着系统时间的改变而改变)

返回值:文件描述符

flags: 0 --- 阻塞操作

linux下一切皆文件,定时器的操作也是跟文件操作并没有区别,而是定时器定时原理每隔一段时间(定时器的超时时间),系统就会给这个描述符对应的定时器写入一个8字节的数据

创建一个定时器,定时器定立的超时时间是3s,也就是说每3s计算一次超时

从启动开始,每隔3s,也就是每3s计算一次超时

从启动开始,每隔3s中,系统都会给描述符写入一个1,表示从上一次读取数据到现在超时了1次

假设30s之后开始读数据,则这个时候会读取到10,表示上一次读取数据到现在超时了10次

2、int  timerfd_settime(int  fd, int  flags,  struct  itimerspec  *new, struct  itimerspce  *old);

功能:启动定时器

fd:timerfd_create函数的返回值---文件描述符---创建的定时器的标识符

flags:默认设置为0---使用相对时间(相对于当前的超时时间往后延长多少时间之后的超时)

struct  itimerspec  *new:设置的超时时间

struct timespec {time_t tv_sec; /* 秒 */long tv_nsec; /* 纳秒 */};struct itimerspec {struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */struct timespec it_value; /* 第⼀次超时时间 */};

struct  itimerspce  *old:用于接收当前定时器原有的超时时间设置,保存起来以便于还原(不需要还原也可以直接传空)        

定时器的基本代码:

#include <stdio.h>
#include <unistd.h>
#inlcude <fcntl.h>
#include <stdint.h>
#include <sys/timerfd.h>int main()
{int timerfd_create(CLOCK_MONOTONIC,0);if(timerfd < 0){perror("timerfd_create error");return -1;}struct itimerspec itime;itime.it_value.tv_sec = 1;itime.it_value.tv_nsec = 0;//第一次超时时间为1s后itime.it_interval.tv_sec = 1;itime.it_interval.tv_nsec = 0;//第一次超时后,每次超时的时间间隔timerfd_settime(timerfd, 0, &itime, NULL);while(1){uint64_t times; //8字节大小int ret = read(timerfd, &times, 8);if(ret < 0){perror("read error");return -1;}printf("超时了,距离上一次超时了%d次\n",times);}close(timerfd);return 0;
}

定时器的作用:高并发的服务器需要定时的去清理不活跃的连接,定义一个定时器,每隔一秒去检测,(每隔一秒把连接拿过来遍历一下,看谁是非活跃超时了)超时了就把它释放掉,每隔一秒来一次。

那如果有上万个连接,全遍历一遍效率就会很低很低。

这时候⼤家就会想到,我们可以针对所有的连接,根据每个连接最近⼀次通信的系统时间建立一个小根堆,这样只需要每次针对堆顶部分的连接逐个释放,直到没有超时的连接为⽌,这样也可以大大提高处理的效率。
另一种方案: 时间轮
设置一个tick滴答指针,指向哪里就代表哪里任务超时了
如果tick滴答,以秒为计时单位,如果当前的数组有7个元素,那么最大定时时间就只有7s
如果定时器想要设置超大时间定时任务 (不可能去设置一个超大的数组吧)
可以采⽤多层级的时间轮,有秒针轮,分针轮,时针轮    
设置以天为单位的时间轮:
存在的问题:
1、上面这样的数组,同一时刻的定时任务只能添加一个,需要考虑如何在同一时刻支持添加多个定时任务?

解决方法: 将时间轮的一维数组设计为二维数组(每一个元素都是一个数组)
2、假设当前的定时任务是一个连接的非活跃销毁任务,这个任务什么时候添加到时间轮中比较合适?
 
一个连接30s内都没有通信,则是一个非活跃连接,这时候就销毁
但是一个连接如果在建立的时候添加了一个30s后的销毁任务,但是这个连接30s内人家有数据通信,在第30s的时候不是一个非活跃连接
思想:需要在一个连接有IO事件产生的时候,延迟定时任务的执行
作为一个时间定时器,本身并不关注任务类型,只要是时间到了就需要被执行
解决方案:类的析构函数  +  职能指针share_ptr, 通过这两个技术可以实现定时任务的延时
1、使用一个类,对定时任务进行封装,类实例化的每一个对象,就是一个定时任务对象,当对象被销毁的时候,再去执行定时任务( 将定时任务的执行放到析构函数中
2、share_ptr用于对new的对象进行空间管理,当share_ptr对一个对象进行管理的时候,内部有一个计数器,计数器为0的时候,则释放所管理的对象。
int *a = new  int;
std::share_ptr<int>  pi(a);
std::share_ptr<int>  pi1(pi);
a对象只有在pi计数为0的时候,才会被释放
当针对pi又构建了一个shared_ptr对象pi1,则pi和pi1计数器为2
但是如果时针对原始对象进行构造,并不会跟pi和pi1共享计数
当pi和pi1中任意一个被释放的时候,只是计数器-1,因此它们管理的a对象并没有被释放,只有当pi和pi1都被释放了,计数器为0了,才会释放管理的a对象
基于这个思想,我们可以使用share_ptr来管理定时器任务对象
例如:对象被销毁的时候,任务(task)才会被执行() 智能指针里面有一个ptr指向task,将智能指针放到定时数组里面,两秒之后,智能指针被释放计数器为0,tsak会被释放掉,就会执行任务。如果在两秒之间,连接又发送了数据,这个连接变为活跃的,我们就针对share_ptr再生成一个share_ptr,计数器就变为了2,把智能指针添加到时间轮里面去,第一次不会执行task,只有第二次会执行task。
#include<iostream>
#include<vector>
#include <unordered_map>
#include <cstdint>
#include <functional>
#include <memory>
#include <unistd.h>using TaskFunc = std::function<void()>;
using ReleaseFunc = std::function<void()>;class TimerTask  //这个类代表定时器任务
{
private:uint64_t _id; //定时器任务对象IDuint32_t _timeout; //定时器任务的超时时间bool _canceled;  //false表示没有被取消  true表示被取消了TaskFunc _task_cb; //定时器对象要执行的定时任务ReleaseFunc _release; //用于删除 TimerWheel中保存的定时器对象信息public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) :_id(id),_timeout(delay),//外界自己传入_task_cb(cb),_canceled(false){}~TimerTask()  //执行定时器任务{if(_canceled == false)_task_cb(); //当定时任务触发时,需要执行的具体操作//在析构的时候执行是因为 定时器的任务是销毁不活跃的连接 那么 他的本质任务就是销毁 即可以在类对象析构的时候任务对象被销毁//具体执行什么函数会自己设置 在这个任务构造的时候 需要自己传入的参数第三个_release();// 从TimerWheel 的 _timers 哈希表中删除当前定时器任务的信息 --调用这个函数就是调用TimerWheel类中的RemoveTimer(因为下面的bind函数)}void Cancel(){_canceled = true; //true代表已经被取消}void SetRelease(const ReleaseFunc &cb)  //传入的参数是函数{_release = cb; }uint32_t DelayTime(){return _timeout;}
};class TimerWheel  //管理这些定时器任务
{
private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _capacity; //表盘最大数量---就是最大延迟时间//用于管理 TimerTask 对象的生命周期,确保任务对象在被添加到时间轮中并且还有其他地方引用时不会被提前销毁。std::vector<std::vector<PtrTask>> _wheel; //时间轮二维数组里面放的不是任务task而是对任务的share_ptr指针int _tick; //tick走到哪里哪里执行  (即释放哪里的对象)执行哪里的任务//为了避免因哈希表对任务对象的引用而导致对象无法被正常销毁的情况,同时又能在需要时获取到任务对象进行操作。std::unordered_map<uint64_t, WeakTask> _timers; //放入的WeakTask类型,只有这样在后面构造share_ptr的时候才会共享计数,而且自身也不影响计数
private:void RemoveTimer(uint64_t id) //从管理(map)中删除{auto it = _timers.find(id);if(it != _timers.end()){_timers.erase(it);}}
public:TimerWheel():_capacity(60), _tick(0),_wheel(_capacity) {}void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb)//添加定时任务 --第三个参数就是定时器任务触发时,具体需要执行的任务{PtrTask pt(new TimerTask(id, delay, cb));pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));//将RemoveTimer绑定一个参数,得到的函数,作为参数传递给SetRelease函数int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);//数组_timers[id] = WeakTask(pt); //_timers哈希表中,值为id的元素(如果有就跟新,如果没有就新创建)  WeakTask(pt)----以pt这个 std::shared_ptr为参数构建了一个std::weak_ptr<TimerTask> 类型的弱引用}void TimerRefresh(uint64_t id)//刷新/延迟定时任务{//通过保存的定时器对象的weak_ptr构造一个share_ptr出来,添加到轮子中auto it = _timers.find(id);if(it == _timers.end()){return;//没找到定时任务,无法进行刷新,无法延迟}PtrTask pt = it->second.lock(); //lock获取weak_ptr管理的对象对应的share_ptr//it->second代表  与id对应的 std::weak_ptr<TimerTask> 对象//std::weak_ptr 类的一个成员函数,它的作用是尝试创建一个指向 std::weak_ptr 所观察对象的 std::shared_ptr//从 _timers 哈希表中找到与给定 id 对应的 std::weak_ptr<TimerTask> 对象,//然后调用其 lock() 方法尝试获取一个指向该 TimerTask 对象的 std::shared_ptr。//如果该 TimerTask 对象还存在(即其引用计数不为 0),则 lock() 方法会返回一个有效的 std::shared_ptr,//并将其赋值给 pt;如果该 TimerTask 对象已经被销毁(引用计数为 0),则 lock() 方法会返回一个空的 std::shared_ptr。//为什么这样写????//由于 _timers 中存储的是 std::weak_ptr,我们不能直接通过它来操作对象。//因此,需要调用 lock() 方法获取一个 std::shared_ptr,这样才能确保在操作对象时,对象是存在的。//同时,使用 std::shared_ptr 操作对象可以保证在操作期间对象不会被意外销毁,因为 std::shared_ptr 会增加对象的引用计数。int dalay = pt->DelayTime();//DelayTime() 这个时间外界自己传入int pos = (_tick + dalay) % _capacity;_wheel[pos].push_back(pt); //重新更新位置}void TimerCancel(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()){return;//没找到定时任务,无法进行刷新,无法延迟}PtrTask pt = it->second.lock(); //lock获取weak_ptr管理的对象对应的share_ptrif(pt)pt->Cancel();}//这个函数应该每秒被执行一次,相当于秒针向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();//清空指定位置的数组,就会把数组中保存的所有管理定时器对象的share_ptr释放掉//它会调用 std::vector 的 clear 方法,将该槽对应的 std::vector<PtrTask> 中的所有 std::shared_ptr<TimerTask> 移除。//当 std::shared_ptr 被移除时,如果该 std::shared_ptr 是最后一个指向 TimerTask 对象的强引用,//那么它所管理的 TimerTask 对象的引用计数会变为 0,从而触发 TimerTask 对象的析构函数 ~TimerTask()。}
};
class Test
{
public:Test(){std::cout << "构造" << std::endl;}~Test(){std::cout << "构造" << std::endl;}
};
void DelTest(Test *t)
{delete t;
}int main()
{TimerWheel tw;Test *t = new Test();tw.TimerAdd(888,5,std::bind(DelTest,t));//设置具体的任务id、延时时间、以及定时器触发时具体要执行的任务for(int i = 0; i < 5; i++){tw.TimerRefresh(888);//刷新定时任务tw.RunTimerTask();//向后移动秒针sleep(1);}while(1){tw.RunTimerTask();sleep(1);}return 0;
}

相关文章:

  • 【Qt】界面优化
  • 4.1.1 Redis相关命令详解及其原理
  • 深入解析分类模型评估指标:ROC曲线、AUC值、F1分数与分类报告
  • OCCT 入门(3)核心模块与架构
  • 游戏测试入门知识
  • matlab中进行海浪模型仿真
  • 利用pnpm patch命令实现依赖包热更新:精准打补丁指南
  • ARM Cortex汇编宏定义
  • 基于 PyGetWindow 获取窗口信息和控制窗口
  • 安防监控视频管理平台EasyCVR助力建筑工地施工4G/5G远程视频监管方案
  • AgentGPT 在浏览器中组装、配置和部署自主 AI 代理 入门介绍
  • [250415] OpenAI 推出 GPT-4.1 系列,支持 1M token
  • Python正则表达式有哪些常用匹配字符?
  • 刚刚丨OpenAI发布最新模型——GPT-4.1
  • 【大模型实战篇】--阿里云百炼搭建MCP Agent
  • 位图和布隆过滤器
  • idea如何克隆拉取远程git项目到本地
  • 快速幂+公共父节点
  • 机器人发展未来两年会有突破吗?
  • Tauri 桌面端开发
  • 流浪猫给车主造成困扰,长春一小区拟投药应对?律师:此举欠妥
  • 国务院任免国家工作人员:饶权任国家文物局局长
  • 4月份全国93个国家气象站日最高气温达到或突破极值
  • 上海第三家“胖永辉”在浦东开业,设立了外贸产品专区
  • 加拿大驾车撞人事件遇难人数升到11人
  • 媒体:每一个“被偷走的人生”,都该得到公道和正义