Linux笔记---进程间通信:匿名管道
1. 管道通信
1.1 管道的概念与分类
管道(Pipe) 是进程间通信(IPC)的一种基础机制,主要用于在具有亲缘关系的进程(如父子进程、兄弟进程)之间传递数据,其核心特性是通过内核缓冲区实现单向或半双工的数据传输。
- 匿名管道:通常用于具有亲缘关系的进程之间通信,如父子进程或兄弟进程。它是半双工的,数据只能在一个方向上流动,有固定的读端和写端,且只存在于内存中,不属于任何文件系统,但可以使用普通的read、write等函数进行读写。
- 命名管道(FIFO):可以在无关的进程之间进行通信,有路径名与之相关联,以一种特殊设备文件形式存在于文件系统中。创建后,无关进程可以通过该文件进行通信,通信方式类似于使用文件传输数据,遵循先进先出原则。
管道是轻量级且高效的进程间通信方式,适用于简单的数据流场景,但其单向性和容量限制使其不适合复杂需求。命名管道扩展了应用范围,但需注意文件系统的依赖。
1.2 管道的原理
在操作系统还不支持进程间通信的时候,人们尝试使用操作系统已有的功能来实现进程间通信。
要实现进程间通信,就需要两个进程访问共享的资源,什么资源是各个进程都可以共享访问的呢?
答案显而易见:文件。
父进程打开一个文件并创建子进程,子进程就会继承父进程的文件描述符表,这样父子进程就可以访问同一个文件,通过向文件当中进行读写就可以实现进程间通信。
当然,对文件的访问是需要同步与互斥机制的,这一点由操作系统来实现,我们并不关心。
两个进程之间的通信一般都是些临时的小体量的消息,无需将其正真存入到文件当中(而且存入文件当中会造成较大的访存消耗)。实际上,我们只需要在struct file维护的文件缓冲区当中进行信息交换即可。
于是,在操作系统在这个思路的基础之上,实现了管道机制。
所谓管道,就是一种特殊的管道文件,其本质上是内核管理的一段环形内存缓冲区,通过文件描述符提供单向或半双工的数据流传输。
2. 匿名管道的使用
2.1 pipe函数
我们说,管道文件是一种特殊的文件,那么其打开的方式(或者说创建的方式)自然也要与一般的文件进行区别。
在Linux当中,我们使用pipe函数来创建一个匿名管道:
#include <unistd.h>
int pipe(int pipefd[2]);
返回值:成功返回 0,失败返回 -1 并设置 errno。
参数:pipefd 是长度为 2 的整型数组,用于返回两个文件描述符:
- pipefd[0]:管道的读端,只能用于读取数据。
- pipefd[1]:管道的写端,只能用于写入数据。
注意,管道只能进行单向数据传输,这意味着共享管道的父子进程一个只能读,一个只能写。
在实践当中,我们应当关闭当前进程未使用的端口:
#include <unistd.h>int main()
{int pipefd[2] = {0};int n = pipe(pipefd);if(n == -1){perror("pipe:");return 1;}int id = fork();if(id == 0){// 子进程写close(fd[0]);// ...}else{// 父进程读close(fd[1]);// ...}
}
2.2 管道读写规则
当没有数据可读时:
- O_NONBLOCK disable:read调用阻塞,即进程暂停执行,一直等到有数据来到为止。
- O_NONBLOCK enable:read调用返回 -1,errno值为EAGAIN。
当管道满的时:
- O_NONBLOCK disable: write调用阻塞,直到有进程读走数据
- O_NONBLOCK enable:调用返回-1,errno值为EAGAIN
文件描述符关闭:
- 如果所有管道写端对应的文件描述符被关闭:read不再阻塞而是返回0。
- 如果所有管道读端对应的文件描述符被关闭:write操作会产生信号SIGPIPE,进而可能导致write进程退出。
原子性规则:
- 小数据写入(≤ PIPE_BUF,通常 4KB): 内核保证写入的原子性,即数据要么完整写入,要么完全不写入。
- 大数据写入(PIPE_BUF): 不保证原子性,数据可能被其他进程的写入操作穿插,且可能部分写入。
注:O_NONBLOCK为pipe2的选项(比pipe多一个选项参数)。
3. 进程池
学习完匿名管道的基本使用,我们可以动手尝试编写一个基于匿名管道的进程池。
平时,各个子进程就阻塞在read处等待,当父进程通过管道对其下达任务时就会将其唤醒。
.hpp后缀的文件其实就是.cpp和.h文件的结合体,类似于java的包。
3.1 Channel.hpp
首先,我们定义一个Channel类用于管理父子进程之间的通信管道(信道):
#include <vector>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
#include <iostream>class Channel
{
public:Channel(int wfd, int pid):_wfd(wfd), _process(pid){}~Channel(){}void CloseAndWait(){close(_wfd);std::cout << _process << "的信道关闭成功" << std::endl;waitpid(_process, nullptr, 0);std::cout << "进程" << _process << "已被成功回收" << std::endl;}// 通过信道将任务提交给子进程执行void ExecuteTask(int code){std::cout << "将任务" << code << "派遣给" << _process << std::endl;write(_wfd, &code, sizeof(code));}int GetPid(){return _process;}
private:int _wfd;pid_t _process;
};
由于进程池中进程的数量可能很多,信道也相对变多,我们应当定义一个类来管理这些信道:
class ChannelManager
{
public:void Insert(Channel&& channel){_Channels.push_back(channel);}int Size(){return _Channels.size();}// 选择进程并将任务分派出去void GiveTask(int code){int channel = SelectChannel();std::cout << "选择进程: " << _Channels[channel].GetPid() << std::endl;_Channels[channel].ExecuteTask(code);}void CloseAndWait(){for(auto& channel : _Channels){channel.CloseAndWait();}}private:// 选择进程int SelectChannel(){// 轮询分派任务static int next = 0;assert(_Channels.size());int tmp = next;next = (next + 1) % _Channels.size();return tmp;}std::vector<Channel> _Channels;
};
3.2 Task.hpp
任务实际上就是一个个的函数,同样地,由于任务可能有很多,我们也使用一个类来进行管理:
#include <functional>
#include <vector>
#include <iostream>
#include <unistd.h>
#include <cassert>
using Task = std::function<void()>;class TaskManager
{
public:// 注册,即将任务插入数组并管理起来void RegisterTask(Task&& task){_Tasks.push_back(task);}int Size(){return _Tasks.size();}// 根据任务码(数组下标)返回相应的任务对象Task& GetTask(int code){assert(code >= 0 && code < _Tasks.size());return _Tasks[code];}
private:std::vector<Task> _Tasks;
};
3.3 ProcessPool.hpp
完成上面的准备工作,我们就可以开始着手构建我们ProcessPool类了,TODO:
- 对ChannelManager和TaskManager进行封装。
- 提供给用户插入任务,发布任务等的接口。
- 开启(Start):创建子进程并使其开始等待任务到达、创建信道并插入ChannelManager。
- 终止(Stop):销毁信道并回收子进程。
#include "Channel.hpp"
#include "Task.hpp"class ProcessPool
{
public:ProcessPool(int size = 5):_size(size){std::cout << "ProcessPool已创建" << std::endl;}~ProcessPool(){// 假如用户忘记终止并回收进程if(_activate){_CM.CloseAndWait();}}// 子进程转入此函数并循环等待任务到达后执行void Work(int rfd){int code = 0;std::cout << "子进程" << getpid() << "开始工作" << std::endl;while(true){ssize_t n = read(rfd, &code, sizeof(code));if(n == 0) {std::cout << "进程" << getpid() << "退出" << std::endl;break;}else if(n < 0){std::cout << "进程" << getpid() << "获取任务时发生错误" << std::endl;break;}else _TM.GetTask(code)();}}void Start(){for(int i = 0; i < _size; i++){int fds[2] = {0};int n = pipe(fds);if(n == -1){perror("pipe:");}int id = fork();if(id < 0){perror("fork:");exit(1);}else if(id == 0){// 子进程close(fds[1]);Work(fds[0]);close(fds[0]);exit(0);}// 父进程close(fds[0]);_CM.Insert(Channel(fds[1], id));}_activate = true;}// 用户发布任务的接口,交由ChannelManager处理void LaunchTask(int code){assert(code >= 0 && code <= _TM.Size());std::cout << "发布任务: " << code << std::endl;_CM.GiveTask(code);}void Stop(){_CM.CloseAndWait();_activate = false;}// 封装TaskManager的接口,使用户自定义任务void RegisterTask(Task&& task){_TM.RegisterTask(std::forward<Task>(task));}
private:int _size;bool _activate = false;ChannelManager _CM;TaskManager _TM;
};
3.4 Main.cpp
#include "ProcessPool.hpp"
#include <ctime>int main()
{std::cout << "程序启动" << std::endl;srand((unsigned int)time(nullptr));ProcessPool processpool;// 生成n个测试任务int n = 10;for(int i = 0; i < 10; i++){processpool.RegisterTask(([i](){std::cout << "进程" << getpid() << "正在执行任务" << i << std::endl;}));}processpool.Start();// 随机发布10个任务while(n--){int code = rand() % 10;processpool.LaunchTask(code);sleep(2);}processpool.Stop();return 0;
}
3.5 匿名管道的死锁问题
上面的代码实际上存在一个严重的问题,那就是在10个任务执行结束之后进行信道的销毁时:
在第一个信道提示关闭之后,并没有显式子进程退出的消息,而是直接卡住不动了。查看源代码会发现问题就是出在这一行,说明在信道被关闭之后,子进程并没有退出。
这是由于后创建的子进程继承了父进程对之前创建的子进程的写端口:
所以,在父进程的视角上关闭信道之后,管道1的写端依然没有完全关闭,子进程就会继续在read处阻塞等待。子进程因等待父进程下达指令或关闭信道而阻塞;父进程因等待子进程退出而阻塞。 此时就形成了死锁,导致程序卡住。
解决方案
- 方案1:先关闭所有的信道再等待子进程退出。
- 方案2:逆向关闭信道并退出。
- 方案3:关闭子进程从父进程那里继承下来的写入端。
4. 最终代码
代码最终采用的是第三种方案,因为该方案的安全性更高,当然前两种方案被部分注释了,读者可以自己尝试修改死锁的解决方案。
4.1 Channel.hpp
#include <vector>
#include <string>
#include <sys/types.h>
#include <sys/wait.h>
#include <cassert>
#include <iostream>class Channel
{
public:Channel(int wfd, int pid):_wfd(wfd), _process(pid){}~Channel(){}void SubProcessCloseBrother(){close(_wfd);}void Close(){std::cout << "关闭" << _process << "的信道" << std::endl;close(_wfd);std::cout << _process << "的信道关闭成功" << std::endl;}void Wait(){waitpid(_process, nullptr, 0);std::cout << "进程" << _process << "已被成功回收" << std::endl;}// 确保调用该函数的信道为当前最后启动的 或者 事先关闭所有子进程的写入端,否则会造成死锁void CloseAndWait(){close(_wfd);std::cout << _process << "的信道关闭成功" << std::endl;waitpid(_process, nullptr, 0);std::cout << "进程" << _process << "已被成功回收" << std::endl;}void ExecuteTask(int code){std::cout << "将任务" << code << "派遣给" << _process << std::endl;write(_wfd, &code, sizeof(code));}int GetPid(){return _process;}
private:int _wfd;pid_t _process;
};class ChannelManager
{
public:void Insert(Channel&& channel){_Channels.push_back(channel);}int Size(){return _Channels.size();}void GiveTask(int code){int channel = SelectChannel();std::cout << "选择进程: " << _Channels[channel].GetPid() << std::endl;_Channels[channel].ExecuteTask(code);}// 方案1:先关闭后回收void CloseChannels(){for(auto& channel : _Channels){channel.Close();}}void WaitProcesses(){for(auto& channel : _Channels){std::cout << "回收进程" << channel.GetPid() << std::endl;channel.Wait();}}// 方案2:反向关闭回收// void CloseAndWait()// {// for(int i = _Channels.size() - 1; i >= 0; i--)// {// _Channels[i].CloseAndWait();// }// }// 方案3:关闭所有子进程的写入端,可以任意方式关闭回收void CloseAndWait(){for(auto& channel : _Channels){channel.CloseAndWait();}}void SubProcessCloseBrothers(){for(auto& channel : _Channels){channel.SubProcessCloseBrother();}}private:int SelectChannel(){// 轮询分派任务static int next = 0;assert(_Channels.size());int tmp = next;next = (next + 1) % _Channels.size();return tmp;}std::vector<Channel> _Channels;
};
4.2 Task.hpp
#include <functional>
#include <vector>
#include <iostream>
#include <unistd.h>
#include <cassert>
using Task = std::function<void()>;class TaskManager
{
public:void RegisterTask(Task&& task){_Tasks.push_back(task);}int Size(){return _Tasks.size();}Task& GetTask(int code){assert(code >= 0 && code < _Tasks.size());return _Tasks[code];}
private:std::vector<Task> _Tasks;
};
4.3 ProcessPool.hpp
#include "Channel.hpp"
#include "Task.hpp"class ProcessPool
{
public:ProcessPool(int size = 5):_size(size){std::cout << "ProcessPool已创建" << std::endl;}~ProcessPool(){if(_activate){_CM.CloseChannels();_CM.WaitProcesses();}}void Work(int rfd){int code = 0;std::cout << "子进程" << getpid() << "开始工作" << std::endl;while(true){ssize_t n = read(rfd, &code, sizeof(code));if(n == 0) {std::cout << "进程" << getpid() << "退出" << std::endl;break;}else if(n < 0){std::cout << "进程" << getpid() << "获取任务时发生错误" << std::endl;break;}else _TM.GetTask(code)();}}void Start(){for(int i = 0; i < _size; i++){int fds[2] = {0};int n = pipe(fds);if(n == -1){perror("pipe:");}int id = fork();if(id < 0){perror("fork:");exit(1);}else if(id == 0){// 子进程close(fds[1]);// 将子进程的写入端全部关闭_CM.SubProcessCloseBrothers();Work(fds[0]);close(fds[0]);exit(0);}// 父进程close(fds[0]);_CM.Insert(Channel(fds[1], id));}_activate = true;}void LaunchTask(int code){assert(code >= 0 && code <= _TM.Size());std::cout << "发布任务: " << code << std::endl;_CM.GiveTask(code);}void Stop(){// _CM.CloseChannels();// _CM.WaitProcesses();_CM.CloseAndWait();_activate = false;}void RegisterTask(Task&& task){_TM.RegisterTask(std::forward<Task>(task));}
private:int _size;bool _activate = false;ChannelManager _CM;TaskManager _TM;
};
4.4 Makefile
ProcessPool:Main.cppg++ -o $@ $^ -std=c++11.PHONY:clean
clean:rm ProcessPool