当前位置: 首页 > news >正文

Linux:进程间通信->匿名管道实现内存池

1. 进程间通信

(1) 概念

进程间通信(IPC)  就是不同进程间交换数据的方法,进程间是独立的所以不能访问彼此的内存,需要某种机制来通信(管道、消息队列,共享内存等)

(2) 目的

数据传输:一个进程需要他的数据发送给另一个进程

资源共享:多个进程之间共享同样的资源

通知事件:一个进程需要向另一个或一组进程发送消息,通知它们发生了某种事件(例如子进程终止通知父进程)

进程控制:有些进程希望完全控制另一个进程的执行,此时控制进程希望能够拦截另一个进程的所有陷入和异常,并及时知道它的状态改变

 (3) 本质

不同的进程先看到同一份资源(内存) 才有通信的条件。 同一份资源由操作系统OS来提供系统调用,OS的接口->设计统一的通信接口。

2. 匿名管道

管道是Unix中进程通信的形式,把一个进程链接到另一个进程的数据流称为一个管道.

匿名管道是单向通信:一端写另一端读。用于做父子通信,通过对文件的读写来实现通信,通过文件描述符表来实现fd。

使用原理是创建一个子进程,让父子进程都指向一个文件,最后我们就可以让父进程写入或读取,子进程读取或写入数据了。

创建匿名管道
#include <unistd.h>
int fd[2];
pipe(fd);

pipe系统调用,创建匿名管道,fd数组存储两个文件描述符

所需头文件<unistd.h>

fd[0]:读端(从管道文件读取数据)

fd[1]:写端(向管道文件写入数据)

匿名管道的五种特性
  1. 只能用来进行具有血缘关系的进程间通信(常用于父与子)
  2. 管道文件自带同步机制 快的等待慢的
  3. 管道是面向字节流的
  4. 管道是单向通信的(特殊的半双工) 
  5. 管道文件的生命周期是随进程的(所有拥有其读写端fd的都close关闭)进程终止时,所有未关闭的fd都会被内核自动关闭。
 四种通信情况
  1. 写慢,读快---读端就要阻塞(等待写端写入)。
  2. 写快,读慢---满了的时候,写就要阻塞等待
  3. 写关闭,读继续---read就会读到返回值为0,表示文件结尾
  4. 读关闭,写继续---写端再写入也没有任何意义了
    操作系统OS不做无意义的事情,OS会(发送异常信号)杀掉写端进程

3. 进程池实现

三个类来描述再组织

channel

描述这个管道

class Channel
{
public:Channel(int wfd, pid_t chpid): _wfd(wfd),_chpid(chpid){// to_string 将数字转化为string形式_name = "channel-" + std::to_string(_wfd) + "+" + std::to_string(_chpid);}int Fd() { return _wfd; }            // 返回_wfdpid_t Chpid() { return _chpid; }     //..std::string Name() { return _name; } // 返回名字void Send(int code){int n= write(_wfd,&code,sizeof code);(void)n;//调用一下n,防止检查}void Close(){close(_wfd);}void Wait(){pid_t rid=waitpid(_chpid,nullptr,0);(void)rid;//操作一下这个变量}~Channel() {}private:int _wfd;          // 文件描述符pid_t _chpid;      // 对应的子进程是谁std::string _name; // 命名信道方便区分
};

Senf写入任务码

Close关闭写端fd

Wait等待子进程

ChannelManage

用来管理创建的匿名管道

// 管理Channel管道
class ChannelManage
{
public:ChannelManage(){}void InsterChannel(int wfd, pid_t chpid) // 写的fd 与子进程pid{// Channel c(wfd,chpid);// _channel.push_back(std::move(c));_channel.emplace_back(wfd, chpid); // 免去拷贝}void PrintfChannel(){for (auto &channel : _channel){std::cout << channel.Name() << std::endl;}}Channel& Select(){auto& c=_channel[_next];_next++;if(_next==_channel.size())_next=0;return c;}// void StopSubprocess()//暂停子进程// {//     for(auto& channel:_channel)//     {//         channel.Close();//关闭接口//         std::cout<<"关闭:"<<channel.Name()<<std::endl;//     }// }// void WaitSubprocess()//等待子进程结束// {//     for(auto& channel:_channel)//     {//         channel.Wait();//等待进程//         std::cout<<"回收:"<<channel.Name()<<std::endl;//     }// }void CloseAll(){for(auto& channel:_channel){channel.Close();//关闭父进程历史打开的写端// std::cout<<"关闭:"<<channel.Name()<<std::endl;}}void CloseAndWait(){//越早开辟的进程,写端引用计数的越多,关闭不了for(auto& channel:_channel){channel.Close();//关闭接口std::cout<<"关闭:"<<channel.Name()<<std::endl;channel.Wait();//等待进程std::cout<<"回收:"<<channel.Name()<<std::endl;}//解决方法1://倒着关闭所有进程// for(int i= _channel.size()-1;i>=0;i--)// {//     _channel[i].Close();//关闭接口//     std::cout<<"关闭:"<<_channel[i].Name()<<std::endl;//     _channel[i].Wait();//等待进程//     std::cout<<"回收:"<<_channel[i].Name()<<std::endl;// }//解决方案2://让父进程一个指向所有管道写端w}~ChannelManage(){}private:std::vector<Channel> _channel; // 存储管道int _next=0;
};

InsterChannel :插入新的匿名管道 写端fd 与 子进程的pid

PrintfChannel :输出所有匿名管道的名字

Select :按顺序返回管道

CloseAll :关闭父进程历史打开的写端,让写端基本不会重复打开

CloseAndWait :关闭写端,CloseAll用来解决写端引用计数过多无法关闭的情况。

 Select的作用

干活不能只让一个人干,执行任务不能总让一个子进程去执行所以有三种方法

1. 轮询 2. 随机 3. Channel增添一个负载指标,通过这个指标来选择谁干活

这里Select采用了轮询的方式

TaskManager 与 ProcessPool
typedef void (*task_t)();//函数指针void PrintfLog()
{std::cout<<"我是打印日志的任务"<<std::endl;
}void Download()
{std::cout<<"我是下载文件的任务"<<std::endl;
}void Upload()
{std::cout<<"我是上传文件的任务"<<std::endl;
}class TaskManager
{
public:TaskManager(){srand(time(nullptr));}void Register(task_t t)//注册{_tasks.push_back(t);}int Code(){return rand()%_tasks.size();}void Execute(int code){if(code>=0&&code< _tasks.size())//在数组范围内{_tasks[code]();//根据传递来的任务码来执行任务}}~TaskManager(){}
private:std::vector<task_t> _tasks;
};

 存储各种任务

class ProcessPool
{
public:ProcessPool(int num=5): _process_num(num){_tm.Register(PrintfLog);//将任务都存入_tm管理_tm.Register(Download);_tm.Register(Upload);}void Work(int rfd) // 读端文件描述符{while (true){int code=0;//最大读入长度为4,所以ssize_t n=read(rfd,&code,sizeof code);if(n > 0)//读成功{if(n!=sizeof code)//读取数据长度要为我们规定的长度{continue;//不然重新读}std::cout<<"子进程"<<getpid()<<"收到任务码,任务码为:"<<code<<std::endl;//要执行任务了_tm.Execute(code);//执行code这个任务}else if(n==0)//读到头了{std::cout<<"子进程退出"<<std::endl;break;}else//n<0 读取失败了{std::cout<<"读取数据失败"<<std::endl;break;// exit(1);}// std::cout << "我是子进程,我的rfd是" << rfd << std::endl;sleep(1);}}bool Create() // 初始化管道{for (int i = 0; i < _process_num; i++){// 创建匿名管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false; // 失败了// 创建子进程pid_t nid = fork();if (nid == 0) // 子进程负责读{//让子进程关闭自己继承下来的哥哥进程的写端关闭就可以了//只会影响引用计数不关闭父进程写端_cm.CloseAll();//写实拷贝所以不会影响父进程。子进程每次关闭都只关子进程的close(pipefd[1]); // 将写端关闭Work(pipefd[0]); // 读端工作close(pipefd[0]);exit(0); // 工作完就退出即可}else if (nid < 0) // 创建失败{std::cout << "子进程创建失败" << std::endl;return false;}else // 父进程{// char buf[1024];close(pipefd[0]); // 读端关闭_cm.InsterChannel(pipefd[1], nid); // 将这个子进程加入数组保存// close(pipefd[1]); // 写端关闭要在最后才关闭}}return true;}void Debug(){_cm.PrintfChannel();}void PushTask()//写任务{//不能选择一个子进程一直用啊//下面三种实现负载均衡方案//1. 轮询 2. 随机 3. Channel添加一个负载指标,通过是否达成指标来//1. 选一个任务int taskcode=_tm.Code();//随机选一个任务//2. 选择一个信道 子进程auto& c=_cm.Select();std::cout<<"选择了一个子进程"<<c.Name()<<std::endl;//3. 发送任务c.Send(taskcode);//传任务码std::cout<<"发送了一个任务码:"<<taskcode<<std::endl;}void Stop(){// //关闭父进程所有的wfd// _cm.StopSubprocess();// //回收所有子进程// _cm.WaitSubprocess();_cm.CloseAndWait();}~ProcessPool(){}private:ChannelManage _cm; // 管理所有的管道int _process_num;  // 进程数量TaskManager _tm;//管理所有的任务
};

三个参数 

ChannelManage _cm; // 管理所有的管道 
int _process_num;  // 记录进程数量
TaskManager _tm;//管理所有的任务

Create:pipe创建匿名管道,并将父子进程通过if else分开执行任务

Work :参数为读端文件描述符,所执行任务码通过read 来获得

PushTask :选择一个任务来执行通过轮询机制

Stop :关闭所有写端

 4. 源码

processpol.hpp

hpp文件是一种在C++中的特殊头文件,通常用于将类的声明和实现放在同一个文件中。

.h文件和.cpp文件将类的声明和实现分离的方式不同,hpp文件将这两者结合在一起。

#pragma once
#include <cstdio>
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <vector>
#include <string>
#include<sys/wait.h>
#include<sys/types.h>
#include "Task.hpp"
class Channel
{
public:Channel(int wfd, pid_t chpid): _wfd(wfd),_chpid(chpid){// to_string 将数字转化为string形式_name = "channel-" + std::to_string(_wfd) + "+" + std::to_string(_chpid);}int Fd() { return _wfd; }            // 返回_wfdpid_t Chpid() { return _chpid; }     //..std::string Name() { return _name; } // 返回名字void Send(int code){int n= write(_wfd,&code,sizeof code);(void)n;//调用一下n,防止检查}void Close(){close(_wfd);}void Wait(){pid_t rid=waitpid(_chpid,nullptr,0);(void)rid;//操作一下这个变量}~Channel() {}private:int _wfd;          // 文件描述符pid_t _chpid;      // 对应的子进程是谁std::string _name; // 命名信道方便区分
};// 管理Channel管道
class ChannelManage
{
public:ChannelManage(){}void InsterChannel(int wfd, pid_t chpid) // 写的fd 与子进程pid{// Channel c(wfd,chpid);// _channel.push_back(std::move(c));_channel.emplace_back(wfd, chpid); // 免去拷贝}void PrintfChannel(){for (auto &channel : _channel){std::cout << channel.Name() << std::endl;}}Channel& Select(){auto& c=_channel[_next];_next++;if(_next==_channel.size())_next=0;return c;}// void StopSubprocess()//暂停子进程// {//     for(auto& channel:_channel)//     {//         channel.Close();//关闭接口//         std::cout<<"关闭:"<<channel.Name()<<std::endl;//     }// }// void WaitSubprocess()//等待子进程结束// {//     for(auto& channel:_channel)//     {//         channel.Wait();//等待进程//         std::cout<<"回收:"<<channel.Name()<<std::endl;//     }// }void CloseAll(){for(auto& channel:_channel){channel.Close();//关闭父进程历史打开的写端// std::cout<<"关闭:"<<channel.Name()<<std::endl;}}void CloseAndWait(){//越早开辟的进程,写端引用计数的越多,关闭不了for(auto& channel:_channel){channel.Close();//关闭接口std::cout<<"关闭:"<<channel.Name()<<std::endl;channel.Wait();//等待进程std::cout<<"回收:"<<channel.Name()<<std::endl;}//解决方法1://倒着关闭所有进程// for(int i= _channel.size()-1;i>=0;i--)// {//     _channel[i].Close();//关闭接口//     std::cout<<"关闭:"<<_channel[i].Name()<<std::endl;//     _channel[i].Wait();//等待进程//     std::cout<<"回收:"<<_channel[i].Name()<<std::endl;// }//解决方案2://让父进程一个指向所有管道写端w}~ChannelManage(){}private:std::vector<Channel> _channel; // 存储管道int _next=0;
};class ProcessPool
{
public:ProcessPool(int num=5): _process_num(num){_tm.Register(PrintfLog);//将任务都存入_tm管理_tm.Register(Download);_tm.Register(Upload);}void Work(int rfd) // 读端文件描述符{while (true){int code=0;//最大读入长度为4,所以ssize_t n=read(rfd,&code,sizeof code);if(n > 0)//读成功{if(n!=sizeof code)//读取数据长度要为我们规定的长度{continue;//不然重新读}std::cout<<"子进程"<<getpid()<<"收到任务码,任务码为:"<<code<<std::endl;//要执行任务了_tm.Execute(code);//执行code这个任务}else if(n==0)//读到头了{std::cout<<"子进程退出"<<std::endl; break;}else//n<0 读取失败了{std::cout<<"读取数据失败"<<std::endl;break;// exit(1);}// std::cout << "我是子进程,我的rfd是" << rfd << std::endl;sleep(1);}}bool Create() // 初始化管道{for (int i = 0; i < _process_num; i++){// 创建匿名管道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false; // 失败了// 创建子进程pid_t nid = fork();if (nid == 0) // 子进程负责读{//让子进程关闭自己继承下来的哥哥进程的写端关闭就可以了//只会影响引用计数不关闭父进程写端_cm.CloseAll();//写实拷贝所以不会影响父进程。子进程每次关闭都只关子进程的close(pipefd[1]); // 将写端关闭Work(pipefd[0]); // 读端工作close(pipefd[0]);exit(0); // 工作完就退出即可}else if (nid < 0) // 创建失败{std::cout << "子进程创建失败" << std::endl;return false;}else // 父进程{// char buf[1024];close(pipefd[0]); // 读端关闭_cm.InsterChannel(pipefd[1], nid); // 将这个子进程加入数组保存// close(pipefd[1]); // 写端关闭要在最后才关闭}}return true;}void Debug(){_cm.PrintfChannel();}void PushTask()//写任务{//不能选择一个子进程一直用啊//下面三种实现负载均衡方案//1. 轮询 2. 随机 3. Channel添加一个负载指标,通过是否达成指标来//1. 选一个任务int taskcode=_tm.Code();//随机选一个任务//2. 选择一个信道 子进程auto& c=_cm.Select();std::cout<<"选择了一个子进程"<<c.Name()<<std::endl;//3. 发送任务c.Send(taskcode);//传任务码std::cout<<"发送了一个任务码:"<<taskcode<<std::endl;}void Stop(){// //关闭父进程所有的wfd// _cm.StopSubprocess();// //回收所有子进程// _cm.WaitSubprocess();_cm.CloseAndWait();}~ProcessPool(){}private:ChannelManage _cm; // 管理所有的管道int _process_num;  // 进程数量TaskManager _tm;//管理所有的任务
};
Task.hpp
#pragma once
#include <cstdio>
#include <iostream>
#include <cstring>
#include <unistd.h>
#include <vector>
#include <string>
#include <ctime>
typedef void (*task_t)();//函数指针void PrintfLog()
{std::cout<<"我是打印日志的任务"<<std::endl;
}void Download()
{std::cout<<"我是下载文件的任务"<<std::endl;
}void Upload()
{std::cout<<"我是上传文件的任务"<<std::endl;
}class TaskManager
{
public:TaskManager(){srand(time(nullptr));}void Register(task_t t)//注册{_tasks.push_back(t);}int Code(){return rand()%_tasks.size();}void Execute(int code){if(code>=0&&code< _tasks.size())//在数组范围内{_tasks[code]();//根据传递来的任务码来执行任务}}~TaskManager(){}
private:std::vector<task_t> _tasks;
};
main.cpp
#include"processpoll.hpp"int main()
{ProcessPool pp(5);pp.Create();// pp.Debug();int cnt=10;while(cnt--){//1. 选择一个信道pp.PushTask();sleep(1);}pp.Stop();// sleep(1000);return 0;
}

这篇就到这里啦(๑′ᴗ‵๑)I Lᵒᵛᵉᵧₒᵤ❤

相关文章:

  • 深入剖析 Vue 双向数据绑定机制 —— 从响应式原理到 v-model 实现全解析
  • Android中的多线程
  • ubuntu20.04安装x11vnc远程桌面
  • 如何成功防护T级超大流量的DDoS攻击
  • 【Leetcode 每日一题】2845. 统计趣味子数组的数目
  • 汽车售后 D - PDU 和 J2543 详细介绍
  • 驱动开发硬核特训 · Day 21(下篇): 深入剖析 PCA9450 驱动如何接入 regulator 子系统
  • Serverless 在云原生后端的实践与演化:从函数到平台的革新
  • classfinal 修改过源码,支持jdk17 + spring boot 3.2.8
  • 【k8s】sidecar边车容器
  • 项目maven版本不一致 导致无法下载
  • 【遥感图像分类】【综述】遥感影像分类:全面综述与应用
  • python实现简单的UI交互
  • redis客户端库redis++在嵌入式Linux下的交叉编译及使用
  • 多物理场耦合低温等离子体装置求解器PASSKEy2
  • ROS 快速入门教程04
  • 【Vue】静态站点生成(VitePress)
  • 星火燎原:大数据时代的Spark技术革命在数字化浪潮席卷全球的今天,海量数据如同奔涌不息的洪流,传统的数据处理方式已难以满足实时、高效的需求。
  • 【Python数据库编程实战】从SQL到ORM的完整指南
  • 大数据分析04 数据查询分析
  • 咖啡与乳腺健康之间,究竟有着怎样的复杂关系?
  • 滁州一交通事故责任认定引质疑:民警和司法鉴定人被处罚,已中止诉讼
  • 政治局会议:根据形势变化及时推出增量储备政策,加强超常规逆周期调节
  • 马上评丨马拉松“方便门”被处罚,是一针清醒剂
  • 韩国检方以受贿嫌疑起诉前总统文在寅
  • 央媒关注脑瘫女骑手:7年跑出7.3万多份单,努力撑起生活