当前位置: 首页 > news >正文

【Linux】基于阻塞队列的生产消费者模型

在这里插入图片描述

个人主页~


基于阻塞队列的生产消费者模型

  • 一、什么是生产消费者模型
  • 二、基于阻塞队列的生产消费者模型
    • 1、理论研究
    • 2、多生产多消费模型
      • (一)BlockQueue.hpp
      • (二)Task.hpp
      • (三)main.cpp
    • 3、误唤醒问题

一、什么是生产消费者模型

生产消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题,生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接交给阻塞队列,消费者不找生产者索要数据,而是直接从阻塞队列中去取,这样一来,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

对于生产消费者模型,我们有一个321规则,分别是3种关系,2种角色,1个交易场所

  • 三种关系:生产者和生产者的互斥竞争关系,消费者和消费者的互斥竞争关系,生产者和消费者的互斥、同步关系
  • 两种角色:生产者和消费者
  • 一个交易场所:特定结构的内存空间(如阻塞队列)

二、基于阻塞队列的生产消费者模型

1、理论研究

在这里插入图片描述
在多线程编程中,阻塞队列是一种常用于实现生产者和消费者模型的数据结构,其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中再次被放入元素,当队列满时,往队列中存放元素的操作也会被阻塞,直到有元素从队列中被获取

生产消费者模型最大的好处是,也是生产消费者模型效率高的原因是:在消费者获取数据(一般是网络数据)的时候,生产者可以生产数据,生产者放入数据的时候,消费者可以处理数据,虽然特定内存结构,也就是临界资源区是有锁的,只能由单线程通过,只要将时间合理化,我们就可以实现生产者和消费者的高效率工作,并且将发送数据的线程和处理数据的线程解耦合
在这里插入图片描述

2、多生产多消费模型

(一)BlockQueue.hpp

#pragma once#include <iostream>
#include <queue>
#include <pthread.h>
//定义一个模版类,方便我们使用任何类型进行生产消费
template <class T>
//定义一个阻塞队列
class BlockQueue
{//队列默认最大容量static const int defalutnum = 20;public:BlockQueue(int maxcap = defalutnum) : maxcap_(maxcap){//初始化互斥锁和生产者和消费者的条件变量pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr);//下面注释掉的是设置水位线,设置最低最高水位线//在阻塞队列中的数据,在低于最低水位线时是不可被获取的,只能写入//在高于最高水位线时是不可被写入的,只能获取// low_water_ = maxcap_/3;// high_water_ = (maxcap_*2)/3;}//从队列头取出元素返回T pop(){pthread_mutex_lock(&mutex_);//加锁//只能用while不能用if,原因是会出现误唤醒问题,下面说while (q_.size() == 0){pthread_cond_wait(&c_cond_, &mutex_); }T out = q_.front();q_.pop();//这里是加了水位线的版本,在低于水位线的时候要唤醒生产者// if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_);//解锁return out;}void push(const T &in){pthread_mutex_lock(&mutex_);//加锁//同pop函数while (q_.size() == maxcap_){pthread_cond_wait(&p_cond_, &mutex_); }q_.push(in); //这里是加了水位线的版本,在高于水位线的时候要唤醒消费者// if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);//解锁}//析构函数~BlockQueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}private:std::queue<T> q_; int maxcap_; // 极大值pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;//最低最高水位线// int low_water_;// int high_water_;
};

(二)Task.hpp

#pragma once
#include <iostream>
#include <string>//定义运算方法
std::string opers = "+-*/%";//枚举错误
enum
{DivZero = 1,ModZero,Unknown
};class Task
{
public:Task(int x, int y, char op) : data1_(x), data2_(y), oper_(op), result_(0), exitcode_(0){}void run(){switch (oper_){case '+':result_ = data1_ + data2_;break;case '-':result_ = data1_ - data2_;break;case '*':result_ = data1_ * data2_;break;case '/':{if (data2_ == 0)exitcode_ = DivZero;elseresult_ = data1_ / data2_;}break;case '%':{if (data2_ == 0)exitcode_ = ModZero;elseresult_ = data1_ % data2_;}break;default:exitcode_ = Unknown;break;}}//伪函数,通过重载()使run可以像函数一样调用void operator()(){run();}//返回的运算结果以及错误代码std::string GetResult(){std::string r = std::to_string(data1_);r += oper_;r += std::to_string(data2_);r += "=";r += std::to_string(result_);r += "[code: ";r += std::to_string(exitcode_);r += "]";return r;}//返回运算表达式std::string GetTask(){std::string r = std::to_string(data1_);r += oper_;r += std::to_string(data2_);r += "=?";return r;}~Task(){}private:int data1_;int data2_;char oper_;int result_;int exitcode_;
};

(三)main.cpp

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>void *Consumer(void *args)
{BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);while (true){// 消费Task t = bq->pop();// 计算t();//模拟消费者处理任务std::cout << "处理任务: " << t.GetTask() << " 运算结果是: " << t.GetResult() << " thread id: " << pthread_self() << std::endl;}
}void *Productor(void *args)
{int len = opers.size();BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);int x = 10;int y = 20;while (true){// 用随机数运算模拟生产者生产数据int data1 = rand() % 10 + 1; // [1,10]usleep(10);int data2 = rand() % 10;char op = opers[rand() % len];Task t(data1, data2, op);// 生产bq->push(t);std::cout << "生产了一个任务: " << t.GetTask() << " thread id: " << pthread_self() << std::endl;sleep(1);}
}int main()
{//随机数种子srand(time(nullptr));//给阻塞队列传一个任务BlockQueue<Task> *bq = new BlockQueue<Task>();//多生产者多消费者pthread_t c[3], p[5];for (int i = 0; i < 3; i++){pthread_create(c + i, nullptr, Consumer, bq);}for (int i = 0; i < 5; i++){pthread_create(p + i, nullptr, Productor, bq);}for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}delete bq;return 0;
}

在这里插入图片描述

3、误唤醒问题

误唤醒问题就是在调用pop函数或者push函数的时候可能会引起的,下面我们再把代码贴出来,然后把上面有过的注释去掉

//...T pop(){pthread_mutex_lock(&mutex_);while (q_.size() == 0) //不能调用if而要用while{pthread_cond_wait(&c_cond_, &mutex_); }T out = q_.front();q_.pop();pthread_cond_signal(&p_cond_); pthread_mutex_unlock(&mutex_);return out;}void push(const T &in){pthread_mutex_lock(&mutex_);while (q_.size() == maxcap_){pthread_cond_wait(&p_cond_, &mutex_); }q_.push(in); pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);//...

在多生产者 - 多消费者并发编程场景中,误唤醒现象较为常见,假定队列当前处于满状态,当一个消费者线程成功消费一个数据后,队列中会空出一个位置,随后,线程可能多次调用pthread_cond_signal(&p_cond_) 函数,唤醒了一批正在 p_cond_ 条件变量下等待的生产者线程,由于被唤醒的生产者线程需要重新竞争互斥锁,这些线程之间呈现出互斥关系,在先前执行消费操作的线程释放锁之后,仅有一个生产者线程能够成功获取锁,其余虽被唤醒但未能抢到锁的生产者线程只能在锁处等待

当成功获取锁的生产者线程完成数据生产操作后,队列可能再次达到满状态,此时,该线程会调用 pthread_cond_signal(&c_cond_) 函数唤醒一个消费者线程,随后释放锁,在此情形下,被唤醒的线程不仅包括刚刚被唤醒的消费者线程,还涵盖之前被唤醒却未抢到锁的生产者线程,它们会同时参与锁的竞争,若使用 if 语句来判断队列是否已满,当某个生产者线程抢到锁后,可能不会再次对队列状态进行检查,直接尝试向已满的队列中添加数据,从而引发错误

因此,为确保线程安全,应使用 while 循环来包裹 pthread_cond_wait 函数,当一个线程被唤醒并成功获取锁后,不应直接执行队列操作(无论是生产数据还是消费数据),而应再次检查资源是否满足操作条件,若资源就绪,则可继续执行队列操作;若资源未就绪,则应再次调用 pthread_cond_wait 函数,使线程进入休眠状态,等待后续唤醒


今日分享就到这了~

在这里插入图片描述

相关文章:

  • 火语言RPA--发送邮件
  • 树莓派安装GStreamer ,opencv支持, 并在虚拟环境中使用的安装方法
  • opencv--图像变换
  • 使用QML Tumbler 实现时间日期选择器
  • express的中间件,全局中间件,路由中间件,静态资源中间件以及使用注意事项 , 获取请求体数据
  • BOM与DOM(解疑document window关系)
  • 看一看 中间件Middleware
  • JVM性能优化之老年代参数设置
  • 【前端】手写代码输出题易错点汇总
  • git检查提交分支和package.json的version版本是否一致
  • 使用vue2开发一个医疗预约挂号平台-前端静态网站项目练习
  • ASP.NET MVC​ 入门指南
  • JAVA设计模式——(六)装饰模式(Decorator Pattern)
  • 建造者模式:分步构建复杂对象的设计模式
  • 罗伯·派克:Go语言创始者的极客人生
  • 【项目管理】进度网络图 笔记
  • Vue 2 的响应式 API 和 Vue 3 的组合式 API 的详细对比,从核心机制、使用方式、代码示例及优缺点展开
  • Linux:git和gdb/cgdb
  • 多线程(线程安全)
  • MacOS上如何运行内网穿透详细教程
  • 为什么猛起身会头晕?你的身体在发出这个警报
  • 2025一季度,上海有两把刷子
  • 董明珠的接班人还是董明珠
  • 建行原副行长章更生被开除党籍:靠贷吃贷,大搞权钱交易
  • 浙江严禁中小学节假日集体补课,省市县教育部门公布举报电话
  • A股三大股指集体高开,黄金股大幅回调