当前位置: 首页 > news >正文

Linux操作系统4-进程间通信3(基于管道的进程池设计)

上篇文章:Linux操作系统4-进程间通信2(命名管道进行通信client与server)-CSDN博客

本篇Gitee代码:myLerningCode · 橘子真甜/Linux操作系统与网络编程学习 - 码云 - 开源中国 (gitee.com)

本篇重点:使用匿名管道实现一个简单的进程池

目录

一. 进程池

二. 子进程执行任务的方法表

三. 创建子进程,并维护好管道的通信

3.1 类SubEndpoint 

3.4 父进程发送信息码

3.3 子进程接收信息码

3.4 创建子进程与管道文件

四. 父进程负载均衡控制子进程执行任务

五. 父进程等待子进程

六. 主函数测试代码 


一. 进程池

        让一个进程去控制多个进程完成我们的任务。这样可以有效的管理进程,提高整个系统的效率。

        这里,我们使用匿名管道简单实现一个进程池,让父进程去控制多个子进程负载均衡完成任务 。

二. 子进程执行任务的方法表

        提前设计好方法表之后,我们只需要启动程序之后将方法表加载好之后,子进程就可以直接调用这些方法去完成任务。

        如果有新的任务需要完成,只需简单的修改加载方法表的代码即可。

方法表的加载测试代码如下:

#include <iostream>
#include <functional>
#include <string>
#include <vector>

#include <cstdlib>
#include <cassert>

#include <unistd.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <fcntl.h>

// 函数指针
typedef void (*func_t)();

// 进程的方法
void add()
{
    std::cout << "id为" << getpid() << "正在执行加法任务" << std::endl;
    sleep(1); // 模拟该任务花费的时间
}

void sub()
{
    std::cout << "id为" << getpid() << "正在执行减法任务" << std::endl;
    sleep(1);
}

void print()
{
    std::cout << "id为" << getpid() << "正在执行打印任务" << std::endl;
    sleep(1); // 模拟该任务花费的时间
}

void download()
{
    std::cout << "id为" << getpid() << "正在执行下载任务" << std::endl;
    sleep(1);
}

// 建立一个任务集合表  func_t
//此处既可以使用函数指针,也可以使用C++自带的function进行包装
void loadTaskFunc(std::vector<std::function<void()>> *funcMap)
{
    assert(funcMap != nullptr);
    funcMap->push_back(add);
    funcMap->push_back(sub);
    funcMap->push_back(print);
    funcMap->push_back(download);
}

int main()
{
    std::vector<std::function<void()>> funcMap;
    loadTaskFunc(&funcMap);

    for (int i = 0; i < 100; i++)
    {
        funcMap[i % 4]();
    }
    return 0;
}

测试的结果如下:

之后只需要在让父进程负载均衡将这些任务派发给子进程让它们完成这些任务即可。

三. 创建子进程,并维护好管道的通信

3.1 类SubEndpoint 

        为了方便管理子进程和这个子进程的通信管道,我们定义一个类用于保存这两个信息。这样我们就能方便的选择子进程和这个进程的管道进行通信。

代码如下:

class SubEp
{
public:
    SubEp(int writeFd, pid_t pid)
        : _writeFd(writeFd), _pid(pid)
    {
        char buffer[64];
        snprintf(buffer, sizeof(buffer), "process: num[%d]pid[%d]fd[%d]", num++, pid, writeFd);
        _name = buffer;
    }

public:
    int _writeFd;      // 该进程的管道文件描述符
    pid_t _pid;        // 该进程的pid
    std::string _name; // 该进程的名称
    static int num;    // 这是第几个进程
};
int SubEp::num = 1;

3.4 父进程发送信息码

        父进程每一个每一次随机选择一个子进程让这个子进程随机执行一个任务,这样能基本保证子进程的运行是均衡的。

代码如下: 

// 父进程将一个随机信息码发送给一个随机的子进程
void sendTask(const SubEp &process, const int &num)
{
    std::cout << "send task num: " << num << " to ->" << process._name << std::endl;
    int n = write(process._writeFd, &num, sizeof(num));
    assert(n == sizeof(num));
    (void)n;
}

3.3 子进程接收信息码

        对于子进程来说,子进程从管道中获取来自父进程发送的信息码。根据父进程传递的信息码来判断执行哪一个任务。

        比如父进程向子进程发送了一个1,子进程在方法表中找到下标为1的这个方法并执行这个方法的代码。

 代码如下:

// 子进程接收来自父进程从管道传递过来的信息码
int receiveTask(int readFd)
{
    int code = 0;
    ssize_t n = read(readFd, &code, sizeof(code));
    if (n == 4) // 传递4个字节说明是一个整数
        return code;
    return -1;
}

3.4 创建子进程与管道文件

        定义一个函数用于创建所有的子进程,同时维护好各个子进程与父进程之间的关系

代码如下:

#define PROCESS_NUM 5
// 参数分别是子进程集合,方法集合
void creatSubProcess(std::vector<SubEp> &subs, std::vector<func_t> &funcMap)
{
    for (int i = 0; i < PROCESS_NUM; i++)
    {
        int fds[2];
        int n = pipe(fds); // 创建管道文件
        assert(n = 0);

        pid_t id = fork();
        if (id == 0)
        {
            // 子进程,需要关闭写端
            close(fds[1]);

            // 接收来自父进程的信息码并处理任务,如果没有则阻塞等待
            while (true)
            {
                int commandCode = receiveTask(fds[0]);
                if (commandCode >= 0 && commandCode <= funcMap.size())
                    funcMap[commandCode]();
                else if (commandCode == -1)
                    break;
            }
            exit(0);
        }

        // 父进程,需要将子进程和其管道信息保存好到subs中
        close(fds[0]);
        SubEp sub(fds[1], id);
        subs.push_back(sub);
    }
}

四. 父进程负载均衡控制子进程执行任务

        当我们创建好了子进程,以及父子进程之间的管道通信文件之后,我们就可以让父进程去控制子进程进行执行方法表的任务了。

        子进程在被创建之后,此时父进程还没有向子进程发送消息,由于匿名管道文件具有同步和互斥功能。子进程此时会阻塞等待父进程向管道中发送信息,直到收到信息之后子进程才会根据信息执行相应的任务。

代码如下:

void loadBalanceContrl(const std::vector<SubEp> &subs, const std::vector<func_t> &funcMap, int count)
{
    int subProcessNum = subs.size();
    int funcNum = funcMap.size();

    bool flag = (count == 0 ? true : false);
    while (true)
    {
        // 1.随机选择一个子进程
        int subIndex = rand() % subProcessNum;

        // 2.随机选择一个任务码
        int taskcode = rand() % funcNum;

        // 3.将任务码发送相应的子进程进行执行任务
        sendTask(subs[subIndex], taskcode);

        if (!flag) // 表明任务还没有结束
        {
            count--;
            if (count == 0) // 任务数量为0,结束
                break;
        }
        sleep(1);
    }

    // 依次关闭子进程的管道文件描述符
    for (int i = 0; i < subs.size(); i++)
    {
        close(subs[i]._writeFd);
    }
}

  注意,我们在完成所有的任务之后,需要关闭子进程的文件描述符表。

五. 父进程等待子进程

        子进程完成所有的任务之后,父进程需要等待子进程并获取子进程的进程退出码和进程退出时候的信号。用于获取任务是否完成以及子进程是否异常退出

代码如下:

        

void waitSubProcess(const std::vector<SubEp> &subs)
{
    int processNum = subs.size();
    for (int i = 0; i < subs.size(); i++)
    {
        int status = 0;
        waitpid(subs[i]._pid, &status, 0);
        std::cout << "子进程id:" << subs[i]._pid << " 该进程的退出码为:" << ((status >> 8) & 0xff) << " 该进程的退出信号是:" << (status & 0x7f) << std::endl;
    }
}

对于进程退出的内容可以看这一篇文章:Linux操作系统2-进程控制2(进程等待,waitpid系统调用,阻塞与非阻塞等待)_wait系统调用-CSDN博客

六. 主函数测试代码 

        测试代码如下:

int main()
{
    srand((unsigned int)time(0));
    // 1.创建方法表
    std::vector<func_t> funcMap; // 方法表集合
    loadTaskFunc(&funcMap);

    // 创建子进程并维护好管道文件
    std::vector<SubEp> subs; // 子进程封装
    creatSubProcess(subs, funcMap);

    // 负载均衡派发任务
    int taskCount = 15;
    loadBalanceContrl(subs, funcMap, taskCount);

    // 等待子进程
    waitSubProcess(subs);

    return 0;
}

测试结果如下:

 

可以看到,父进程成功地将信息随机发送给子进程让其完成我们的任务。

相关文章:

  • Cython学习笔记1:利用Cython加速Python运行速度
  • 2025年信息科学与工程学院科协单片机编程介绍——按键拓展编程
  • 第6章:基于LangChain如何开发Agents,附带客户支持智能体示例
  • Spring Boot 中多线程工具类的配置与使用:基于 YAML 配置文件
  • 21.回溯算法3
  • 【2025最新版】Chrome谷歌浏览器如何能恢复到之前的旧版本
  • 【信息系统项目管理师-案例真题】2013下半年案例分析答案和详解
  • 对CSS了解哪些?
  • Ubuntu 下 nginx-1.24.0 源码分析 - ngx_os_specific_init函数
  • 网站改了域名,如何查找?
  • HTTP和HTTPS详解
  • ai json处理提示词
  • AI大模型零基础学习(7):边缘智能与物联网——让AI走出云端
  • XML XML约束 二、DTD
  • 基于STM32的智能工业设备健康监测系统
  • StableDiffusion学习笔记——6、XYZ图表
  • 基于spring boot物流管理系统设计与实现(代码+数据库+LW)
  • 文心一言大模型的“三级跳”:从收费到免费再到开源,一场AI生态的重构实验
  • LLM增强强化学习:开启智能决策的新篇章
  • 【文本】词嵌入经典模型:从one-hot到BERT
  • 北京市平谷区政协原主席王春辉接受纪律审查和监察调查
  • 特朗普支持率降至新低:宣布关税后骤降,选民最不满经济表现
  • 出发!陈冬、陈中瑞、王杰三名航天员领命出征
  • 上海车展上的双向奔赴:跨国车企融入中国创新,联手“在中国,为全球”
  • 王毅同伊朗外长阿拉格齐会谈
  • 神二十明日发射,长二F火箭推进剂加注工作已完成