epoll 和ractor模型学习
2.1 流?I/O操作/阻塞
2.1.1 流
- 可以进行I/O操作的内核对象
- 文件、管道、套接字……
- 流的入口:文件描述符(fd)
-
所有对流的读写操作,我们都可以称之为IO操作。
那么当一个流中再没有数据,read的时候,或者说在流中已经写满了数据,再write,我们的IO操作就会出现一种现象,就是阻塞
-
- 阻塞等待
空出大脑可以安心睡觉。(不占用CPU宝贵的时间片)
- 非阻塞,忙轮询
浪费时间,浪费电话费,占用快递员时间(占用CPU,系统资源)
2.2 解决阻塞死等待的办法
while true {
for i in 流[] {
if i has 数据 {
读 或者 其他处理
}
}
}
2.2.3 办法二:select
while true {
select(流[]); //阻塞
for i in 流[] {
if i has 数据 {
读 或者 其他处理
}
}
}
2.2.3 办法三:epoll
while true {
可处理的流[] = epoll_wait(epoll_fd); //阻塞
for i in 可处理的流[] {
读 或者 其他处理
}
}
2.3 什么是epoll
-
与select,poll一样,对I/O多路复用的技术
-
只关心“活跃”的链接,无需遍历全部描述符集合
-
能够处理大量的链接请求(系统可以打开的文件数目)
2.4 epoll API
2.4.1 创建EPOLL
/**
* @param size 告诉内核监听的数目
*
* @returns 返回一个epoll句柄(即一个文件描述符)
*/
int epoll_create(int size);
int epfd = epoll_create(1000);
2.4.2 控制EPOLL
/**
* @param epfd 用epoll_create所创建的epoll句柄
* @param op 表示对epoll监控描述符控制的动作
*
* EPOLL_CTL_ADD(注册新的fd到epfd)
* EPOLL_CTL_MOD(修改已经注册的fd的监听事件)
* EPOLL_CTL_DEL(epfd删除一个fd)
*
* @param fd 需要监听的文件描述符
* @param event 告诉内核需要监听的事件
*
* @returns 成功返回0,失败返回-1, errno查看错误信息
*/
int epoll_ctl(int epfd, int op, int fd,
struct epoll_event *event);
struct epoll_event {
__uint32_t events; /* epoll 事件 */
epoll_data_t data; /* 用户传递的数据 */
}
/*
* events : {EPOLLIN, EPOLLOUT, EPOLLPRI,
EPOLLHUP, EPOLLET, EPOLLONESHOT}
*/
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event new_event;
new_event.events = EPOLLIN | EPOLLOUT;
new_event.data.fd = 5;
epoll_ctl(epfd, EPOLL_CTL_ADD, 5, &new_event);
2.4.3 等待EPOLL
/**
*
* @param epfd 用epoll_create所创建的epoll句柄
* @param event 从内核得到的事件集合
* @param maxevents 告知内核这个events有多大,
* 注意: 值 不能大于创建epoll_create()时的size.
* @param timeout 超时时间
* -1: 永久阻塞
* 0: 立即返回,非阻塞
* >0: 指定微秒
*
* @returns 成功: 有多少文件描述符就绪,时间到时返回0
* 失败: -1, errno 查看错误
*/
int epoll_wait(int epfd, struct epoll_event *event,
int maxevents, int timeout);
struct epoll_event my_event[1000];
int event_cnt = epoll_wait(epfd, my_event, 1000, -1);
2.4.4 epoll编程框架
//创建 epoll
int epfd = epoll_crete(1000);
//将 listen_fd 添加进 epoll 中
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd,&listen_event);
while (1) {
//阻塞等待 epoll 中 的fd 触发
int active_cnt = epoll_wait(epfd, events, 1000, -1);
for (i = 0 ; i < active_cnt; i++) {
if (evnets[i].data.fd == listen_fd) {
//accept. 并且将新accept 的fd 加进epoll中.
}
else if (events[i].events & EPOLLIN) {
//对此fd 进行读操作
}
else if (events[i].events & EPOLLOUT) {
//对此fd 进行写操作
}
}
}
2.5 触发模式
2.5.1 水平触发
水平触发的主要特点是,如果用户在监听epoll事件,当内核有事件的时候,会拷贝给用户态事件,但是如果用户只处理了一次,那么剩下没有处理的会在下一次epoll_wait再次返回该事件。
这样如果用户永远不处理这个事件,就导致每次都会有该事件从内核到用户的拷贝,耗费性能,但是水平触发相对安全,最起码事件不会丢掉,除非用户处理完毕。
##2.5.2 边缘触发
2.6 epoll服务器
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#define SERVER_PORT (7778) // 服务器端口号
#define EPOLL_MAX_NUM (2048) // epoll 最大监听事件数
#define BUFFER_MAX_LEN (4096) // 缓冲区最大长度
char buffer[BUFFER_MAX_LEN];
// 将字符串转换为大写
void str_toupper(char *str)
{
int i;
for (i = 0; i < strlen(str); i ++) {
str[i] = toupper(str[i]);
}
}
int main(int argc, char **argv)
{
int listen_fd = 0; // 监听套接字
int client_fd = 0; // 客户端套接字
struct sockaddr_in server_addr; // 服务器地址
struct sockaddr_in client_addr; // 客户端地址
socklen_t client_len; // 客户端地址长度
int epfd = 0; // epoll 文件描述符
struct epoll_event event, *my_events; // epoll 事件结构体
// 创建套接字
listen_fd = socket(AF_INET, SOCK_STREAM, 0);
// 绑定地址和端口
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有地址
server_addr.sin_port = htons(SERVER_PORT); // 设置端口号
bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
// 开始监听
listen(listen_fd, 10);
// 创建 epoll 实例
epfd = epoll_create(EPOLL_MAX_NUM);
if (epfd < 0) {
perror("epoll create");
goto END;
}
// 将监听套接字添加到 epoll 中
event.events = EPOLLIN; // 监听可读事件
event.data.fd = listen_fd;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &event) < 0) {
perror("epoll ctl add listen_fd ");
goto END;
}
// 分配内存存储 epoll 事件
my_events = malloc(sizeof(struct epoll_event) * EPOLL_MAX_NUM);
while (1) {
// 等待事件发生
int active_fds_cnt = epoll_wait(epfd, my_events, EPOLL_MAX_NUM, -1);
int i = 0;
for (i = 0; i < active_fds_cnt; i++) {
// 如果是监听套接字的事件
if (my_events[i].data.fd == listen_fd) {
// 接受客户端连接
client_fd = accept(listen_fd, (struct sockaddr*)&client_addr, &client_len);
if (client_fd < 0) {
perror("accept");
continue;
}
// 打印客户端 IP 和端口
char ip[20];
printf("new connection[%s:%d]\n", inet_ntop(AF_INET, &client_addr.sin_addr, ip, sizeof(ip)), ntohs(client_addr.sin_port));
// 将客户端套接字添加到 epoll 中
event.events = EPOLLIN | EPOLLET; // 设置为边缘触发模式
event.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &event);
}
// 如果是客户端套接字的可读事件
else if (my_events[i].events & EPOLLIN) {
printf("EPOLLIN\n");
client_fd = my_events[i].data.fd;
// 读取数据
buffer[0] = '\0';
int n = read(client_fd, buffer, 5);
if (n < 0) {
perror("read");
continue;
}
else if (n == 0) { // 客户端关闭连接
epoll_ctl(epfd, EPOLL_CTL_DEL, client_fd, &event);
close(client_fd);
}
else {
printf("[read]: %s\n", buffer);
buffer[n] = '\0';
#if 1
// 将数据转换为大写并写回客户端
str_toupper(buffer);
write(client_fd, buffer, strlen(buffer));
printf("[write]: %s\n", buffer);
memset(buffer, 0, BUFFER_MAX_LEN);
#endif
/*
// 修改事件为可写
event.events = EPOLLOUT;
event.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, client_fd, &event);
*/
}
}
// 如果是客户端套接字的可写事件
else if (my_events[i].events & EPOLLOUT) {
printf("EPOLLOUT\n");
/*
client_fd = my_events[i].data.fd;
str_toupper(buffer);
write(client_fd, buffer, strlen(buffer));
printf("[write]: %s\n", buffer);
memset(buffer, 0, BUFFER_MAX_LEN);
// 修改事件为可读
event.events = EPOLLIN;
event.data.fd = client_fd;
epoll_ctl(epfd, EPOLL_CTL_MOD, client_fd, &event);
*/
}
}
}
END:
// 关闭 epoll 和监听套接字
close(epfd);
close(listen_fd);
return 0;
}
客户端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#define MAX_LINE (1024) // 缓冲区最大长度
#define SERVER_PORT (7778) // 服务器端口号
// 设置文件描述符为非阻塞模式
void setnoblocking(int fd)
{
int opts = 0;
opts = fcntl(fd, F_GETFL); // 获取文件描述符的当前状态标志
opts = opts | O_NONBLOCK; // 设置非阻塞标志
fcntl(fd, F_SETFL, opts); // 更新文件描述符状态
}
int main(int argc, char **argv)
{
int sockfd; // 客户端套接字
char recvline[MAX_LINE + 1] = {0}; // 接收缓冲区
struct sockaddr_in server_addr; // 服务器地址结构体
// 检查命令行参数是否正确
if (argc != 2) {
fprintf(stderr, "usage ./client <SERVER_IP>\n");
exit(0);
}
// 创建套接字
if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
fprintf(stderr, "socket error");
exit(0);
}
// 初始化服务器地址结构体
bzero(&server_addr, sizeof(server_addr)); // 清空结构体
server_addr.sin_family = AF_INET; // 设置地址族为 IPv4
server_addr.sin_port = htons(SERVER_PORT); // 设置端口号(网络字节序)
// 将 IP 地址从文本格式转换为二进制格式
if (inet_pton(AF_INET, argv[1], &server_addr.sin_addr) <= 0) {
fprintf(stderr, "inet_pton error for %s", argv[1]);
exit(0);
}
// 连接服务器
if (connect(sockfd, (struct sockaddr*) &server_addr, sizeof(server_addr)) < 0) {
perror("connect");
fprintf(stderr, "connect error\n");
exit(0);
}
// 设置套接字为非阻塞模式
setnoblocking(sockfd);
char input[100]; // 存储用户输入的字符串
int n = 0; // 记录发送或接收的字节数
int count = 0; // 记录接收数据的总字节数
// 循环读取用户输入并与服务器通信
while (fgets(input, 100, stdin) != NULL)
{
printf("[send] %s\n", input);
n = 0;
// 将用户输入的字符串发送到服务器
n = send(sockfd, input, strlen(input), 0);
if (n < 0) {
perror("send");
}
n = 0;
count = 0;
// 循环读取服务器返回的数据
while (1)
{
n = read(sockfd, recvline + count, MAX_LINE); // 从服务器读取数据
if (n == MAX_LINE) // 如果读取到的数据等于缓冲区大小,继续读取
{
count += n;
continue;
}
else if (n < 0) { // 如果读取失败,打印错误信息并退出循环
perror("recv");
break;
}
else { // 如果读取成功,打印接收到的数据并退出循环
count += n;
recvline[count] = '\0'; // 添加字符串结束符
printf("[recv] %s\n", recvline);
break;
}
}
}
return 0;
}
3.1 reactor反应堆模式
对每一个构架模式的分析,我们都使用参考文献的分析风格,着重分析意图、上下文、问题、解决方案、结构和实现 6个方面的内容。
##1. 意图
在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。
##2. 上下文
在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求。
##3. 问题
在分布式系统尤其是服务器这一类事件驱动应用中,虽然这些请求最终会被序列化地处理,但是必须时刻准备着处理多个同时到来的服务请求。在实际应用 中,这些请求总是通过一个事件(如CONNECTOR、READ、WRITE等)来表示的。在有序地处理这些服务请求之前,应用程序必须先分离和调度这些 同时到达的事件。为了有效地解决这个问题,我们需要做到以下4方面:
为了提高系统的可测量性和反应时间,应用程序不能长时间阻塞在某个事件源上而停止对其他事件的处理,这样会严重降低对客户端的响应度。
为了提高吞吐量,任何没有必要的上下文切换、同步和CPU之间的数据移动都要避免。
引进新的服务或改良已有的服务都要对既有的事件分离和调度机制带来尽可能小的影响。
大量的应用程序代码需要隐藏在复杂的多线程和同步机制之后。
##4. 解决方案
在一个或多个事件源上等待事件的到来,例如,一个已经连接的Socket描述符就是一个事件源。将事件的分离和调度整合到处理它的服务中,而将分离和调度机制从应用程序对特定事件的处理中分离开,也就是说分离和调度机制与特定的应用程序无关。
具体来说,每个应用程序提供的每个服务都有一个独立的事件处理器与之对应。由事件处理器处理来自事件源的特定类型的事件。每个事件处理器都事先注册 到Reactor管理器中。Reactor管理器使用同步事件分离器在一个或多个事件源中等待事件的发生。当事件发生后,同步事件分离器通知 Reactor管理器,最后由Reactor管理器调度和该事件相关的事件处理器来完成请求的服务。
##5. 结构
在Reactor模式中,有5个关键的参与者。
描述符(handle):由操作系统提供,用于识别每一个事件,如Socket描述符、文件描述符等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如定时器事件。
同步事件分离器(demultiplexer):是一个函数,用来等待一个或多个事件的发生。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。Linux的select函数是一个经常被使用的分离器。
事件处理器接口(event handler):是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作。
具体的事件处理器:是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。
Reactor 管理器(reactor):定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。 Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模 板函数来处理这个事件。
通过上述分析,我们注意到,是Reactor管理器而不是应用程序负责等待事件、分离事件和调度事件。实际上,Reactor管理器并没有被具体的 事件处理器调用,而是管理器调度具体的事件处理器,由事件处理器对发生的事件做出处理。这就是类似Hollywood原则的“反向控制”。应用程序要做的 仅仅是实现一个具体的事件处理器,然后把它注册到Reactor管理器中。接下来的工作由管理器来完成。这些参与者的相互关系如图2-1所示。
现在结合第1章分析的框架五元素来看一下Reactor构架模式的参与者与框架五元素之间的关系:Reactor构架模式的具体实现对应了元素1; 事件处理器接口对应元素2;具体的事件处理器对应元素3;Reactor管理器使用了Hollywood原则,可以认为和元素5对应;元素4的功能相对不 明显,没有明确的对应关系。
如果还是没有理解Reactor构架模式,没有关系,源代码会说明所有问题。此时可再分析一遍Reactor构架模式,然后继续以下内容。
#include <stdlib.h>
#include <stdio.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#define MAX_EVENTS 1024 // 最大事件数
#define BUFLEN 128 // 缓冲区大小
#define SERV_PORT 8080 // 服务器端口号
/*
* 自定义事件结构体
* status: 1 表示在监听事件中,0 表示不在
* last_active: 记录最后一次响应时间,用于超时处理
*/
struct myevent_s {
int fd; // 文件描述符(客户端或监听套接字)
int events; // 事件类型(EPOLLIN、EPOLLOUT)
void *arg; // 指向自身结构体的指针
void (*call_back)(int fd, int events, void *arg); // 回调函数
int status; // 是否在监听中
char buf[BUFLEN]; // 数据缓冲区
int len; // 缓冲区数据长度
long last_active; // 最后活跃时间
};
int g_efd; /* epoll_create 返回的句柄 */
struct myevent_s g_events[MAX_EVENTS+1]; /* +1 最后一个用于监听套接字 */
/**
* @brief 初始化事件结构体
* @param ev 事件结构体指针
* @param fd 文件描述符
* @param call_back 回调函数
* @param arg 回调函数参数
*/
void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
ev->last_active = time(NULL); // 设置最后活跃时间
}
/**
* @brief 添加事件到 epoll
* @param efd epoll 文件描述符
* @param events 事件类型
* @param ev 自定义事件结构体
*/
void eventadd(int efd, int events, struct myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
int op;
epv.data.ptr = ev;
epv.events = ev->events = events;
if (ev->status == 1) {
op = EPOLL_CTL_MOD; // 修改事件
}
else {
op = EPOLL_CTL_ADD; // 添加事件
ev->status = 1;
}
if (epoll_ctl(efd, op, ev->fd, &epv) < 0)
printf("event add failed [fd=%d], events[%d]\n", ev->fd, events);
else
printf("event add OK [fd=%d], op=%d, events[%0X]\n", ev->fd, op, events);
}
/**
* @brief 从 epoll 中删除事件
* @param efd epoll 文件描述符
* @param ev 自定义事件结构体
*/
void eventdel(int efd, struct myevent_s *ev)
{
struct epoll_event epv = {0, {0}};
if (ev->status != 1)
return;
epv.data.ptr = ev;
ev->status = 0;
epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);
}
/**
* @brief 处理新连接
* @param lfd 监听套接字
* @param events 事件类型
* @param arg 回调参数
*/
void acceptconn(int lfd, int events, void *arg)
{
struct sockaddr_in cin;
socklen_t len = sizeof(cin);
int cfd, i;
if ((cfd = accept(lfd, (struct sockaddr *)&cin, &len)) == -1) {
if (errno != EAGAIN && errno != EINTR) {
// 暂时不处理错误
}
printf("%s: accept, %s\n", __func__, strerror(errno));
return;
}
do {
for (i = 0; i < MAX_EVENTS; i++) {
if (g_events[i].status == 0)
break;
}
if (i == MAX_EVENTS) {
printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);
break;
}
int flag = 0;
if ((flag = fcntl(cfd, F_SETFL, O_NONBLOCK)) < 0)
{
printf("%s: fcntl nonblocking failed, %s\n", __func__, strerror(errno));
break;
}
eventset(&g_events[i], cfd, recvdata, &g_events[i]);
eventadd(g_efd, EPOLLIN, &g_events[i]);
} while(0);
printf("new connect [%s:%d][time:%ld], pos[%d]\n", inet_ntoa(cin.sin_addr), ntohs(cin.sin_port), g_events[i].last_active, i);
}
/**
* @brief 处理接收数据
* @param fd 文件描述符
* @param events 事件类型
* @param arg 回调参数
*/
void recvdata(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s *)arg;
int len;
len = recv(fd, ev->buf, sizeof(ev->buf), 0);
eventdel(g_efd, ev);
if (len > 0) {
ev->len = len;
ev->buf[len] = '\0';
printf("C[%d]:%s\n", fd, ev->buf);
// 转换为发送事件
eventset(ev, fd, senddata, ev);
eventadd(g_efd, EPOLLOUT, ev);
}
else if (len == 0) {
close(ev->fd);
printf("[fd=%d] pos[%d], closed\n", fd, (int)(ev - g_events));
}
else {
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
}
/**
* @brief 处理发送数据
* @param fd 文件描述符
* @param events 事件类型
* @param arg 回调参数
*/
void senddata(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s *)arg;
int len;
len = send(fd, ev->buf, ev->len, 0);
eventdel(g_efd, ev);
if (len > 0) {
printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);
eventset(ev, fd, recvdata, ev);
eventadd(g_efd, EPOLLIN, ev);
}
else {
close(ev->fd);
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}
}
/**
* @brief 初始化监听套接字
* @param efd epoll 文件描述符
* @param port 端口号
*/
void initlistensocket(int efd, short port)
{
int lfd = socket(AF_INET, SOCK_STREAM, 0);
fcntl(lfd, F_SETFL, O_NONBLOCK);
eventset(&g_events[MAX_EVENTS], lfd, acceptconn, &g_events[MAX_EVENTS]);
eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]);
struct sockaddr_in sin;
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = INADDR_ANY;
sin.sin_port = htons(port);
bind(lfd, (struct sockaddr *)&sin, sizeof(sin));
listen(lfd, 20);
}
/**
* @brief 主函数
*/
int main(int argc, char *argv[])
{
unsigned short port = SERV_PORT;
if (argc == 2)
port = atoi(argv[1]);
g_efd = epoll_create(MAX_EVENTS+1);
if (g_efd <= 0)
printf("create efd in %s err %s\n", __func__, strerror(errno));
initlistensocket(g_efd, port);
// 事件循环
struct epoll_event events[MAX_EVENTS+1];
printf("server running:port[%d]\n", port);
int checkpos = 0, i;
while (1) {
// 超时验证
long now = time(NULL);
for (i = 0; i < 100; i++, checkpos++) {
if (checkpos == MAX_EVENTS)
checkpos = 0;
if (g_events[checkpos].status != 1)
continue;
long duration = now - g_events[checkpos].last_active;
if (duration >= 60) {
close(g_events[checkpos].fd);
printf("[fd=%d] timeout\n", g_events[checkpos].fd);
eventdel(g_efd, &g_events[checkpos]);
}
}
// 等待事件发生
int nfd = epoll_wait(g_efd, events, MAX_EVENTS+1, 1000);
if (nfd < 0) {
printf("epoll_wait error, exit\n");
break;
}
for (i = 0; i < nfd; i++) {
struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;
if ((events[i].events & EPOLLIN) && (ev->events & EPOLLIN)) {
ev->call_back(ev->fd, events[i].events, ev->arg);
}
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT)) {
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
return 0;
}
转载自:libevent深入浅出 · 看云