【Linux篇】探索进程间通信:如何使用匿名管道构建高效的进程池
从零开始:通过匿名管道实现进程池的基本原理
- 一. 进程间通信
- 1.1 基本概念
- 1.2 通信目的
- 1.3 通信种类
- 1.3.1 同步通信
- 1.3.2 异步通信
- 1.4 如何通信
- 二. 管道
- 2.1 什么是管道
- 2.2 匿名管道
- 2.2.1 pipe()
- 2.2.2 示例代码:使用 pipe() 进行父子进程通信
- 2.2.3 管道容量
- 2.2.4 特点
- 三. 基于匿名管道原理实现进程池
- 3.1 简介
- 3.2 实现进程池
- 3.2.1 Channel类
- 3.2.2 ChannelManager 类
- 3.2.3 taskManager类
- 3.2.4 ProcessPool 类
- 3.2.5 总结
- 四. 最后
进程间通信(IPC)的匿名管道是一种无名的通信方式,通常用于父子进程之间的数据传递。它通过在内核中创建一个缓冲区,允许一个进程将数据写入管道,另一个进程从管道读取数据。管道只能在创建它的进程及其子进程之间使用,因此具有“匿名”特性。匿名管道通过文件描述符实现数据流动,一个进程将数据写入管道,另一个进程从中读取。进程池的实现可以利用匿名管道将任务分配给多个工作进程,通过管道实现父进程与工作进程之间的有效通信,提高系统处理能力和资源利用率。
💬 欢迎讨论:如果你在学习过程中有任何问题或想法,欢迎在评论区留言,我们一起交流学习。你的支持是我继续创作的动力!
👍点赞、收藏与分享:觉得这篇文章对你有帮助吗?别忘了点赞、收藏并分享给更多的小伙伴哦!你们的支持是我不断进步的动力!
🚀分享给更多人:如果你觉得这篇文章对你有帮助,欢迎分享给更多对Linux OS感兴趣的朋友,让我们一起进步!
一. 进程间通信
1.1 基本概念
进程间通信(IPC, Inter-Process Communication)是指不同进程之间为了交换数据或协调工作所使用的一种机制。在多任务操作系统中,进程是独立的执行实体,通常无法直接访问彼此的内存空间。因此,进程间通信提供了一种让进程之间传递信息、同步操作和共享资源的方式。
1.2 通信目的
- 数据交换:不同进程需要交换数据,例如一个进程处理数据后,另一个进程需要读取或使用这些数据。IPC提供了数据传递的途径,如管道、消息队列等。
- 进程同步与控制:在多进程环境中,多个进程可能需要协同工作。通过IPC,进程可以进行同步控制,确保按照预定的顺序执行,避免竞争条件。例如,信号量和条件变量可以用于进程同步。
- 资源共享:多个进程可能需要共享某些资源,如内存、文件或硬件设备。通过共享内存等IPC机制,进程能够安全高效地共享资源,避免冲突。
- 异步通信:某些进程可能需要在后台运行,处理任务后通知主进程或其他进程。IPC允许实现异步通信,主进程不需要等待所有子进程完成,可以继续执行其他任务。
1.3 通信种类
1.3.1 同步通信
在同步通信中,发送方和接收方需要在通信过程中保持同步。发送方在发送消息后会等待接收方的响应或确认才能继续执行。
- 管道(pipe):匿名管道,命名管道
- 消息队列(Message Queue)
- 共享内存(Shared Memory)
1.3.2 异步通信
在异步通信中,发送方和接收方不需要在同一时间进行操作。发送方发送消息后可以继续执行,而接收方可以在适当的时机接收消息。
- 套接字(Socket)
- 事件驱动机制
1.4 如何通信
先让不同的进程看到同一份资源,然后才有通信的条件。
声明:本文先讲述同步通信中管道(匿名管道)的原理,后面的将在后面的文章中一一详解。
二. 管道
2.1 什么是管道
管道(Pipe)是一种进程间通信(IPC)机制,允许一个进程将数据传输到另一个进程。管道提供了一种简单、高效的方式来在同一台机器上的进程之间传递数据,常见于父子进程或兄弟进程之间的通信。管道通过一个缓冲区传递数据,发送进程将数据写入管道,而接收进程从管道中读取数据。
2.2 匿名管道
- 匿名管道通常用于子进程之间的通信,不需要命名,存在于内存中,生命周期与进程的生命周期相关。
- 它通常只能在具有亲缘关系的进程之间使用(如父进程与子进程)。
- 数据流动是单向的,即进程只能写入数据或读取数据。
- 匿名管道的典型应用是Unix/Linux系统中的管道命令(|)连接不同的命令执行。
2.2.1 pipe()
函数原型:
int pipe(int pipefd[2]);
参数:
- pipefd:整数数组,系统调用会将管道的文件描述符数组通过该数组返回,其中pipefd[0]:读端,pipefd[1]:写端
返回值: - 成功:返回0
- 失败:返回-1,同时error被设置
2.2.2 示例代码:使用 pipe() 进行父子进程通信
#include <stdio.h>
#include <unistd.h>
#include <string.h>int main() {int pipefd[2];pid_t pid;char message[] = "Hello from parent process!";char buffer[100];// 创建管道if (pipe(pipefd) == -1) {perror("pipe");return 1;}pid = fork(); // 创建子进程if (pid == -1) {perror("fork");return 1;}if (pid == 0) { // 子进程close(pipefd[1]); // 关闭写端read(pipefd[0], buffer, sizeof(buffer)); // 从管道中读取数据printf("Child received: %s\n", buffer);close(pipefd[0]); // 关闭读端} else { // 父进程close(pipefd[0]); // 关闭读端write(pipefd[1], message, strlen(message) + 1); // 向管道写数据close(pipefd[1]); // 关闭写端}return 0;
}
该示例子进程从父进程写入管道的数据中读取数据。
原理图:
原理:父进程关闭读端,子进程关闭写端。
四种不同的通信情况:
- 写端往管道中写入数据得慢,而读端从管道中读数据读得快,读端就会发生阻塞(进程),等待写端写入数据。
- 写端往管道中写入数据得快,而读端从管道中读数据读得慢,管道中数据满了,写端就要发生阻塞。
- 写端直接关闭,而读端继续从管道中读数据,read会读到返回值为0,表示文件结尾。
- 读端直接关闭,而写端继续从管道中写数据,OS会终止写端进程,发送信号13(SIGPIPE)
2.2.3 管道容量
在 Unix/Linux 系统中,管道的容量(也称为缓冲区大小)是管道可以存储的最大数据量。管道的容量由操作系统内核设定,并且在不同的操作系统和系统配置中可能会有所不同。管道容量的大小对数据的读写性能以及进程间通信的效率有直接影响。
管道容量的大小
- 默认容量:在大多数现代 Unix 和 Linux 系统中,匿名管道的默认容量通常是 4 KB 到 64 KB
之间。这意味着,管道缓冲区可以存储一定量的数据,在缓冲区满时,写入操作会阻塞直到有数据被读取。 - 可配置性:在某些操作系统和文件系统中,管道容量的大小可以被调整。用户可以通过内核参数来修改管道的缓冲区大小。在 Linux
系统中,可以通过 sysctl 或修改 /proc/sys/fs/pipe-max-size 来调整管道的缓冲区大小。
查看管道缓冲区大小语法:
cat /proc/sys/fs/pipe-max-size
2.2.4 特点
- 单向数据流:管道本身只支持单向的数据流,但可以通过双向管道(通过两个管道实现)来模拟双向通信。
- 缓冲区机制:管道实现了数据缓冲区,发送方写入数据,接收方从管道读取数据,管道自动管理数据的存储和传递。
- 阻塞性:当管道缓冲区已满时,写入操作会被阻塞,直到接收方读取数据。反之,当管道为空时,读取操作会被阻塞,直到写入方写入数据。
三. 基于匿名管道原理实现进程池
3.1 简介
进程池是一种多进程并发处理模型,它通过预先创建一定数量的进程来处理任务,从而避免了在每个任务执行时频繁创建和销毁进程的开销。进程池中的进程通常在任务到达时被复用,任务完成后,进程不会被销毁,而是返回进程池等待下一个任务。
进程池的主要目的是提高系统的并发性和响应速度,同时减少频繁创建和销毁进程带来的性能损耗。
3.2 实现进程池
本进程池分为四个模块,分别为 Channel 类,ChannelManager 类,ProcessPool 类,taskManager类。下面将详细介绍各个模块的功能及成员方法。
3.2.1 Channel类
- 功能:
管道类,表示父进程与子进程之间的通信通道。它提供了发送任务、关闭通道、等待子进程等功能。
(Channel 是管道的封装类)该类主要职责:
- 通过管道发送任务(Send() 方法)。
- 管理进程间的通信,确保任务能够顺利地传递到子进程。
- 管理与子进程相关的资源,例如等待子进程的结束(Wait() 方法)。
- 提供进程 ID 和管道文件描述符(Fd() 和 SubId() 方法)。
示例代码:
class Channel
{
public:Channel(int fd, pid_t id) : _wfd(fd), _subid(id){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}~Channel() {}void Send(int code){int n = write(_wfd, &code, sizeof(code));(void)n;}void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}int Fd() { return _wfd; }pid_t SubId() { return _subid; }std::string Name() { return _name; }private:int _wfd;pid_t _subid;std::string _name;int loadnum;
};
Channel 类不仅封装了管道的创建与关闭,还可以通过 Send() 方法向子进程发送任务,Wait() 方法确保父进程能够正确地等待子进程结束。
3.2.2 ChannelManager 类
- 功能:
管理多个 Channel 对象,负责处理所有管道和子进程的管理。它可以选择一个空闲的 Channel,发送任务,关闭所有通道等。
(ChannelManager 负责管理多个 Channel 对象)该类主要职责:
- Insert() 方法:将一个新的 Channel 对象加入管理列表。
- Select() 方法:轮询选择一个空闲的 Channel,实现负载均衡。
- CloseAll() 和 StopSubProcess():关闭所有管道及对应的进程,回收资源。
- WaitSubProcess() 和 CloseAndWait():确保所有子进程都被回收,并进行资源的清理。
示例代码:
class ChannelManager
{
public:ChannelManager() : _next(0){}void Insert(int wfd, pid_t subid){_channels.emplace_back(wfd, subid);// Channel c(wfd,subid);//_channels.push_back(std::move(c));}Channel &Select(){auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}void PrintChannel(){for (auto &channel : _channels){std::cout << channel.Name() << std::endl;}}void CloseAll(){for (auto &channel : _channels){channel.Close();//std::cout << "关闭: " << channel.Name() << std::endl;}}void StopSubProcess(){for (auto &channel : _channels){channel.Close();std::cout << "关闭: " << channel.Name() << std::endl;}}void WaitSubProcess(){for (auto &channel : _channels){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}void CloseAndWait(){for (auto &channel : _channels){channel.Close();std::cout << "关闭: " << channel.Name() << std::endl;channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}// 方案1:从后往前关闭管道// for (int i = _channels.size() - 1; i >= 0; i--)// {// _channels[i].Close();// std::cout << "关闭: " << _channels[i].Name() << std::endl;// _channels[i].Wait();// std::cout << "回收: " << _channels[i].Name() << std::endl;// }}~ChannelManager() {}private:std::vector<Channel> _channels;int _next;
};
通过 ChannelManager 管理的多个 Channel,可以高效地实现对子进程的管理和任务的分配。
3.2.3 taskManager类
- 功能:
负责注册和执行任务,这里假设它是一个任务管理器,可以注册不同的任务类型,并根据任务码执行对应的任务。
(任务管理器(taskManager)负责管理各种类型的任务,包括任务的注册和执行)该类主要职责:
- 任务注册:通过 Register() 方法,将任务函数(如 PrintLog, DownLoad, UpLoad)与任务码绑定。
- 任务执行:通过 Execute() 方法,根据任务码执行对应的任务。任务的执行通过 tm.Execute(code) 完成。
示例代码:
#pragma once#include<iostream>
#include<vector>
#include<ctime>typedef void (*task_t)();void PrintLog()
{std::cout << "我是一个打印日志的任务" << std::endl;
}
void DownLoad()
{std::cout << "我是一个下载的任务" << std::endl;
}
void UpLoad()
{std::cout<< "我是一个上传的任务" << std::endl;
}class taskManager
{
public:taskManager(){srand(time(nullptr));}void Register(task_t t){_tasks.push_back(t);}int Code(){return rand()%_tasks.size();}void Execute(int code){if(code >= 0 && code < _tasks.size()){_tasks[code]();}}~taskManager(){}
private:std::vector<task_t> _tasks;
};
3.2.4 ProcessPool 类
- 功能:
进程池类,负责管理子进程的创建、任务分配、进程回收等。
(ProcessPool 负责整个进程池的初始化、任务分配、任务执行和进程回收。)该类主要职责:
- 进程池初始化:通过 Start() 方法创建多个子进程,并为每个子进程创建一个管道。每个子进程从管道中接收任务,并执行对应的操作。
- 任务分配:通过 Run() 方法从任务管理器中选择一个任务码,并选择一个空闲的 Channel,将任务分配给相应的子进程。
- 进程停止与回收:通过 Stop() 方法关闭所有管道并回收所有子进程,确保资源得到释放。
示例代码:
class ProcessPool
{
public:ProcessPool(int num) : _process_num(num){_tm.Register(PrintLog);_tm.Register(DownLoad);_tm.Register(UpLoad);}void Work(int rfd){while (true){// std::cout << "我是子进程, 我的rfd是:" << rfd << std::endl;// sleep(5);int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if (n > 0){if (n != sizeof(code)){continue;}std::cout << "子进程[" << getpid() << "]收到一个任务码:" << code << std::endl;_tm.Execute(code);}else if (n == 0){// 当所有写端关闭后,读端会读到0,表示EOF。但如果有多个写端,情况会复杂一些。// 当某个写端关闭时,其他写端可能还在打开状态,此时读端可能仍然阻塞,直到所有数据被读取或所有写端关闭。// 管道EOF触发机制// 当且仅当所有写端都关闭时,读端才会收到EOF(read返回0)// 若存在多个写端,即使部分写端关闭,只要还有一个写端保持打开状态,读端就会持续阻塞等待数据std::cout << "子进程退出" << std::endl;break;}else{std::cout << "读取错误" << std::endl;break;}}}bool Start(){for (int i = 0; i < _process_num; i++){// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false;// 2.创建子进程pid_t subid = fork();if (subid < 0)return false;else if (subid == 0){// child// 让子进程关闭自己继承下来的,它哥哥进程w段关闭即可!// 3.关闭不需要的文件描述符_cm.CloseAll();//关闭所有的w端close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(0);}else{// parent// 3.关闭不需要的文件描述符close(pipefd[0]);_cm.Insert(pipefd[1], subid);}}return true;}void debug(){_cm.PrintChannel();}void Run(){// 1.选择一个任务int taskcode = _tm.Code();// 1.选择一个信道[子进程],负载均衡的选择一个信道,完成任务auto &c = _cm.Select();std::cout << "选择一个子进程:" << c.Name() << std::endl;// 2.发送任务c.Send(taskcode);std::cout << "发送了一个任务码:" << taskcode << std::endl;}void Stop(){// //关闭父进程所有wfd的写端// _cm.StopSubProcess();// //回收所有子进程// _cm.WaitSubProcess();_cm.CloseAndWait();}~ProcessPool(){}private:ChannelManager _cm;int _process_num;taskManager _tm;
};#endif
该类封装了创建进程池,均衡的选择任务,同时回收和终止进程相关的资源等。
3.2.5 总结
这个进程池设计的过程体现了如何通过管道和子进程的协作实现任务的并发处理和负载均衡。通过合理的封装和资源管理,确保了系统能够高效地处理大量并发任务,同时能够避免过多的资源浪费。
四. 最后
本文介绍了如何通过匿名管道实现进程池的基本原理。进程池通过预先创建一组子进程来避免频繁创建和销毁进程的开销,提高系统并发性和资源利用率。文章首先讲解了进程间通信的概念,并重点介绍了匿名管道的工作原理及其应用。接着,详细解析了如何利用管道实现进程池,包括 Channel、ChannelManager、taskManager 和 ProcessPool 四个模块。每个模块的设计确保了任务的有效分配、进程的管理和资源的回收。最终,通过合理的进程复用和负载均衡,提升了系统的性能和效率。
路虽远,行则将至;事虽难,做则必成
亲爱的读者们,下一篇文章再会!!!