从零学会epoll的使用和原理
从零学会epoll的使用和原理
第一步:理解 select
/ poll
的缺陷
一、select 和 poll 是什么?
它们是 Linux 提供的 I/O 多路复用机制,可以让我们同时监听多个文件描述符(fd),比如 socket,来等待“是否有数据可以读/写”。
二、select 工作流程(伪代码)
fd_set readfds;
FD_ZERO(&readfds);
FD_SET(sockfd, &readfds);select(maxfd + 1, &readfds, NULL, NULL, NULL);if (FD_ISSET(sockfd, &readfds)) {// 有数据可读
}
三、poll 工作流程(伪代码)
struct pollfd fds[1024];
poll(fds, nfds, timeout);
四、select 和 poll 的主要缺陷
缺陷点 | 描述 |
---|---|
1. fd 数量限制 | select 的 fd 上限是 1024(因为用的是 bitmap) |
2. 每次都要传入整个 fd 集合 | 调用时都需要将所有监听的 fd 从用户态拷贝到内核态 |
3. 事件通知不高效 | select/poll 会遍历所有 fd,查找哪一个就绪,O(n) 时间复杂度 |
4. 无状态 | 每次调用都要重新设置监听 fd 的集合,没法复用 |
5. 边缘触发支持差 | 不支持高效的边缘触发,只能是水平触发(LT) |
📌 总结一句话:
select/poll 太“啰嗦”和“笨重”,当连接数上千上万时,它们效率非常低,而 epoll 专为这种高并发场景优化。
select
示例代码(监听 stdin 和 socket)
#include <iostream>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>int main() {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_port = htons(8888);addr.sin_addr.s_addr = inet_addr("127.0.0.1");bind(listen_fd, (sockaddr*)&addr, sizeof(addr));listen(listen_fd, 5);std::cout << "Listening on 127.0.0.1:8888...\n";fd_set readfds;int max_fd = std::max(listen_fd, STDIN_FILENO); // 最大 fdwhile (true) {FD_ZERO(&readfds);FD_SET(STDIN_FILENO, &readfds); // 标准输入FD_SET(listen_fd, &readfds); // 监听 socketint ready = select(max_fd + 1, &readfds, nullptr, nullptr, nullptr);if (ready == -1) {perror("select");break;}if (FD_ISSET(STDIN_FILENO, &readfds)) {char buf[1024] = {};read(STDIN_FILENO, buf, sizeof(buf));std::cout << "[STDIN] 输入了: " << buf;}if (FD_ISSET(listen_fd, &readfds)) {sockaddr_in client{};socklen_t len = sizeof(client);int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);std::cout << "[SOCKET] 新连接来自: " << inet_ntoa(client.sin_addr) << ":" << ntohs(client.sin_port) << "\n";close(conn_fd);}}close(listen_fd);return 0;
}
🔍 如何运行这个 demo:
- 编译:
g++ select_demo.cpp -o select_demo
- 运行:
./select_demo
- 打开另一个终端连接:
telnet 127.0.0.1 8888
- 或者在当前终端直接输入一些文字,它会 echo 出来。
📌 你能观察到什么?
- 每次
select()
都要重新设置fd_set
。 - 没有事件时程序就会阻塞在
select()
。 - 随着连接越来越多,你会发现效率会下降。
第二阶段:epoll
的基本使用
一、epoll 的工作流程(核心三步)
epoll 的使用核心是三步:
步骤 | 函数名 | 作用 |
---|---|---|
① 创建 epoll 对象 | epoll_create1() | 创建 epoll 文件描述符 |
② 注册事件 | epoll_ctl() | 将监听的 fd 注册到 epoll 实例 |
③ 等待事件 | epoll_wait() | 阻塞等待就绪事件,返回活跃的 fd |
二、epoll 的基本代码结构(伪代码)
int epfd = epoll_create1(0); // 创建 epoll 实例epoll_event ev;
ev.events = EPOLLIN; // 关注读事件
ev.data.fd = listen_fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev); // 添加 fd 到 epollepoll_event events[1024]; // 返回的事件数组
int n = epoll_wait(epfd, events, 1024, -1); // 阻塞直到有事件发生for (int i = 0; i < n; ++i) {if (events[i].data.fd == listen_fd) {accept(); // 新连接} else {read(); // 读数据}
}
三、epoll 示例:监听 stdin 和 socket(对比 select)
我们来把刚才 select 的例子,用 epoll 重写一遍。
#include <iostream>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>int main() {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_port = htons(8888);addr.sin_addr.s_addr = inet_addr("127.0.0.1");bind(listen_fd, (sockaddr*)&addr, sizeof(addr));listen(listen_fd, 5);std::cout << "Listening on 127.0.0.1:8888...\n";int epfd = epoll_create1(0); // 创建 epoll 对象epoll_event ev{};ev.events = EPOLLIN;ev.data.fd = listen_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev); // 添加监听 fdev.data.fd = STDIN_FILENO;epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev); // 添加 stdinepoll_event events[1024];while (true) {int n = epoll_wait(epfd, events, 1024, -1);for (int i = 0; i < n; ++i) {int fd = events[i].data.fd;if (fd == listen_fd) {sockaddr_in client{};socklen_t len = sizeof(client);int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);std::cout << "[SOCKET] 新连接来自: " << inet_ntoa(client.sin_addr) << ":" << ntohs(client.sin_port) << "\n";close(conn_fd);} else if (fd == STDIN_FILENO) {char buf[1024] = {};read(STDIN_FILENO, buf, sizeof(buf));std::cout << "[STDIN] 输入了: " << buf;}}}close(listen_fd);close(epfd);return 0;
}
四、为什么 epoll 更优秀?
比较项 | select/poll | epoll |
---|---|---|
fd 数量上限 | select: 1024,poll 无限制但效率低 | 理论上无上限,效率高 |
是否复用 fd 集合 | 否,每次都要传 | 是,注册一次后直接等待 |
是否遍历所有 fd | 是(每次都查) | 否(事件就绪才通知) |
通知机制 | 轮询 | 事件驱动 |
第三阶段:理解 ET(边缘触发)与非阻塞 I/O 的配合
这部分是 epoll 的精髓,也是它比 select 更高效的关键所在。我们先来搞清楚 LT vs ET,然后说非阻塞,再用代码加深理解。
一、什么是 LT 和 ET?
epoll 支持两种事件触发模式:
模式 | 名称 | 特点 | 默认值 |
---|---|---|---|
LT | Level Trigger(水平触发) | 只要条件满足(比如有数据),每次 epoll_wait 都会返回这个事件 | ✅ 默认 |
ET | Edge Trigger(边缘触发) | 只有状态从无到有变化时,才触发事件通知,只通知一次 | ❌ 不是默认,需要手动设置 |
二、简单对比:LT vs ET
假设 socket 缓冲区有数据:
- LT 模式:
epoll_wait
每次都会告诉你“有数据!”,直到你读完。
- ET 模式:
- 只在数据第一次到达时通知你一次,“之后不管了”。
- 如果你没一次性读完所有数据,就会“丢事件”,程序卡住。
三、为什么 ET 更快?
因为:
- 内核只在状态变化时通知,不会反复通知同一个事件;
- 节省系统调用时间(高并发下效果特别明显);
- 但是使用更复杂——必须配合非阻塞 I/O!
四、非阻塞 I/O 是什么?
默认情况下,socket 是阻塞的:
char buf[1024];
read(fd, buf, sizeof(buf)); // 如果没数据,会卡住等
非阻塞模式下,read 立刻返回:
fcntl(fd, F_SETFL, O_NONBLOCK); // 设置非阻塞
- 有数据就读
- 没数据就返回 -1,errno=EAGAIN / EWOULDBLOCK
五、ET + 非阻塞 I/O 的典型套路:
while (true) {ssize_t n = read(fd, buf, sizeof(buf));if (n == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) break; // 没数据可读else perror("read error");} else if (n == 0) {// 对方关闭连接break;} else {// 正常读取数据}
}
六、设置 ET 模式和非阻塞 socket 的完整代码片段
// 设置 fd 为非阻塞
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);// 注册 ET 模式
epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = fd;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
七、ET 模式小结
问题 | 答案 |
---|---|
必须配合非阻塞? | ✅ 是的,不然容易卡死 |
要用循环读数据? | ✅ 是的,直到返回 EAGAIN |
什么时候触发? | 数据从没有 ➜ 有,才触发 |
好处? | 高性能,减少系统调用 |
风险? | 没处理好容易漏事件、阻塞 |
巩固下 ET 和非阻塞的相关细节
ET + 非阻塞 I/O 是 epoll 中最容易出 bug 的地方,
一、非阻塞 I/O 设置方式
我们经常对 socket 设置非阻塞,代码如下:
#include <fcntl.h>void set_nonblocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
📌 重点解释:
O_NONBLOCK
是一个 flag,它告诉内核这个 fd 不允许阻塞。fcntl
拿到当前 flags,再 OR 一个非阻塞标志。
二、ET 模式触发行为再强化
我们看一个常见现象来理解 ET 和阻塞 read 的冲突:
❌ 错误代码(阻塞读配合 ET)
// 假设这是 ET 模式下的回调处理
char buf[1024];
read(fd, buf, sizeof(buf)); // 没读完就等死了!
如果缓冲区里数据不够 1024 字节,这个 read 会阻塞——
而 ET 只通知一次! 你程序就卡死在 read 上,永远等不到下一次触发。
三、正确姿势:循环读取直到 EAGAIN
char buf[1024];
while (true) {ssize_t n = read(fd, buf, sizeof(buf));if (n == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 没有数据了,结束循环break;} else {perror("read error");break;}} else if (n == 0) {// 对方关闭连接printf("Client closed connection\n");close(fd);break;} else {// 正常读取数据printf("Received: %.*s", (int)n, buf);}
}
📌 关键点总结:
行为 | 必须这么做的原因 |
---|---|
循环读取 | ET 只触发一次,要把所有数据读干净 |
判断 errno | 确认是“真的没数据”而不是其他错误 |
处理 n == 0 | 表示连接断开,必须 close(fd) |
四、添加客户端连接时也要设置非阻塞!
int connfd = accept(listen_fd, ...);
set_nonblocking(connfd); // 否则 read 可能阻塞
五、epoll 注册事件时需要设置 EPOLLET
epoll_event ev;
ev.events = EPOLLIN | EPOLLET;
ev.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
epoll + ET + 非阻塞 I/O 的最小完整 demo
场景功能说明:
- 使用 epoll 的 ET 模式监听所有 socket。
- 所有 fd 设置为非阻塞。
- 客户端发送数据,服务端完整读完并打印。
- 使用
read()
+EAGAIN
机制读取所有数据。
代码:epoll_et_server.cpp
#include <iostream>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
#include <vector>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>void set_nonblocking(int fd) {int flags = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}int main() {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_port = htons(8888);addr.sin_addr.s_addr = inet_addr("127.0.0.1");bind(listen_fd, (sockaddr*)&addr, sizeof(addr));listen(listen_fd, SOMAXCONN);set_nonblocking(listen_fd);int epfd = epoll_create1(0);epoll_event ev{};ev.events = EPOLLIN | EPOLLET; // 注意:ET 模式ev.data.fd = listen_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);std::vector<epoll_event> events(1024);std::cout << "Server listening on 127.0.0.1:8888\n";while (true) {int n = epoll_wait(epfd, events.data(), events.size(), -1);for (int i = 0; i < n; ++i) {int fd = events[i].data.fd;if (fd == listen_fd) {// 处理所有新连接while (true) {sockaddr_in client{};socklen_t len = sizeof(client);int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);if (conn_fd == -1) break;std::cout << "[New Connection] "<< inet_ntoa(client.sin_addr) << ":" << ntohs(client.sin_port) << "\n";set_nonblocking(conn_fd);epoll_event cev{};cev.events = EPOLLIN | EPOLLET;cev.data.fd = conn_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &cev);}} else {// 处理已有连接的读事件while (true) {char buf[1024];ssize_t count = read(fd, buf, sizeof(buf));if (count == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {// 没数据可读了break;} else {perror("read");close(fd);break;}} else if (count == 0) {// 对方关闭连接std::cout << "[Disconnect] fd = " << fd << "\n";close(fd);break;} else {std::cout << "[Read] fd = " << fd << ", content: "<< std::string(buf, count);}}}}}close(listen_fd);close(epfd);return 0;
}
编译 & 运行方式
g++ epoll_et_server.cpp -o epoll_et_server
./epoll_et_server
打开另一个终端:
telnet 127.0.0.1 8888
# 或用 nc 测试:
# echo "hello" | nc 127.0.0.1 8888
提示:
- 你可以反复
telnet
连多个客户端; - 输入数据,服务端会完整打印;
- 断开连接也能被正常检测并关闭;
🔥 第四阶段:写一个真正的高并发服务器,支持长连接、广播、线程池等!
🧩 项目目标功能
- ✅ 基于
epoll
的 I/O 多路复用 - ✅ 使用 ET 模式 + 非阻塞 I/O
- ✅ 支持多个客户端的 长连接
- ✅ 实现 客户端广播 功能(群发消息)
- ✅ 使用 线程池 解耦 I/O 与业务处理
🚧 模块化开发路线图(按顺序完成)
🔹 第一步:网络通信模块(epoll
+ 非阻塞长连接管理)
- 接受新连接
- 使用
epoll
管理所有 socket - 使用
ET
模式读取客户端数据
🔹 第二步:线程池模块
- 预先创建线程池(工作线程 + 任务队列)
- 支持任务提交与消费
🔹 第三步:广播功能模块
- 所有客户端连接都被管理起来
- 某个客户端发消息 ➜ 广播给所有连接
🔹 第四步:服务端结构模块化 & 重构
- 将各部分封装为类(Server / ThreadPool / Connection 等)
- 保持逻辑清晰、模块解耦
我们现在从第一步开始:
📦 第一步:网络通信模块(只实现接收客户端连接 & 打印数据)
目标:
- 使用
epoll
+ET
+非阻塞
监听并读取每个连接的数据。 - 支持多个客户端长连接、持续交互。
// server.cpp
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <vector>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>void set_nonblocking(int fd) {int flags = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}int main() {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);sockaddr_in server_addr{};server_addr.sin_family = AF_INET;server_addr.sin_port = htons(8888);server_addr.sin_addr.s_addr = INADDR_ANY;bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr));listen(listen_fd, SOMAXCONN);set_nonblocking(listen_fd);int epfd = epoll_create1(0);epoll_event ev{};ev.events = EPOLLIN | EPOLLET;ev.data.fd = listen_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);std::vector<epoll_event> events(1024);std::cout << "[INFO] Server started on port 8888" << std::endl;while (true) {int n = epoll_wait(epfd, events.data(), events.size(), -1);for (int i = 0; i < n; ++i) {int fd = events[i].data.fd;if (fd == listen_fd) {// 接收所有连接while (true) {sockaddr_in client{};socklen_t len = sizeof(client);int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);if (conn_fd == -1) break;set_nonblocking(conn_fd);epoll_event conn_ev{};conn_ev.events = EPOLLIN | EPOLLET;conn_ev.data.fd = conn_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &conn_ev);std::cout << "[CONNECT] New client: "<< inet_ntoa(client.sin_addr) << ":"<< ntohs(client.sin_port) << std::endl;}} else {// 读取数据while (true) {char buf[1024];ssize_t count = read(fd, buf, sizeof(buf));if (count == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) break;perror("read error");close(fd);break;} else if (count == 0) {std::cout << "[DISCONNECT] fd: " << fd << std::endl;close(fd);break;} else {std::cout << "[DATA] fd " << fd << ": "<< std::string(buf, count);}}}}}close(epfd);close(listen_fd);return 0;
}
🔧 编译:
g++ server.cpp -o server
🚀 运行:
./server
然后另开几个终端:
telnet 127.0.0.1 8888
# 或用 nc 测试
# echo "hello" | nc 127.0.0.1 8888
🔹 第二步:线程池模块
// thread_pool.h
#pragma once
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>class ThreadPool {
public:using Task = std::function<void()>;explicit ThreadPool(size_t thread_count = 4): stop_flag(false) {for (size_t i = 0; i < thread_count; ++i) {workers.emplace_back([this]() {while (true) {Task task;{std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [this]() { return stop_flag || !tasks.empty(); });if (stop_flag && tasks.empty()) return;task = std::move(tasks.front());tasks.pop();}task();}});}}~ThreadPool() {{std::lock_guard<std::mutex> lock(mtx);stop_flag = true;}cv.notify_all();for (auto& t : workers) {if (t.joinable()) t.join();}}void enqueue(Task task) {{std::lock_guard<std::mutex> lock(mtx);tasks.push(std::move(task));}cv.notify_one();}private:std::vector<std::thread> workers;std::queue<Task> tasks;std::mutex mtx;std::condition_variable cv;std::atomic<bool> stop_flag;
};
测试一下线程池
好的!我们来写一个简单的测试程序来验证这个 ThreadPool
是否正常工作。
示例:test_thread_pool.cpp
#include "thread_pool.h"
#include <iostream>
#include <chrono>void test_task(int id) {std::cout << "[TASK START] Task " << id << " in thread "<< std::this_thread::get_id() << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(500));std::cout << "[TASK END] Task " << id << " done\n";
}int main() {ThreadPool pool(4); // 启动4个线程for (int i = 0; i < 10; ++i) {pool.enqueue([i]() { test_task(i); });}std::this_thread::sleep_for(std::chrono::seconds(3));std::cout << "[MAIN] Done\n";return 0;
}
🔧 编译:
g++ test_thread_pool.cpp -o test_thread_pool -std=c++11 -pthread
(注意加上 -pthread
)
🚀 运行:
./test_thread_pool
🔹 第三步:广播功能模块
// server.cpp (含线程池和广播结构)
#include <iostream>
#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <vector>
#include <unordered_set>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "thread_pool.h"void set_nonblocking(int fd) {int flags = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}int main() {int listen_fd = socket(AF_INET, SOCK_STREAM, 0);sockaddr_in server_addr{};server_addr.sin_family = AF_INET;server_addr.sin_port = htons(8888);server_addr.sin_addr.s_addr = INADDR_ANY;bind(listen_fd, (sockaddr*)&server_addr, sizeof(server_addr));listen(listen_fd, SOMAXCONN);set_nonblocking(listen_fd);int epfd = epoll_create1(0);epoll_event ev{};ev.events = EPOLLIN | EPOLLET;ev.data.fd = listen_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);std::vector<epoll_event> events(1024);std::unordered_set<int> clients;ThreadPool pool(4);std::cout << "[INFO] Server started on port 8888" << std::endl;while (true) {int n = epoll_wait(epfd, events.data(), events.size(), -1);for (int i = 0; i < n; ++i) {int fd = events[i].data.fd;if (fd == listen_fd) {while (true) {sockaddr_in client{};socklen_t len = sizeof(client);int conn_fd = accept(listen_fd, (sockaddr*)&client, &len);if (conn_fd == -1) break;set_nonblocking(conn_fd);epoll_event conn_ev{};conn_ev.events = EPOLLIN | EPOLLET;conn_ev.data.fd = conn_fd;epoll_ctl(epfd, EPOLL_CTL_ADD, conn_fd, &conn_ev);clients.insert(conn_fd);std::cout << "[CONNECT] "<< inet_ntoa(client.sin_addr) << ":"<< ntohs(client.sin_port)<< " (fd: " << conn_fd << ")" << std::endl;}} else {while (true) {char buf[1024];ssize_t count = read(fd, buf, sizeof(buf));if (count == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) break;perror("read error");close(fd);clients.erase(fd);break;} else if (count == 0) {std::cout << "[DISCONNECT] fd: " << fd << std::endl;close(fd);clients.erase(fd);break;} else {std::string msg(buf, count);// 异步处理(广播)pool.enqueue([fd, msg, &clients]() {std::string broadcast = "[Client " + std::to_string(fd) + "] " + msg;for (int client_fd : clients) {if (client_fd != fd) {send(client_fd, broadcast.c_str(), broadcast.size(), 0);}}std::cout << "[BROADCAST from " << fd << "]: " << msg;});}}}}}close(epfd);close(listen_fd);return 0;
}
完成第三步 ✅:将线程池整合进服务器,并构建了广播功能的基础框架。
🧩 你现在拥有的功能:
- 支持多个客户端 长连接
- 客户端发送消息 ➜ 异步投递到线程池中处理
- 消息被广播给所有其他客户端
- 采用
epoll + ET + 非阻塞 I/O
,高效并发
🚀 快速测试建议:
- 编译:
g++ server.cpp -o server -std=c++11 -pthread
- 运行服务端:
./server
- 用多个终端模拟客户端:
telnet 127.0.0.1 8888
# 或者 nc:
nc 127.0.0.1 8888
客户端 A 发消息 ➜ B、C 会收到。
模块封装重构(Server / Connection / ThreadPool 分离)
Server
// server.h
#pragma once
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <memory>
#include "thread_pool.h"
#include "connection.h"class Server {
public:Server(int port = 8888);~Server();void run();private:void set_nonblocking(int fd);void handle_new_connection();void handle_client_event(int fd);int listen_fd_;int epoll_fd_;ThreadPool pool_;std::unordered_map<int, std::shared_ptr<Connection>> connections_;std::unordered_set<int> client_fds_;
};
// server.cpp
#include "server.h"
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <unistd.h>
#include <iostream>
#include <cstring>Server::Server(int port) : pool_(4) {listen_fd_ = socket(AF_INET, SOCK_STREAM, 0);sockaddr_in server_addr{};server_addr.sin_family = AF_INET;server_addr.sin_port = htons(port);server_addr.sin_addr.s_addr = INADDR_ANY;bind(listen_fd_, (sockaddr*)&server_addr, sizeof(server_addr));listen(listen_fd_, SOMAXCONN);set_nonblocking(listen_fd_);epoll_fd_ = epoll_create1(0);epoll_event ev{};ev.events = EPOLLIN | EPOLLET;ev.data.fd = listen_fd_;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, listen_fd_, &ev);std::cout << "[INFO] Server started on port " << port << std::endl;
}Server::~Server() {close(epoll_fd_);close(listen_fd_);
}void Server::set_nonblocking(int fd) {int flags = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}void Server::run() {std::vector<epoll_event> events(1024);while (true) {int n = epoll_wait(epoll_fd_, events.data(), events.size(), -1);for (int i = 0; i < n; ++i) {int fd = events[i].data.fd;if (fd == listen_fd_) {handle_new_connection();} else {handle_client_event(fd);}}}
}void Server::handle_new_connection() {while (true) {sockaddr_in client{};socklen_t len = sizeof(client);int conn_fd = accept(listen_fd_, (sockaddr*)&client, &len);if (conn_fd == -1) break;set_nonblocking(conn_fd);epoll_event ev{};ev.events = EPOLLIN | EPOLLET;ev.data.fd = conn_fd;epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, conn_fd, &ev);auto conn = std::make_shared<Connection>(conn_fd);connections_[conn_fd] = conn;client_fds_.insert(conn_fd);std::cout << "[CONNECT] fd: " << conn_fd << std::endl;}
}void Server::handle_client_event(int fd) {auto conn = connections_[fd];bool closed = false;std::string msg = conn->read_data(closed);if (closed) {std::cout << "[DISCONNECT] fd: " << fd << std::endl;close(fd);epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);connections_.erase(fd);client_fds_.erase(fd);} else {pool_.enqueue([fd, msg, this]() {std::string broadcast = "[Client " + std::to_string(fd) + "] " + msg;for (int client_fd : client_fds_) {if (client_fd != fd && connections_.count(client_fd)) {connections_[client_fd]->send_data(broadcast);}}std::cout << "[BROADCAST from " << fd << "]: " << msg;});}
}
我们现在完成了服务端模块的完整封装 ✅:
✅ 模块结构现状:
文件名 | 说明 |
---|---|
connection.h | 封装客户端连接的读写逻辑 |
thread_pool.h | 通用线程池,用于异步任务调度 |
server.h/cpp | Server 类封装 epoll + 连接管理 + 线程池 |
📦 使用方式(主程序 main.cpp
):
#include "server.h"int main() {Server server(8888);server.run();return 0;
}
编译方式:
g++ main.cpp server.cpp -o server -std=c++11 -pthread
C++ 项目结构模板
🗂 项目结构建议
epoll_server/
├── CMakeLists.txt
├── src/
│ ├── main.cpp
│ ├── server.cpp
│ ├── server.h
│ ├── connection.h
│ └── thread_pool.h
├── build/ ← 编译输出目录(CMake 会自动创建)
└── README.md
📄 CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(epoll_server)set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -pthread")# 包含头文件目录
include_directories(${PROJECT_SOURCE_DIR}/src)# 源文件
file(GLOB SOURCES src/*.cpp)# 可执行文件
add_executable(server ${SOURCES})
📄 示例 src/main.cpp
#include "server.h"int main() {Server server(8888);server.run();return 0;
}
🧪 构建和运行步骤:
cd epoll_server
mkdir -p build && cd build
cmake ..
make
./server