Linux第十讲:进程间通信IPC
Linux第十讲:进程间通信IPC
- 1.进程间通信介绍
- 1.1什么是进程间通信
- 1.2为什么要进程间通信
- 1.3怎么进行进程间通信
- 2.管道
- 2.1理解管道
- 2.2匿名管道的实现代码
- 2.3管道的五种特性
- 2.3.1匿名管道,只能用来进行具有血缘关系的进程进行通信(通常是父子)
- 2.3.2管道文件,自带同步机制
- 2.3.3管道是面向字节流的
- 2.3.4管道是单向通信的
- 2.3.5(管道)文件的声明周期,是随进程的
- 2.4管道的四种通信情况
- 2.4.1写快,读慢
- 2.4.2写慢,读快
- 2.4.3写关,继续读
- 2.4.4读关闭,写继续
- 2.5知识补充
- 3.进程池的实现
- 3.1什么是进程池
- 3.2进程池框架搭建
- 3.3在进程池中创建多个进程,并分配管道
- 3.4任务分配和执行
- 3.5任务执行策略 && 任务执行代码实现
- 3.6进程池的回收
- 3.7结果展示
- 3.8问题处理
- 3.8.1解决方法1
- 3.8.2解决方法2
- 4.命名管道
- 4.1什么是命名管道 && 命名管道的原理
- 4.2命名管道的创建 && 进程间通信使用
- 4.3进程间通信的封装实现
- 4.4命名管道与匿名管道的区别
- 5.system V共享内存
- 5.1什么是共享内存
- 5.2共享内存接口的使用
- 5.3代码实现两个进程看到同一份资源
- 5.4进程间通信代码封装实现
- 5.5共享内存实现的进程间通信操作
- 5.5.1基于共享内存实现的进程间通信的优缺点分析
- 5.5.2处理共享内存的缺点
- 5.6 shmdt
- 5.7一个细节问题
- 5.8描述共享内存的数据结构
- 6.system V信号量
- 6.1前提知识补充
- 6.2什么是信号量
- 6.3信号量的接口和系统调用
- 7.附录 -- 进程池完整代码
1.进程间通信介绍
进程都是独立的,都有自己独立的内容和数据,有着自己独立的结构体,如果是父子进程的话,修改数据还会发生写时拷贝,那么究竟是怎么进行通信的呢?
1.1什么是进程间通信
进程间通信(IPC)是指运行在一台计算机或不同计算机上的多个进程之间进行数据交换和通信的技术
1.2为什么要进程间通信
1.3怎么进行进程间通信
我们现在只需要知道:进程间通信的本质,是让不同的进程,先看到同一份资源,然后才有通信的条件,那么这份资源是谁提供的呢?是进程提供的吗?不是!因为进程还没有这么大的权限,这份资源是由操作系统提供的,而操作系统要提供资源,就需要进行系统调用,所以说就必须要有特定的通信接口!我们学习进程间通信,不仅仅要学习原理,还要学习接口的使用:
2.管道
2.1理解管道
管道是很早以前实现的一种进程间通信的方式它的本质是基于文件的通信:
我们先了解一下管道的简单实现思想:
然后我们再看一下真正的管道的实现原理:
2.2匿名管道的实现代码
理解管道之后,我们就要掌握管道的创建方法了:
我们先看一个创建管道的系统调用:
创建管道的具体操作:
#include <iostream>
#include <unistd.h>
#include <sys/types.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/wait.h>void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);//这里我们使用C语言提供的函数write(wfd, buffer, strlen(buffer));}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){buffer[0] = 0;sleep(1);ssize_t n = read(rfd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}}
}int main()
{//1.创建管道int fds[2] = {0};int n = pipe(fds);if(n < 0){std::cout << "pipe error" << std::endl;return 1;}//2.创建子进程pid_t id = fork();if(id == 0){//假设我们让子进程写,父进程读close(fds[0]);//子进程写入,需要关闭读端Child_Write(fds[1]);close(fds[1]);//无论是父进程还是子进程,最后都需要进行关闭所有文件的操作exit(0);}close(fds[1]);Read_Parent(fds[0]);waitpid(id, nullptr, 0);close(fds[0]);return 0;
}
结果:
这样我们就可以验证得出,进程运行时拿取到的数据,因为子进程创建了一个cnt的临时变量
上面讲的是匿名管道的原理以及通信方式,而匿名管道经常用于父子进程之间、兄弟进程之间的通信,所以我们还要掌握其它的通信方式:
2.3管道的五种特性
我们先理解管道,然后再学习其它通信方式:
总结:(这里有的知识理解起来比较困难,以后会讲到,比如字节流)
1.匿名管道,只能用来进行具有血缘关系的进程进行通信(通常是父子)
2.管道文件,自带同步机制
3.管道是面向字节流的
4.管道是单向通信的
5.(管道)文件的生命周期,是随进程的
2.3.1匿名管道,只能用来进行具有血缘关系的进程进行通信(通常是父子)
这个在上面讲过了,因为只有存在血缘关系的进程之间才会对files_struct进行浅拷贝
2.3.2管道文件,自带同步机制
我们先看一个代码:
void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);write(wfd, buffer, strlen(buffer));}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}}
}
当子进程进行管道写入,每次写入都要sleep上1s,父进程不断进行读取,会发生什么呢?:
我们先来说一下同步的概念:同步的本质就是按照顺序、有规律地进行操作。比如买火车票,必须排队依次买票,不能插队。
父进程一直需要从文件中读取数据,但是子进程还没有写入,这时,父进程就要进行阻塞状态,当子进程写入之后,父进程进行读取,然后再次进入阻塞状态,所以说,文件自带同步机制
2.3.3管道是面向字节流的
我们再来看一个场景:
void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){//sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);write(wfd, buffer, strlen(buffer));}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){sleep(5);buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}}
}
当子进程一直写,而父进程每5秒读一次,会发生什么呢?:
可以看出,每次读取一块,先输出一个概念:管道是有一个特定的大小的,当写入的数据达到一定大小,就不能再写入了,就需要父进程读取数据,然后再进行写入。
然而,对于文件流,我们之后再说,我们只需要知道有这个东西就行
2.3.4管道是单向通信的
半双工:任何一个时刻,一个发,一个收(就像是上课,老师讲,自己听)
全双工:任何一个时刻,可以同时发收(就像吵架,你不仅要输出,同时还要听)
而管道属于半双工的一种特殊情况,当收发确定时,就不可再更改了
2.3.5(管道)文件的声明周期,是随进程的
管道其实就是一个文件,子进程和父进程同时指向管道,当子进程/父进程结束了,管道就要被回收
2.4管道的四种通信情况
总结:
1.写快,读慢
2.写慢,读快
3.写关,继续读
4.读关,继续写
2.4.1写快,读慢
这个上面已经举过例子了,也就是当一直写,但是每个5秒进行读时。
管道文件的大小固定,当写入固定大小数据之后,就不再进行写入了,此时写要进入阻塞状态,等待读端
2.4.2写慢,读快
这个上面也讲过了,也就是一直读,但是每个1s才写。
此时读端进入阻塞状态,等待写端写入
2.4.3写关,继续读
void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){//sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);write(wfd, buffer, strlen(buffer));close(wfd);exit(1);}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){sleep(1);buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer)-1);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}else if(n == 0){std::cout << "n: " << n << std::endl;std::cout << "Child退出,我也退出" << std::endl;break;}else{break;}}
}
结果:
此时,read会读取数据,读到返回值为0,表示文件结尾,但是此时子进程不会退出,而是不再进行写入操作,但是还会执行其它的指令
2.4.4读关闭,写继续
void Child_Write(int wfd)
{char buffer[1024];int cnt = 0;while(true){sleep(1);snprintf(buffer, sizeof(buffer), "I am child, pid: %d, cnt: %d", getpid(), cnt++);write(wfd, buffer, strlen(buffer));//close(wfd);exit(1);//printf("hello");}
}void Read_Parent(int rfd)
{char buffer[1024];while(true){//sleep(1);buffer[0] = 0;ssize_t n = read(rfd, buffer, sizeof(buffer)-1);close(rfd);if(n > 0){buffer[n] = 0;std::cout << "Child say: " << buffer << std::endl;}else if(n == 0){std::cout << "n: " << n << std::endl;std::cout << "Child退出,我也退出" << std::endl;break;}else{break;}}printf("test\n");
}
当提前关闭读时,此时写入管道就没有任何意义了,因为写入了也没有什么用,而操作系统不会允许这样的情况发生,所以操作系统就会向子进程发送13号(SIGPIPE)信号,从而终止子进程,我们可以进行验证:
结果:
退出码是13!
2.5知识补充
1.管道的容量我们可以通过每次写入一个字节,cnt计数++的方法来确定,在我的ubunto系统下,是64kb
2.管道的写入是原子性的,其实也就是指如果要执行写入操作的话,就必须将整段话写完,才能进行读取。比如说,当我们写入hello world,只有当hello world写完之后,才能进行读取,这个以后会讲到
3.进程池的实现
进程池实现的目的在于加深对于管道的认识和使用:
3.1什么是进程池
3.2进程池框架搭建
// 先描述 -- 管道
class Channel
{
public:Channel() {};~Channel() {};private:int _wfd; // 读端fdpid_t _subid; // 进程pidstd::string _name; // 用来方便管道标识
};// 再组织 -- 管道管理
class ChannelManager
{
public:ChannelManager() {};~ChannelManager() {};private:std::vector<Channel> _channels; // 用来管理所有的管道
};// 进程池
class ProcessPoll
{
public:ProcessPoll() {};~ProcessPoll() {};private:ChannelManager _cm; // 对管道进行管理int _process_num; // 创建多少个进程
};
3.3在进程池中创建多个进程,并分配管道
进程池的框架搭好之后,需要启动进程池,从而创建出需要的进程和管道:
int main()
{//创建进程池,假如进程池中需要5个进程ProcessPoll pp(5);//启动进程池,执行进程创建工作pp.start();return 0;
}
// 先描述
class Channel
{
public:Channel(int wfd, pid_t subid):_wfd(wfd), _subid(subid){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}~Channel() {};private:int _wfd; // 读端fdpid_t _subid; // 进程pidstd::string _name; // 用来方便管道标识
};// 再组织
class ChannelManager
{
public:ChannelManager() {};void Insert(int wfd, pid_t subid){_channels.push_back({wfd, subid});}~ChannelManager() {};private:std::vector<Channel> _channels; // 用来管理所有的管道
};// 进程池
class ProcessPoll
{
public:ProcessPoll(int num):_process_num(num){};//子进程工作bool Work(int rfd) {}//在进程池中创建进程,并将管道进行管理bool start(){for(int i = 0; i<_process_num; i++)//创建定量的进程{//1.创建管道int pidfd[2] = {0};int n = pipe(pidfd);if(n < 0) return false;//2.创建子进程pid_t subid = fork();if(subid < 0){printf("创建子进程失败\n");return false;}else if(subid == 0){//子进程 -- 读端 -- pidfd[0]close(pidfd[1]);Work(pidfd[0]);close(pidfd[0]);//work执行完毕,证明进程结束,关闭文件exit(1);}else{//父进程 -- 写端 -- pidfd[1]close(pidfd[0]);_cm.Insert(pidfd[1], subid);//将创建的进程和管道进行管理//此时还不确定需要执行什么任务,所以只能让子进程处于阻塞状态//通过其它函数确定好执行任务之后,父进程才进行写入}}}~ProcessPoll(){};private:ChannelManager _cm; // 对管道进行管理int _process_num; // 创建多少个进程
};
在看上面的代码时,先看ProcessPoll(进程池)结构体,然后再看其它的就行
3.4任务分配和执行
int main()
{//创建进程池,假如进程池中需要5个进程ProcessPoll pp(5);//启动进程池,执行进程创建工作pp.start();//有了进程池,就能够分配任务并让进程池中的进程执行了int cnt = 10;while(cnt--){pp.Run();//执行10次命令sleep(1);}return 0;
}
我们假设任务是随机分配的,我们将任务清单和任务管理再写一个文件:
#pragma once
#include <iostream>
#include <vector>
#include <ctime>typedef void (*task_t)();
/Task/
void PrintLog()
{std::cout << "我是一个打印日志的任务" << std::endl;
}void Download()
{std::cout << "我是一个下载的任务" << std::endl;
}void Upload()
{std::cout << "我是一个上传的任务" << std::endl;
}
/Task/class TaskManager
{
public:TaskManager(){srand(time(nullptr));}//将任务插入到任务管理表中void Register(task_t task){_tasks.push_back(task);}//任务码(每一个任务码对应一个任务,拿到一个任务码,就执行对应的任务)int Code(){return rand() % _tasks.size();}//任务执行void Execute(int code)//拿到任务码执行任务{if(code>=0 && code<_tasks.size()){_tasks[code]();}}~TaskManager() {};private:std::vector<task_t> _tasks;
};
所以说,我们的进程池中将需要执行的任务进行插入,然后再拿到任务码执行任务就可以了:
// 进程池
class ProcessPoll
{
public:ProcessPoll(int num):_process_num(num){//将任务进行插入_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}//执行命令void Run(){//假如执行的任务是随机分配的int taskcode = _tm.Code();//任务执行_tm.Execute(taskcode);}~ProcessPoll(){};private:ChannelManager _cm; // 对管道进行管理int _process_num; // 创建多少个进程TaskManager _tm;
};
但是这里会有问题:如果任务都让一个进程执行的话,那么会导致累的累死,闲的闲死的情况,所以我们要通过一定的策略进行任务的执行
3.5任务执行策略 && 任务执行代码实现
下面我们来实现代码:
// 先描述
class Channel
{
public://任务码的发送void Send(int code){//拿到需要执行的任务之后,我们需要将任务码先发送出去,子进程拿到任务码之后,就可以进行任务执行了int n = write(_wfd, &code, sizeof(code));(void)n;//防止警告}private:int _wfd; // 读端fdpid_t _subid; // 进程pidstd::string _name; // 用来方便管道标识
};// 再组织
class ChannelManager
{
public:// 选择一个需要执行的进程Channel &Select(){auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}private:std::vector<Channel> _channels; // 用来管理所有的管道int _next; // 下一个需要执行的进程
};// 进程池
class ProcessPoll
{
public:// 子进程工作void Work(int rfd) {//创建好子进程之后,子进程就进入该函数,进入阻塞状态//一旦父进程通过函数调用写入了一个任务码,就可以进行任务执行while(true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if(n > 0)//读取成功{if(n != sizeof(code)) continue;//读取不符std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;_tm.Execute(code);}else if(n == 0)//父进程关闭写端{std::cout << "子进程退出" << std::endl;break;}else{printf("read失败\n");break;}}}// 执行命令void Run(){// 假如执行的任务是随机分配的int taskcode = _tm.Code();// 任务执行 -- 采用轮询的方式,所以我们要先知道该哪个进程执行了// 1.选择一个能够执行任务的进程auto &c = _cm.Select();std::cout << "选择了一个子进程: " << c.Name() << std::endl;// 2. 发送任务码c.Send(taskcode);std::cout << "发送了一个任务码: " << taskcode << std::endl;}// 在进程池中创建进程,并将管道进行管理bool start(){for (int i = 0; i < _process_num; i++) // 创建定量的进程{// 1.创建管道int pidfd[2] = {0};int n = pipe(pidfd);if (n < 0)return false;// 2.创建子进程pid_t subid = fork();if (subid < 0){printf("创建子进程失败\n");return false;}else if (subid == 0){close(pidfd[1]);//创建子进程之后,子进程直接进入工作状态Work(pidfd[0]);close(pidfd[0]);exit(1);}else{close(pidfd[0]);_cm.Insert(pidfd[1], subid);}}return true;}private:ChannelManager _cm; // 对管道进行管理int _process_num; // 创建多少个进程TaskManager _tm;
};
上面的代码并不是完整的代码,因为完整的代码容易造成混乱
3.6进程池的回收
进程池工作完毕之后,要进行资源的回收:
#pragma once#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <cstdlib>
#include <sys/wait.h>
#include "Task.hpp"// 先描述
class Channel
{
public:void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}private:int _wfd; // 读端fdpid_t _subid; // 进程pidstd::string _name; // 用来方便管道标识
};// 再组织
class ChannelManager
{
public:ChannelManager():_next(0) {}//管道的回收 -- 关闭wfdvoid StopSubProcess(){for (auto &channel : _channels){channel.Close();std::cout << "关闭: " << channel.Name() << std::endl;}}//进程回收 -- waitpidvoid WaitSubProcess(){for (auto &channel : _channels){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}private:std::vector<Channel> _channels; // 用来管理所有的管道int _next; // 下一个需要执行的进程
};// 进程池
class ProcessPoll
{
public:ProcessPoll(int num) : _process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}//进程的回收void Stop(){//管道的回收,只需要将父进程的读端关闭即可_cm.StopSubProcess();//子进程的回收,需要waitpid_cm.WaitSubProcess();}private:ChannelManager _cm; // 对管道进行管理int _process_num; // 创建多少个进程TaskManager _tm;
};
3.7结果展示
确实很好的执行了我们想要的效果!
3.8问题处理
但是这里会出现一个问题,我们来看:
当回收进程时,我们直接使用一个函数调用进行回收,会发生什么?会发生进程池没有正确地被回收,卡着的情况,我们讲一下原理:
3.8.1解决方法1
我们很容易能够想到的解决方法是:从后向前进行进程的关闭不就行了?
void CloseAndWait()
{//解决方法1:倒着回收for(int i = _channels.size()-1; i>=0; i--){_channels[i].Close();_channels[i].Wait();}
}
3.8.2解决方法2
难道不可以真的只让父进程有w端的指向吗?:
void CloseAndWait()
{//解决方法2:只让父进程有w端的指向for (auto &channel : _channels){channel.Close();channel.Wait();}
}void CloseAll()
{for(auto& channel : _channels){channel.Close();}
}else if (subid == 0)
{// 子进程 -- 读端 -- pidfd[0]_cm.CloseAll();//在子进程创建出来之后就将子进程多余的指向进行关闭close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]);exit(1);
}
但是我们可能会有疑问:子进程执行关闭工作的时候,父进程也在将创建的子进程进行插入到管道表中呀,如果将子进程自己插入到表中了,然后子进程恰好把自己关了,那不就错了吗?
上面的疑问是对于写时拷贝不清楚,当子进程创建之后,就继承有父进程的一张管道管理表,而这张表中是没有自己进程的数据的,因为自己刚创建出来,当父进程需要向表中进行写入工作时,发生写时拷贝,所以子进程永远拿不到有自己进程号的那张表,只会拿到之前的进程表!
4.命名管道
4.1什么是命名管道 && 命名管道的原理
之前我们讲的匿名管道会有一个局限性,也就是只能满足有血缘关系的进程之间的通信需要,不能满足两个完全不同的进程之间的通信需求,所以我们还需要学习命名管道:
4.2命名管道的创建 && 进程间通信使用
我们可以使用mkfifo命令来实现命名管道的创建操作:
#include <sys/types.h>
#include <sys/stat.h>int mkfifo(const char *pathname, mode_t mode);
使用,我们先掌握一下命令的使用和删除管道操作(当然可以使用rm命令):
可以看出,命名管道是p开头的文件,而该函数第二个参数其实就是设置权限就行,返回值:成功返回0,否则返回-1:
int main()
{umask(0);int n = mkfifo(FIFO_FILE, 0666);if(n != 0){std::cout << "mkfifo error" << std::endl;return 1;}return 0;
}
那么两个完全不同的进程之间的通信的实现是如何完成的?:
// sercer.cc文件 -- 用于创建出管道,并向管道中进行读取操作int main()
{// 创建管道umask(0);int n = mkfifo(FIFO_FILE, 0666);if (n == -1){std::cout << "mkfifo filed" << std::endl;exit(0);}std::cout << "mkfifo success" << std::endl;// 进行管道的读取操作int fd = open(FIFO_FILE, O_RDONLY);if (fd > 0){while (true){// 先打开管道文件char buffer[1024];ssize_t number = read(fd, buffer, sizeof(buffer) - 1);if (number > 0){buffer[number] = 0;std::cout << buffer << std::endl;}else{// TODO}}}else{std::cout << "read open filed" << std::endl;}close(fd);// 删除管道n = unlink(FIFO_FILE);if (n == -1){std::cout << "unlink filed" << std::endl;exit(0);}std::cout << "unlink success" << std::endl;return 0;
}////client.cc文件 -- 进行管道的写入操作int main()
{int fd = open(FIFO_FILE, O_WRONLY);if(fd > 0){std::string message;int cnt = 1;pid_t id = getpid();while(true){std::getline(std::cin, message);message += (",message number: " + std::to_string(cnt) + ", [" + std::to_string(id) + "]");ssize_t number = write(fd, message.c_str(), sizeof(message));}}else{std::cout << "write open filed" << std::endl;}close(fd);return 0;
}
当我们运行管道的读端server.cc文件,结果为:
也就是说此时并没有进行文件的创建操作
我们直接输出结论:当write方法没有执行open的时候,read方法就会在open内部进行阻塞,知道有人把管道文件打开了,open才会返回
此时还有一个问题,当写端关闭的时候,读端并没有关闭,原因如下:
// 进行管道的读取操作
int fd = open(FIFO_FILE, O_RDONLY);
if (fd > 0)
{while (true){// 先打开管道文件char buffer[1024];ssize_t number = read(fd, buffer, sizeof(buffer) - 1);if (number > 0){buffer[number] = 0;std::cout << buffer << std::endl;}else if(number == 0){// TODOstd::cout << "write close, me too" << std::endl;break;}else{//处理其它的情况std::cerr << "read error" << std::endl;break;}}
}
4.3进程间通信的封装实现
下面我们要将对于管道的创建、管道的读取等操作进行一系列的封装,来达到隐藏内部细节,在外部可以直接使用的目的:
//comm.hpp文件 -- 对命名管道的封装操作// 命名管道类实现 -- 包含管道的创建和析构操作
class Namedfifo
{
public:Namedfifo(const std::string &path, const std::string &name): _path(path), _name(name){// 构造函数中,需要创建一个管道umask(0);_fifoname = _path + "/" + _name;int n = mkfifo(_fifoname.c_str(), 0666);if (n == -1){std::cout << "mkfifo filed" << std::endl;exit(0);}std::cout << "mkfifo success" << std::endl;}~Namedfifo(){// 删除管道int n = unlink(_fifoname.c_str());if (n == -1){std::cout << "unlink filed" << std::endl;exit(0);}std::cout << "unlink success" << std::endl;}private:std::string _path;std::string _name;std::string _fifoname;
};// 文件操作类实现 -- 包含对管道的一系列操作
class Fileoper
{
public:Fileoper(const std::string &path, const std::string &name): _path(path), _name(name){_fifoname = _path + "/" + _name;}// 管道读操作void OpenForRead(){_fd = open(_fifoname.c_str(), O_RDONLY);if (_fd > 0){while (true){// 先打开管道文件char buffer[1024];ssize_t number = read(_fd, buffer, sizeof(buffer) - 1);if (number > 0){buffer[number] = 0;std::cout << buffer << std::endl;}else if (number == 0){std::cout << "write close, me too" << std::endl;break;}else{// 处理其它的情况std::cerr << "read error" << std::endl;break;}}}else{std::cout << "read open filed" << std::endl;}close(_fd);}// 管道写操作void OpenForWrite(){_fd = open(_fifoname.c_str(), O_WRONLY);if (_fd > 0){std::string message;int cnt = 1;pid_t id = getpid();while (true){std::getline(std::cin, message);message += (",message number: " + std::to_string(cnt) + ", [" + std::to_string(id) + "]");ssize_t number = write(_fd, message.c_str(), sizeof(message));}}else{std::cout << "write open filed" << std::endl;}close(_fd);}void Close(){if (_fd > 0)close(_fd);}private:std::string _path;std::string _name;std::string _fifoname;int _fd;
};
这样就极大的简化了操作:
// sercer.cc文件 -- 用于创建出管道,并向管道中进行读取操作int main()
{Namedfifo fifo(".", "fifo");// 进行管道的读取操作Fileoper readfile(".", "fifo");readfile.OpenForRead();return 0;
}// sercer.cc文件 -- 用于创建出管道,并向管道中进行读取操作int main()
{Namedfifo fifo(".", "fifo");// 进行管道的读取操作Fileoper readfile(".", "fifo");readfile.OpenForRead();return 0;
}
但是我们每次进行write或open等操作时,对于错误都要进行单独的处理,比较麻烦,所以我们可以使用宏函数来处理这个问题:
#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0)Namedfifo(const std::string &path, const std::string &name): _path(path), _name(name)
{umask(0);_fifoname = _path + "/" + _name;int n = mkfifo(_fifoname.c_str(), 0666);if (n == -1){//在使用时,就可以直接使用了:ERR_EXIT("mkfifo");}std::cout << "mkfifo success" << std::endl;
}
4.4命名管道与匿名管道的区别
命名管道与匿名管道的区别只有:命名管道可以用来进行不相关的进程之间的通信
其它的特性和匿名管道相同(五种特性都是一样的)
5.system V共享内存
system V其实是一种标准,是早期UNIX操作系统的一个版本,但是它提供了一套丰富的IPC机制,我们已经了解过,只有在某一领域非常领先之后才可以指定标准。Linux内核支持了这种标准,专门为此设计了一个IPC通信模块,其实也就是提供了通信的接口设计
5.1什么是共享内存
我们都知道,IPC的本质是:让不同的进程,看到同一份资源,了解共享内存才可以对其进行使用:
5.2共享内存接口的使用
我们先认识一下使用的接口:
我们通过代码来使用这些接口,直接进行封装了:
const std::string pathname = ".";
const int projid = 0x66;
const int gdefaultid = -1;
const int gsize = 4096;#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0);class Shm
{
public:Shm() :_shmid(gdefaultid), _size(gsize){}// 创建一个共享内存void Creat(){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, IPC_CREAT | IPC_EXCL);if(_shmid < 0){ERR_EXIT("shmget");}printf("shmget return value is : %d\n", _shmid);}~Shm(){}private:int _shmid;int _size;
};
运行之后,我们可以使用ipcs -m来查看我们刚创建的共享内存:
当我们关闭xshell之后,再次查看共享内存,会发现,共享内存还在,其实,共享内存的声明周期是随内核的,只有当操作系统重启之后,共享内存才会被回收,那么回收使用的指令级操作是什么呢?:
我们写代码时肯定不会写指令级的代码,那么有没有一个函数,是用来进行共享内存的操作的呢?有!shmctl:
那么代码实现共享内存的操作如下:
//删除共享内存
void Destroy()
{if(_shmid == gdefaultid) return;int n = shmctl(_shmid, IPC_RMID, nullptr);if(n < 0){ERR_EXIT("shmctl");}printf("shmctl delete shm: %d success\n", _shmid);
}
5.3代码实现两个进程看到同一份资源
我们上面讲过,让两个进程看到同一个共享空间,需要让虚拟进程空间的指针通过页表与共享空间的指针进行对应,这个操作称为关联,而shmat(at: attach)就是代码级别的操作:
那么共享内存的关联操作的代码实现如下:
// 共享内存的关联操作
void Attach()
{if (_shmid == gdefaultid)return;_start_mem = shmat(_shmid, nullptr, 0);if ((long long)_start_mem < 0)ERR_EXIT("shmat");printf("shmat success\n");
}
// 返回关联的虚拟空间内的地址
void *VirtualAddr()
{printf("VirtualAddr: %p\n", _start_mem);return _start_mem;
}
但是我们运行时可能会报错:Permission Denied,也就是说,我们的进程没有权限访问共享内存,下面我们进行讲解:
我们实现了共享内存的创建、删除和关联操作,那么为了实现让两个进程都可以看到同一个共享内存,还差一个操作,也就是如何让另一个进程看到这个共享内存,原理其实就是找到同一个key值,然后进行关联即可:
// 创建一个共享内存
void Creat()
{key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, IPC_CREAT | IPC_EXCL | 0666);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);
}
// 让另外一个进程获取共享内存
void Get()
{key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, IPC_CREAT);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);
}
我们可以发现,这两个操作很相似,知识shmget的参数有一点区别,所以我们进行封装:
private:void CreatHelper(int flg){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, flg);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);}public:// 创建一个共享内存void Creat(){CreatHelper(IPC_CREAT | IPC_EXCL | 0666);}// 让另外一个进程获取共享内存void Get(){CreatHelper(IPC_CREAT);}
完整代码实现:
const std::string pathname = ".";
const int projid = 0x66;
const int gdefaultid = -1;
const int gsize = 4096;#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0);class Shm
{
private:void CreatHelper(int flg){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, flg);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);}public:Shm() : _shmid(gdefaultid), _size(gsize), _start_mem(nullptr){}// 创建一个共享内存void Creat(){CreatHelper(IPC_CREAT | IPC_EXCL | 0666);}// 让另外一个进程获取共享内存void Get(){CreatHelper(IPC_CREAT);}// 共享内存的关联操作void Attach(){if (_shmid == gdefaultid)return;_start_mem = shmat(_shmid, nullptr, 0);if ((long long)_start_mem < 0)ERR_EXIT("shmat");printf("shmat success\n");}// 返回关联的虚拟空间内的地址void *VirtualAddr(){printf("VirtualAddr: %p\n", _start_mem);return _start_mem;}// 删除共享内存void Destroy(){if (_shmid == gdefaultid)return;int n = shmctl(_shmid, IPC_RMID, nullptr);if (n < 0){ERR_EXIT("shmctl");}printf("shmctl delete shm: %d success\n", _shmid);}~Shm(){}private:int _shmid;int _size;void *_start_mem;
};
5.4进程间通信代码封装实现
上面我们实现了让共享内存被两个不同的进程成功拿到并映射,本质是拿到同一份资源,但是我们并不想让结构体中的太多函数暴露给上层,所以我们要对我们的代码再进行一定的优化:
const std::string pathname = ".";
const int projid = 0x66;
const int gdefaultid = -1;
const int gsize = 4096;
#define CREATER "creater"
#define USER "user"#define ERR_EXIT(m) \do \{ \perror(m); \exit(EXIT_FAILURE); \} while (0);class Shm
{
private:void CreatHelper(int flg){key_t k = ftok(pathname.c_str(), projid);if (k < 0){ERR_EXIT("ftok");}printf("ftok return value is : 0x%x\n", k);_shmid = shmget(k, _size, flg);if (_shmid < 0){ERR_EXIT("shmget");}printf("shmid is : %d\n", _shmid);}// 共享内存的关联操作void Attach(){if (_shmid == gdefaultid)return;_start_mem = shmat(_shmid, nullptr, 0);if ((long long)_start_mem < 0)ERR_EXIT("shmat");printf("shmat success\n");}// 删除共享内存void Destroy(){if (_shmid == gdefaultid)return;int n = shmctl(_shmid, IPC_RMID, nullptr);if (n < 0){ERR_EXIT("shmctl");}printf("shmctl delete shm: %d success\n", _shmid);}public:Shm(const std::string &pathname, const std::string &usertype): _shmid(gdefaultid),_size(gsize),_start_mem(nullptr),_pathname(pathname),_usertype(usertype){if (_usertype == CREATER)CreatHelper(IPC_CREAT | IPC_EXCL | 0666);else if (_usertype == USER)CreatHelper(IPC_CREAT);else{}Attach();}// 返回共享内存的起始地址void *VirtualAddr(){printf("VirtualAddr: %p\n", _start_mem);return _start_mem;}// 返回共享内存的大小int Size(){printf("Shm Size is: %d\n", _size);return _size;}~Shm(){//只有创建共享内存的需要执行删除操作if(_usertype == CREATER)Destroy();}private:int _shmid;int _size;void *_start_mem;std::string _pathname;std::string _usertype;
};
此时我们的代码就会被优化很多了
5.5共享内存实现的进程间通信操作
我们之前进行的操作的核心在于让不同的进程看到同一份资源,那么怎么实现依靠于共享内存进程间通信呢?:
5.5.1基于共享内存实现的进程间通信的优缺点分析
5.5.2处理共享内存的缺点
对于共享内存的这个缺点,我们可以使用什么方法来解决呢?可以使用命名管道的方法解决,但是这个方法显然不是正规的解决方法,我们先来看该方法的思路:
为什么说这个方法不正规呢?因为根本不会有人使用这个方法,而且这个方法引出的问题也很多,我们只是学习,下面来看:
完整代码实现:
5.6 shmdt
我们始终都忽略了一个接口,也就是我们只实现了共享内存的挂起,也就是将共享内存映射到了物理内存中,建立了页表的映射,但是我们忽略了共享内存使用结束之后的取消挂起操作,也就是取消进程页表和物理内存的映射关系:
5.7一个细节问题
我们开辟的共享内存为4096,那么我们如果开辟了4097的空间,那么操作系统给我们的空间是多少?我们实际可以使用的空间又是多少?
在内核中,共享内存在创建的时候,它的大小,必须是4096的倍数,所以操作系统给我们的实际空间大小为4096*2,但是我们只能使用4096的空间!
5.8描述共享内存的数据结构
6.system V信号量
system V标准其实并不重要,想要了解ststem V消息队列的可以自己进行搜索学习,但是我们要通过该标准引入信号和信号量
6.1前提知识补充
6.2什么是信号量
6.3信号量的接口和系统调用
因为system V信号量有它自己的缺点,所以这里我么不再进行讲述,想再了解的直接搜索:system V信号量,但是我们进行一些结论的输出:
共享内存、消息队列、信号量的接口,都是使用key值来进行唯一性的区分的,OS将这三种当作成了同一种资源,这也是为什么这三个叫做system V标准了!在底层,因为这三种资源都是使用key进行唯一性的区分的,所以OS使用了一个结构体,就对这三种资源进行了统一的管理!!!
7.附录 – 进程池完整代码
进程池实现:
#pragma once#include <iostream>
#include <vector>
#include <unistd.h>
#include <sys/types.h>
#include <cstdlib>
#include <sys/wait.h>
#include "Task.hpp"// 先描述
class Channel
{
public:Channel(int wfd, pid_t subid) : _wfd(wfd), _subid(subid){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}//任务码的发送void Send(int code){//拿到需要执行的任务之后,我们需要将任务码先发送出去,子进程拿到任务码之后,就可以进行任务执行了ssize_t n = write(_wfd, &code, sizeof(code));(void)n;//防止警告}void Close(){close(_wfd);std::cout << "关闭: " << _name << std::endl;}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);std::cout << "回收: " << _name << std::endl;(void)rid;}int Fd() { return _wfd; }pid_t SubId() { return _subid; }std::string Name() { return _name; }~Channel() {}private:int _wfd; // 读端fdpid_t _subid; // 进程pidstd::string _name; // 用来方便管道标识
};// 再组织
class ChannelManager
{
public:ChannelManager():_next(0) {}void Insert(int wfd, pid_t subid){_channels.emplace_back(wfd, subid);}// 选择一个需要执行的进程Channel &Select(){auto &c = _channels[_next];_next++;_next %= _channels.size();return c;}void PrintChannel(){for (auto &channel : _channels){std::cout << channel.Name() << std::endl;}}//管道的回收 -- 关闭wfdvoid StopSubProcess(){for (auto &channel : _channels){channel.Close();std::cout << "关闭: " << channel.Name() << std::endl;}}//进程回收 -- waitpidvoid WaitSubProcess(){for (auto &channel : _channels){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}void CloseAndWait(){// //解决方法1:倒着回收// for(int i = _channels.size()-1; i>=0; i--)// {// _channels[i].Close();// _channels[i].Wait();// }//解决方法2:只让父进程有w端的指向for (auto &channel : _channels){channel.Close();channel.Wait();}}void CloseAll(){for(auto& channel : _channels){channel.Close();}}~ChannelManager() {}private:std::vector<Channel> _channels; // 用来管理所有的管道int _next; // 下一个需要执行的进程
};const int gdefaultnum = 5;// 进程池
class ProcessPoll
{
public:ProcessPoll(int num) : _process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}// 子进程工作void Work(int rfd) {//创建好子进程之后,子进程就进入该函数,进入阻塞状态//一旦父进程通过函数调用写入了一个任务码,就可以进行任务执行while(true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if(n > 0)//读取成功{if(n != sizeof(code)) continue;//读取不符std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;_tm.Execute(code);}else if(n == 0)//父进程关闭写端{std::cout << "子进程退出" << std::endl;break;}else{printf("read失败\n");break;}}}// 执行命令void Run(){// 假如执行的任务是随机分配的int taskcode = _tm.Code();// 任务执行 -- 采用轮询的方式,所以我们要先知道该哪个进程执行了// 1.选择一个能够执行任务的进程auto &c = _cm.Select();std::cout << "选择了一个子进程: " << c.Name() << std::endl;// 2. 发送任务码c.Send(taskcode);std::cout << "发送了一个任务码: " << taskcode << std::endl;}// 在进程池中创建进程,并将管道进行管理bool Start(){for (int i = 0; i < _process_num; i++) // 创建定量的进程{// 1.创建管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false;// 2.创建子进程pid_t subid = fork();if (subid < 0){printf("创建子进程失败\n");return false;}else if (subid == 0){// 子进程 -- 读端 -- pidfd[0]_cm.CloseAll();close(pipefd[1]);Work(pipefd[0]);close(pipefd[0]); // work执行完毕,证明进程结束,关闭文件exit(1);}else{// 父进程 -- 写端 -- pidfd[1]close(pipefd[0]);_cm.Insert(pipefd[1], subid); // 将创建的进程和管道进行管理// 此时还不确定需要执行什么任务,所以只能让子进程处于阻塞状态// 通过其它函数确定好执行任务之后,父进程才进行写入}}return true;}void Debug(){_cm.PrintChannel();}//进程的回收void Stop(){// //管道的回收,只需要将父进程的读端关闭即可// _cm.StopSubProcess();// //子进程的回收,需要waitpid// _cm.WaitSubProcess();_cm.CloseAndWait();}~ProcessPoll() {}private:ChannelManager _cm; // 对管道进行管理int _process_num; // 创建多少个进程TaskManager _tm;
};
任务实现:
#pragma once
#include <iostream>
#include <vector>
#include <ctime>typedef void (*task_t)();/Task/
void PrintLog()
{std::cout << "我是一个打印日志的任务" << std::endl;
}void Download()
{std::cout << "我是一个下载的任务" << std::endl;
}void Upload()
{std::cout << "我是一个上传的任务" << std::endl;
}
/Task/class TaskManager
{
public:TaskManager(){srand(time(nullptr));}//将任务插入到任务管理表中void Register(task_t task){_tasks.push_back(task);}//任务码(每一个任务码对应一个任务,拿到一个任务码,就执行对应的任务)int Code(){return rand() % _tasks.size();}//任务执行void Execute(int code)//拿到任务码执行任务{if(code>=0 && code<_tasks.size()){_tasks[code]();}}~TaskManager(){}private:std::vector<task_t> _tasks;
};
Main:
#include <iostream>
#include "ProcessPoll.hpp"int main()
{//创建进程池,假如进程池中需要5个进程ProcessPoll pp(gdefaultnum);//启动进程池,执行进程创建工作pp.Start();//有了进程池,就能够分配任务并让进程池中的进程执行了int cnt = 10;while(cnt--){pp.Run();//执行10次命令sleep(1);}//进程的回收pp.Stop();return 0;
}