I/O多路转接之select、poll、epoll详解
目录
I/O多路转接之select
select初识
select函数
select工作原理
select 就绪条件
select的缺点
如何实现一个select服务器?
关于select服务器的一些说明
I/O多路转接之poll
poll初始
poll函数
poll 工作原理
Poll 的优点
Poll 的缺点
如何实现一个 poll 服务器?
I/O多路转接之epoll
epoll初始
epoll相关函数
epoll_create()
epoll_ctl()
epoll_wait()
Epoll 工作原理
如何实现一个 Epoll 服务器?
Epoll 的优点
Epoll工作方式
I/O多路转接之select
select初识
select是一个系统调用,用来进行I/O多路转接
-
select可以让程序同时监视多个文件描述符上的事件是否就绪
-
当监视的多个文件描述符中有一个或多个事件就绪时,select才会成功返回并将对应的文件描述符就绪的事件告知调用者
select函数
函数原型
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
-
nfds:表示监视的文件描述符的最大值+1
-
readfds:输入输出型参数,输入表示要监视哪些文件描述符的读事件是否就绪,输出表示监视的哪些文件描述符的读事件已经就绪
-
writefds:输入输出型参数,输入表示要监视哪些文件描述符的写事件是否就绪,输出表示监视的哪些文件描述符的写事件已经就绪
-
exceptfds:输入输出型参数,输入表示要监视哪些文件描述符的异常事件是否就绪,输出表示监视的哪些文件描述符的异常事件已经就绪
-
timeout:输入输出型参数,输入由用户设置select的等待时间,输出表示timeout的剩余时间
参数timeout的设定值
-
NULL:select调用后进行阻塞等待,直到被监视的文件描述符的某个事件就绪才会返回
-
0:select调用后进行非阻塞等待,无论被监视的文件描述符上的某个事件是否就绪,select检测后都立即返回
-
特定时间:select调用后在指定的时间进行等待,如果指定时间内被监视的文件描述符没有就绪,则select超时返回
select返回值
-
如果函数调用成功,则返回事件就绪的文件描述符的个数
-
如果在指定的timeout时间内没有文件描述符就绪,select返回0,表示没有文件描述符就绪
-
如果函数调用失败,select返回-1,同时错误码被设置
调用失败时,错误码可能被设置为
-
EBADF:表示select监视的文件描述符无效,例如该文件被关闭
-
EINTR:表示select调用被一个信号中断
-
EINVAL:表示无效的参数,例如 nfds 参数小于0或者timeout参数的值为负数
-
ENOMEM:表示系统资源不足,无法完成select系统调用
-
EFAULT:表示无效的指针,例如 readfds、writefds、exceptfds 或 timeout 指向的内存地址无效
fd_set结构
fd_set是一个位图结构,用于将多个文件描述符组织在一起,以便select能够同时监控这些文件描述符的状态,fd_set一般能容纳的文件描述符数量是1024个。为了方便操作 fd_set,系统定义了一系列宏,
-
FD_ZERO:清空文件描述符集合,将所有位设置为0 。如:
fd_set readfds; FD_ZERO(&readfds);
-
FD_SET:将指定的文件描述符添加到集合中。如:
int fd=3; FD_SET(fd,&readfds);
-
FD_CLR:从集合中移除指定的文件描述符。如:
FD_CLR(fd,&readfds);
-
FD_ISSET:检查指定的文件描述符是否在集合中。如:
FD_ISSET(fd,&readfds);
timeval结构
timeout是一个指向timeval的结构体,可以根据需求设置秒数或者微秒数,来表示select的超时时间
struct timeval {long tv_sec; // 秒数long tv_usec; // 微秒数(1秒 = 1,000,000微秒) };
select工作原理
我们通过FD_SET将需要监视的文件描述符设置进集合里,fd_set对应的二进制位就会被设置为1,例如:fd=5被设置,则fd_set的第6个比特位设置为1 (因为fd_set的下标是从0开始计算的,类似于数组下标从0开始),select是系统调用,需要从用户态进入内核态,进行文件描述符的检测,当select返回时,从内核态进入用户态,如果检测到对应的fd事件已经就绪,则fd_set中对应的位保持不变,若对应的fd事件没有就绪,则fd_set中对应的位设置位0。这样一次 select 调用就完成了
select 就绪条件
读就绪
-
socket内核中,接收缓冲区的字节数大于等于低水位标记
SO_RCVLOWAT(默认为1)
,表示接收缓冲区中有数据可读,读事件就绪 -
socket TCP通信中,对端关闭连接(即发送一个FIN包),表示发送方已经完成数据发送,该套接字应该标记为读就绪,因为该套接字的接收缓冲区可能还有未读取的数据,如果接收缓冲区没有数据,则调用 read 或 recv 会返回0,表示对端已经关闭了连接
-
监听的socket上有新的连接请求,该套接字被标记为读就绪,调用 accept 接收新连接
-
socket上有未处理的错误,该套接字被标记为读就绪,调用 read 或 recv 函数会返回 -1,并设置错误码,以此来提醒进程检查错误状态
写就绪
-
socket内核中,发送缓冲区的可用空间大于等于低水位标记
SO_SNDLOWAT(默认为1)
,表示发送缓冲区中有足够的空间可以写入数据,写事件就绪 -
socket的写操作被关闭(close或者shutdown),写入数据的操作会失败(例如触发
SIGPIPE
信号或返回错误),select将写事件标记为就绪,提醒进程检查写操作的状态 -
socket使用非阻塞connect,连接建立成功,写事件就绪,可以发送数据,连接建立失败,写事件就绪,进程可以主动检查连接状态,避免继续写入数据
-
socket上有未读取的错误,select会认为写事件就绪,调用 write 或 send 函数会返回 -1,并设置错误码,以此来提醒进程检查错误状态
异常就绪
-
socket上收到带外数据。带外数据和TCP的紧急模式相关,TCP报头当中的URG标志位和16位紧急指针搭配使用,就能够发送/接收带外数据。
select的缺点
-
每次调用select,都要手动设置fd_set集合
-
每次调用select,都要把fd_set集合从用户态拷贝到内核态,然后在内核态遍历所有的fd,这个开销比较大
-
select支持的文件描述符数量太少
如何实现一个select服务器?
服务端
-
使用socket编程,完成套接字的创建,地址信息绑定,监听
-
定义一个fd_array数组,用于存储监听套接字和建立连接的套接字
-
每次调用select函数之前,定义一个fd_set readfds用于监视读事件 。将fd_array中的文件描述符添加进readfds中,
-
如果是监听套接字读事件就绪,说明有新的连接,调用accept获取新连接,并将该连接添加到fd_array中进行监视
-
如果是客户端连接读事件就绪,则调用read函数进行读操作
-
读事件就绪,也有可能是客户端断开连接,服务器应该调用close关闭该套接字,并将该套接字从fd_array中清除
SelectServer.hpp 头文件
#pragma once #include <iostream> #include <sys/select.h> #include "Socket.hpp" #include "Log.hpp" #include "InetAddr.hpp" using namespace socket_ljp; class SelectServer {const static int gnum = sizeof(fd_set) * 8;//fd_array数组最多能监听gnum个文件描述符const static int gdefaultfd = -1;//默认fd_array的初始值为-1 public://创建套接字SelectServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}//初始化fd_array数组,默认为-1void InitServer(){for (int i = 0; i < gnum; i++){fd_array[i] = gdefaultfd;}// 默认将_listensock->Sockfd()监听套接字添加到fd_array[0]中fd_array[0] = _listensock->Sockfd(); }// _listensock->Sockfd()套接字就绪,可以接收新连接void Accepter(){InetAddr addr;int sockfd = _listensock->Accepter(&addr); // 一定不会被阻塞!if (sockfd > 0){std::cout<<"get a new link"<<std::endl;//获取到了新的sockfd,但是不知道sockfd是否满足读取条件,所以需要添加到fd_array中,由select监管bool flag = false;//判断fd_array是否添加满了,for (int pos = 1; pos < gnum; pos++)//从下标为1开始,因为fd_array[0]是_listensock->Sockfd(){if (fd_array[pos] == gdefaultfd)//发现fd_array[pos]没有被添加,将获取到的fd添加到fd_array中{flag = true;fd_array[pos] = sockfd;//LOG(INFO, "add %d to fd_array success!\n", sockfd);break;}}if (!flag){//LOG(WARNING, "Server Is Full!\n");//如果fd_array添加满了,不再接收新的fd,将接收到的fd关闭,断开连接::close(sockfd);}}}// 连接到的fd就绪,可以开始读取void HandlerIO(int i){char buffer[1024];ssize_t n = ::recv(fd_array[i], buffer, sizeof(buffer) - 1, 0); // 这里读取不会阻塞if (n > 0){buffer[n] = 0;std::cout << "client say# " << buffer << std::endl;std::string content = "<html><body><h1>hello bite</h1></body></html>";std::string echo_str = "HTTP/1.0 200 OK\r\n";echo_str += "Content-Type: text/html\r\n";echo_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";echo_str += content;// echo_str += buffer;::send(fd_array[i], echo_str.c_str(), echo_str.size(), 0); }else if (n == 0){//客户端退出了//LOG(INFO, "client quit...\n");// 关闭fd::close(fd_array[i]);// select 不要在关心这个fd了fd_array[i] = gdefaultfd;}else{//LOG(ERROR, "recv error\n");// 关闭fd::close(fd_array[i]);// select 不要在关心这个fd了fd_array[i] = gdefaultfd;}} // 一定会存在大量的fd就绪,可能是获取的sockfd,也可能是 _listensock->Sockfd()void HandlerEvent(fd_set &rfds){// 事件派发for (int i = 0; i < gnum; i++){if (fd_array[i] == gdefaultfd)continue;// fd一定是合法的fd// 合法的fd不一定就绪, 判断fd是否就绪?if (FD_ISSET(fd_array[i], &rfds)){// 读事件就绪// 1. listensockfd ,可以接收连接if (_listensock->Sockfd() == fd_array[i]){Accepter();}else{HandlerIO(i);//2. normal sockfd就绪}}}}void Loop(){while (true){// 1. 文件描述符进行初始化fd_set rfds;FD_ZERO(&rfds);int max_fd = gdefaultfd;// 2. 合法的fd 添加到rfds集合中for (int i = 0; i < gnum; i++){if (fd_array[i] == gdefaultfd)continue;FD_SET(fd_array[i], &rfds);// 2.1 更新出最大的文件fd的值if (max_fd < fd_array[i]){max_fd = fd_array[i];}}//设置超时时间struct timeval timeout = {30, 0};//listensock && accept 我们把它也看做IO类的函数。只关心新链接到来int n = ::select(max_fd + 1, &rfds, nullptr, nullptr, nullptr /*&timeout*/); // 临时switch (n){case 0:std::cout<<"time out"<<std::endl;//超时,没有连接准备就绪break;case -1:std::cout<<"select error"<<std::endl;//发生错误break;default:std::cout<<"haved event ready,n: "<<n<<std::endl;// 有n个事件准备就绪HandlerEvent(rfds);//事件派发PrintDebug();//将获取的已经就绪的fd,打印出来// sleep(1);break;}}}void PrintDebug(){std::cout << "fd list: ";for (int i = 0; i < gnum; i++){if (fd_array[i] == gdefaultfd)continue;std::cout << fd_array[i] << " ";}std::cout << "\n";}~SelectServer() {} private:uint16_t _port;std::unique_ptr<Socket> _listensock;// 1. select要正常工作,需要借助一个辅助数组,来保存所有合法fdint fd_array[gnum]; };
main.cc 源文件
#include"SelectServer.hpp" #include<memory> //./select_server 8888 int main(int argc,char* argv[]) {if(argc!=2){std::cerr<<"Usage: "<<argv[0]<<" local-port: "<<std::endl;exit(0);}uint16_t port=std::stoi(argv[1]);EnableScreen();std::unique_ptr<SelectServer> svr=std::make_unique<SelectServer>(port);svr->InitServer();svr->Loop();return 0; }
关于select服务器的一些说明
-
select同时监控文件描述符的个数取决于
fd_set
的位数,每一个比特位表示一个文件描述符 -
将fd加入到fd_set集合中,还需要一个数组fd_array来保存被监控的文件描述符,因为:
-
select返回后,从fd_array中找到文件描述符与 fd_set 进行 FD_ISSET 判断
-
select返回后,会把没有触发事件的文件描述符从fd_set中清除,所以每次select检测,都要重新把文件描述符从 fd_arry 添加到fd_set中,同时记录最大文件描述符的值,作为select的第一个参数
-
-
fd_set的大小可以调整,可能涉及到重新编译内核
I/O多路转接之poll
poll初始
poll也是一个系统调用,用来进行I/O多路转接
poll函数
函数原型
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
-
fds:指向一个 struct pollfd 类型的数组,每个元素表示一个要监视的文件描述符
-
nfds:表示 fds 数组的长度
-
timeout:表示poll的超时时间,单位是毫秒
timeout的设定值
-
-1:poll 调用后进行阻塞等待,直到被监视的文件描述符的某个事件就绪才会返回
-
0:poll 调用后进行非阻塞等待,无论被监视的文件描述符上的某个事件是否就绪,poll检测后都立即返回
-
特定时间:poll 调用后在指定的时间进行等待,如果指定时间内被监视的文件描述符没有就绪,则poll 超时返回
poll 返回值
-
如果函数调用成功,则返回事件就绪的文件描述符的个数
-
如果在指定的timeout时间内没有文件描述符就绪,poll 返回0,表示没有文件描述符就绪
-
如果函数调用失败,poll 返回-1,同时错误码被设置
调用失败后,错误码可能被设置
-
EBADF:表示 poll 监视的文件描述符无效,例如该文件被关闭
-
EINTR:表示 poll 调用被一个信号中断
-
EINVAL:表示无效的参数,例如 nfds 参数小于0或者timeout参数的值为负数
-
ENOMEM:表示系统资源不足,无法完成 poll 系统调用
-
EFAULT:表示无效的指针,例如 fds 指向的内存地址无效
struct pollfd的结构
struct pollfd {int fd; // 文件描述符short events; // 要监视的事件类型short revents; // 实际发生的事件类型 };
events 和 revents 的可能取值
事件 | 描述 | 是否可作为输入 | 是否可作为输出 |
---|---|---|---|
POLLIN | 数据(包括普通数据和优先数据)可读 | 是 | 是 |
POLLRDNORM | 普通数据可读 | 是 | 是 |
POLLRDBAND | 优先级带数据可读(Linux不支持) | 是 | 是 |
POLLPRI | 高优先级数据可读,比如TCP带外数据 | 是 | 是 |
POLLOUT | 数据(包括普通数据和优先数据)可写 | 是 | 是 |
POLLWRNORM | 普通数据可写 | 是 | 是 |
POLLWRBAND | 优先级带数据可写 | 是 | 是 |
POLLRDHUP | TCP连接被对方关闭,或者对方关闭了写操作,它由GNU引入 | 是 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLHUP | 挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 | 否 | 是 |
POLLNVAL | 文件描述符没有打开 | 否 | 是 |
这些取值都是以宏定义的,我们将需要监视的类型添加到 events 中,poll返回后,检测revents是否有对应的事件就绪
poll 工作原理
内核实现基本和select一样,只是解决了文件描述符数量限制问题
Poll 的优点
-
pollfd 中包含了要监视的 events 和发生的 revents ,不再用传参的方式,更加方便
-
poll没有最大文件描述符数量限制(但是数量过大,性能也会下降)
Poll 的缺点
-
每次调用 poll 都需要把大量的 pollfd 结构从用户态拷贝到内核态 ,开销较大
-
poll 同时监视大量文件描述符,而大量文件描述符可能只有少数处于就绪状态,所以监视的文件描述符数量越多,效率也会下降
如何实现一个 poll 服务器?
通过改写 select 代码,实现一个简单的poll服务器
PollServer.hpp 头文件
#pragma once #include <iostream> #include <poll.h> #include "Socket.hpp" #include "Log.hpp" #include "InetAddr.hpp" using namespace socket_ljp; class PollServer {const static int gnum = sizeof(fd_set) * 8;//可以自己设置,决定fd_events可以同时监管多少个fdconst static int gdefaultfd = -1;//默认fd_events的初始值为-1 public://创建套接字PollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(_port);}//初始化fd_array数组,默认为-1void InitServer(){for (int i = 0; i < gnum; i++){fd_events[i].fd = gdefaultfd;fd_events[i].events=0;fd_events[i].revents=0;}fd_events[0].fd = _listensock->Sockfd(); // 默认将_listensock->Sockfd()监听套接字添加到fd_array[0]中fd_events[0].events=POLLIN;}void Accepter(){InetAddr addr;int sockfd = _listensock->Accepter(&addr); // 会不会被阻塞!一定不会!if (sockfd > 0){std::cout<<"get a new link"<<std::endl;bool flag = false;for (int pos = 1; pos < gnum; pos++){if (fd_events[pos].fd == gdefaultfd){flag = true;fd_events[pos].fd = sockfd;fd_events[pos].events = POLLIN;LOG(INFO, "add %d to fd_array success!\n", sockfd);break;}}if (!flag){LOG(WARNING, "Server Is Full!\n");::close(sockfd);}}}// 连接到的fd就绪,可以开始读取void HandlerIO(int i){char buffer[1024];ssize_t n = ::recv(fd_events[i].fd, buffer, sizeof(buffer) - 1, 0); // 这里读取会阻塞吗?不会if (n > 0){buffer[n] = 0;std::cout << "client say# " << buffer << std::endl;std::string content = "<html><body><h1>hello bite</h1></body></html>";std::string echo_str = "HTTP/1.0 200 OK\r\n";echo_str += "Content-Type: text/html\r\n";echo_str += "Content-Length: " + std::to_string(content.size()) + "\r\n\r\n";echo_str += content;// echo_str += buffer;::send(fd_events[i].fd, echo_str.c_str(), echo_str.size(), 0); // 临时方案}else if (n == 0){//LOG(INFO, "client quit...\n");// 关闭fd::close(fd_events[i].fd);// select 不要在关心这个fd了fd_events[i].fd = gdefaultfd;fd_events[i].events = 0;fd_events[i].revents = 0;}else{//LOG(ERROR, "recv error\n");// 关闭fd::close(fd_events[i].fd);// select 不要在关心这个fd了fd_events[i].fd = gdefaultfd;fd_events[i].events = 0;fd_events[i].revents = 0;}} // 一定会存在大量的fd就绪,可能是获取的sockfd,也可能是 _listensock->Sockfd()void HandlerEvent(){// 事件派发for (int i = 0; i < gnum; i++){if (fd_events[i].fd == gdefaultfd)continue;// fd一定是合法的fd// 合法的fd不一定就绪, 判断fd是否就绪?if (fd_events[i].revents & POLLIN){// 读事件就绪// 1. listensockfd 2. normal sockfd就绪?if (_listensock->Sockfd() == fd_events[i].fd){Accepter();}else{HandlerIO(i);}}}}void Loop(){int timeout = -1;while (true){// _listensock->Accepter();// 不能,listensock && accept 我们把他也看做IO类的函数。只关心新链接到来,等价于读事件就绪!int n = ::poll(fd_events, gnum, timeout); // 临时switch (n){case 0:std::cout<<"time out"<<std::endl;//LOG(DEBUG, "time out\n");break;case -1:std::cout<<"poll error"<<std::endl;//LOG(ERROR, "poll error\n");break;default:std::cout<<"haved event ready, n : "<<n<<std::endl;HandlerEvent();PrintDebug();// sleep(1);break;}}} void PrintDebug(){std::cout << "fd list: ";for (int i = 0; i < gnum; i++){if (fd_events[i].fd == gdefaultfd)continue;std::cout << fd_events[i].fd << " ";}std::cout << "\n";}~PollServer() {} private:uint16_t _port;std::unique_ptr<Socket> _listensock;struct pollfd fd_events[gnum];//结构体数组//struct pollfd {// int fd; 要监视的文件描述符// short events; 感兴趣的事件(如可读、可写等)// short revents; 实际发生的事件(由内核填充)//}; };
main.cc 源文件
#include"PollServer.hpp" #include<memory> //./poll_server 8888 int main(int argc,char* argv[]) {if(argc!=2){std::cerr<<"Usage: "<<argv[0]<<" local-port: "<<std::endl;exit(0);}uint16_t port=std::stoi(argv[1]);EnableScreen();std::unique_ptr<PollServer> svr=std::make_unique<PollServer>(port);svr->InitServer();svr->Loop();return 0; }
I/O多路转接之epoll
epoll初始
epoll也是一个系统调用,用来进行I/O多路转接
-
epoll就是为了同时处理大量文件描述符而改进的poll
-
epoll在2.5.44内核中被引进,它几乎具备了select和poll的所有优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法
epoll相关函数
epoll_create()
创建一个新的 epoll 实例,返回一个文件描述符,用于后续操作
函数原型
int epoll_create(int size);
-
size:自从linux2.6.8 之后,size 参数是被忽略的,可以传入任意正整数
-
epoll模型创建成功返回其对应的文件描述符,否则返回-1,同时错误码会被设置
-
当不再使用时,必须调用 close 函数关闭 epollfd,当所有引用epoll实例的文件描述符都被关闭时,内核将销毁该 epoll 实例并释放相关资源
epoll_ctl()
向 epoll 实例中添加、修改或删除文件描述符
函数原型
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
-
epfd: epoll_create() 返回的文件描述符。
-
op:操作类型,可以是 EPOLL_CTL_ADD 、 EPOLL_CTL_MOD 或 EPOLL_CTL_DEL
-
fd:要操作的文件描述符。
-
event:指向 struct epoll_event 的指针,用于指定要监视的事件。
-
返回值:函数调用成功返回0,调用失败返回-1,同时错误码会被设置。
epoll_event 结构体
struct epoll_event {uint32_t events; // 事件类型epoll_data_t data; // 附加数据 };
-
events:表示要监视的事件类型
-
data:是一个联合体,一般使用联合体中的fd,表示要监视的文件描述符
epoll_data_t 联合体
typedef union epoll_data {void *ptr; // 指针int fd; // 文件描述符uint32_t u32; // 32 位无符号整数uint64_t u64; // 64 位无符号整数 } epoll_data_t;
events 常用的取值
-
EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
-
EPOLLOUT:表示对应的文件描述符可以写
-
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
-
EPOLLERR:表示对应的文件描述符发送错误
-
EPOLLHUP:表示对应的文件描述符被挂断,即对端将文件描述符关闭了
-
EPOLLET:将epoll的工作方式设置为边缘触发(Edge Triggered)模式
-
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听该文件描述符的话,需要重新将该文件描述符添加到epoll模型中
epoll_wait()
用于等待 epoll 中文件描述符的I/O事件
函数原型
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
-
epfd: epoll_create() 返回的文件描述符。
-
events:指向一个 struct epoll_event 数组,用于存储返回的事件。
-
maxevents:events数组的最大大小
-
timeout:超时时间,单位是毫秒
timeout的设定值
-
-1:epoll_wait 调用后进行阻塞等待,直到被监视的文件描述符的某个事件就绪才会返回
-
0:epoll_wait 调用后进行非阻塞等待,无论被监视的文件描述符上的某个事件是否就绪,epoll_wait 检测后都立即返回
-
特定时间:epoll_wait 调用后在指定的时间进行等待,如果指定时间内被监视的文件描述符没有就绪,则 epoll_wait 超时返回
epoll_wait 返回值
-
如果函数调用成功,则返回事件就绪的文件描述符的个数
-
如果在指定的timeout时间内没有文件描述符就绪,epoll_wait 返回0,表示没有文件描述符就绪
-
如果函数调用失败,epoll_wait 返回-1,同时错误码被设置
Epoll 工作原理
红黑树和就绪队列
当进程调用epoll_create 函数时,Linux内核会创建一个 eventpoll 结构体,也就是epoll模型
struct eventpoll{...//红黑树的根节点,这棵树中存储着所有添加到epoll中的需要监视的事件struct rb_root rbr;//就绪队列中则存放着将要通过epoll_wait返回给用户的满足条件的事件struct list_head rdlist;... }
epoll模型中的红黑树用于存储监视的文件描述符与它们所对应的事件,调用epoll_ctl函数就是对红黑树进行增删查改
epoll模型中的就绪队列存储某些事件已经就绪的文件描述符,调用epoll_wait函数就是从就绪队列中获取已经就绪的事件
-
红黑树是一种二叉搜索树,因此必须有键值key,文件描述符就是红黑树的key值
-
调用epoll_ctl向红黑树当中新增节点时,如果设置了 EPOLLONESHOT 选项,当监听完这次事件后,如果还需要继续监听该文件描述符则需要重新将其添加到epoll模型中,本质就是当设置了 EPOLLONESHOT 选项的事件就绪时,操作系统会自动将其从红黑树当中删除
-
而如果调用 epoll_ctl 向红黑树当中新增节点时没有设置 EPOLLONESHOT,那么该节点插入红黑树后就会一直存在,除非用户调用epoll_ctl将该节点从红黑树当中删除
回调机制
-
添加到红黑树的文件描述符,都会建立回调方法,这个回调方法再内核中叫做 ep_poll_callback
-
epoll不需要主动检测文件描述符是否就绪,当红黑树中的文件描述符就绪时,会自动调用对应的回调方法,将就绪的文件描述符添加到就绪队列中
-
用户调用epoll_wait函数获取就绪事件时,只需要关注就绪队列是否为空即可
如何实现一个 Epoll 服务器?
EpollServer.hpp 头文件
#pragma once #include <iostream> #include <string> #include <memory> #include <sys/epoll.h> #include "Log.hpp" #include "Socket.hpp" using namespace socket_ljp; class EpollServer {const static int size = 128;const static int num = 128; public:EpollServer(uint16_t port) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildListenSocket(port);_epfd = ::epoll_create(size);if (_epfd < 0){LOG(FATAL, "epoll_create error!\n");exit(1);}LOG(INFO, "epoll create success, epfd: %d\n", _epfd);}void InitServer(){// 新链接到来,我们认为是读事件就绪struct epoll_event ev;ev.events = EPOLLIN;// ev.events = EPOLLIN | EPOLLET;ev.data.fd = _listensock->Sockfd(); // 为了在事件就绪的时候,得到是那一个fd就绪了// 必须先把listensock 添加到epoll中.int n = ::epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Sockfd(), &ev);if (n < 0){LOG(FATAL, "epoll_ctl error!\n");exit(2);}LOG(INFO, "epoll_ctl success, add new sockfd : %d\n", _listensock->Sockfd());}std::string EventsToString(uint32_t events){std::string eventstr;if (events & EPOLLIN)eventstr = "EPOLLIN";if (events & EPOLLOUT)eventstr += "|EPOLLOUT";return eventstr;}void Accepter(){InetAddr addr;int sockfd = _listensock->Accepter(&addr); // 肯定不会被阻塞if (sockfd < 0){LOG(ERROR, "获取连接失败\n");return;}LOG(INFO, "得到一个新的连接: %d, 客户端信息: %s:%d\n", sockfd, addr.Ip().c_str(), addr.Port());// 得到了一个新的sockfd,我们能不能要进行read、recv?不能.// 将新的sockfd添加到epoll中!struct epoll_event ev;ev.data.fd = sockfd;ev.events = EPOLLIN;::epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);LOG(INFO, "epoll_ctl success, add new sockfd : %d\n", sockfd);}void HandlerIO(int fd){char buffer[4096];// 你怎么保证buffer就是一个完整的请求?或者有多个请求??// 一个fd,都要有一个自己的缓冲区// 引入协议int n = ::recv(fd, buffer, sizeof(buffer) - 1, 0); // 会阻塞吗?不会if (n > 0){buffer[n] = 0;std::cout << buffer;std::string response = "HTTP/1.0 200 OK\r\n";std::string content = "<html><body><h1>hello bite, hello world</h1></body></html>";response += "Content-Type: text/html\r\n";response += "Content-Length: " + std::to_string(content.size()) + "\r\n";response += "\r\n";response += content; ::send(fd, response.c_str(), response.size(), 0);}else if (n == 0){LOG(INFO, "client quit, close fd: %d\n", fd);// 1. 从epoll中移除, 从epoll中移除fd,这个fd必须是健康&&合法的fd. 否则会移除出错::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);// 2. 关闭fd::close(fd);}else{LOG(ERROR, "recv error, close fd: %d\n", fd);// 1. 从epoll中移除, 从epoll中移除fd,这个fd必须是健康&&合法的fd. 否则会移除出错::epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, nullptr);// 2. 关闭fd::close(fd);}}void HandlerEvent(int n){for (int i = 0; i < n; i++){int fd = revs[i].data.fd;uint32_t revents = revs[i].events;LOG(INFO, "%d 上面有事件就绪了,具体事件是: %s\n", fd, EventsToString(revents).c_str());if (revents & EPOLLIN){// listensock 读事件就绪, 新连接到来了if (fd == _listensock->Sockfd())Accepter();elseHandlerIO(fd);}}}void Loop(){int timeout = -1;while (true){// 事件通知,事件派发int n = ::epoll_wait(_epfd, revs, num, timeout);switch (n){case 0:LOG(INFO, "epoll time out...\n");break;case -1:LOG(ERROR, "epoll error\n");break;default:LOG(INFO, "haved event happend!, n : %d\n", n);HandlerEvent(n);break;}}}~EpollServer(){if (_epfd >= 0)::close(_epfd);_listensock->Close();} private:uint16_t _port;std::unique_ptr<Socket> _listensock;int _epfd;struct epoll_event revs[num]; };
main.cc 源文件
#include"EpollServer.hpp" #include<memory> //./poll_server 8888 int main(int argc, char *argv[]) {if (argc != 2){std::cerr << "Usage: " << argv[0] << " local-port" << std::endl;exit(0);}uint16_t port = std::stoi(argv[1]);EnableScreen();std::unique_ptr<EpollServer> svr = std::make_unique<EpollServer>(port);svr->InitServer();svr->Loop();return 0; }
Epoll 的优点
-
接口使用方便:虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
-
数据拷贝轻量:只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到内核中, 这个操作并不频繁(而select/poll 都是每次循环都要进行拷贝)
-
事件回调机制:避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就绪,这个操作时间复杂度O(1)。即使文件描述符数目很多, 效率也不会受到影响
-
没有数量限制:文件描述符数目无上限
Epoll工作方式
epoll 有两种工作方式,水平触发和边缘触发模式
水平触发 (LT,Level Triggered)
-
当epoll检测文件描述符有事件就绪时,调用epoll_wait 就会返回有事件就绪,
-
当事件就绪时,用户可以不立即处理或者处理一部分,只要底层数据没处理完,下一次调用epoll_wait 还会通知事件就绪
-
select 和 poll工作模式就是水平触发模式
-
支持阻塞读写和非阻塞读写
边缘触发 (ET,Edge Triggered)
如果在第一步将文件描述符添加到 epoll 时候使用 EPOLLET 标志,epoll进入ET工作模式
-
ET模式下,epoll只在文件描述符从不可读变为可读,或从不可写变为可写时,epoll才会返回该文件描述符
-
当epoll检测到文件描述符上事件就绪时,必须立刻处理,而且处理完毕,如果不处理完毕,且此后再没有事件就绪,epoll 也不会通知用户进行事件处理,那么没有处理完的数据就丢失了
-
ET工作模式下 epoll 通知用户的次数一般比 LT 少,因此 ET 的性能一般比 LT 性能更高,Nginx 就是默认采用ET模式使用 epoll 的。
-
只支持非阻塞的读写。
在ET模式下,一个文件描述符就绪之后,用户不会反复收到通知,看起来比LT 更高效,但是在LT 模式下,如果epoll一次通知,用户都能将就绪的事件处理完,LT 和 ET 的性能是一样的。