当前位置: 首页 > news >正文

【Linux】清晰思路讲解:POSIX信号量、基于环形队列的生产消费模型、线程池。

📚 博主的专栏

🐧 Linux   |   🖥️ C++   |   📊 数据结构  | 💡C++ 算法 | 🌐 C 语言

上篇文章:   条件变量、基于阻塞队列的生产者消费者模型 

下篇文章: linux多线程终章、单例模式

目录

POSIX信号量

POSIX信号量类型

核心API接口

1. 未命名信号量

2. 命名信号量

 PV操作

PV操作的本质

1. P操作(Proberen)

2. V操作(Verhogen)

PV操作的经典应用场景

1. 互斥访问(Mutex)

基于环形队列的生产消费模型 

信号量能做到以下几点:

使用信号量实现模型的方案:

代码1.0

代码2.0:生产者给消费者派发任务

代码3.0:多生产,多消费,加锁 mutex

问题:是先加锁,还是先申请信号量?

封装环形队列2.0

多生产多消费的意义:让处理数据和获取数据有更好的并发度

问题:在编写申请信号量处,对资源进行使用,申请,为什么不需要判断一下是否满足条件?

这是因为信号量本身就是判断条件

二元信号量

线程池

线程池的简单理解:餐厅服务员团队 🍔

解决方案:线程池(固定服务员团队)

实际应用场景

线程池实现:


POSIX信号量

POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。在Linux中,POSIX信号量是一种用于进程或线程间同步的机制,它比System V信号量更简洁高效。以下是其核心接口及使用方法的详细说明:

POSIX信号量类型

  • 命名信号量:通过名称标识,可用于不相关进程间的同步。

  • 未命名信号量(基于内存的信号量):通常用于同一进程内的线程间或相关进程间的同步。


核心API接口

1. 未命名信号量
#include <semaphore.h>// 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
// pshared: 0表示线程间共享,非0表示进程间共享
// value: 信号量初始值// 等待信号量(P操作)
int sem_wait(sem_t *sem);      // 阻塞直到信号量>0
int sem_trywait(sem_t *sem);   // 非阻塞版本
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);// 释放信号量(V操作)
int sem_post(sem_t *sem);// 销毁信号量
int sem_destroy(sem_t *sem);
2. 命名信号量
#include <semaphore.h>
#include <fcntl.h>// 创建/打开命名信号量
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
// name: 格式如"/mysem",必须以斜杠开头
// oflag: O_CREAT、O_EXCL等标志
// mode: 权限位(如0644)
// value: 初始值// 关闭信号量
int sem_close(sem_t *sem);// 删除系统中的信号量
int sem_unlink(const char *name);

上一篇博客的:生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量):

首先我们先再次熟悉PV操作:

 PV操作

PV操作是操作系统和并发编程中的核心同步原语,由荷兰计算机科学家Edsger Dijkstra在1965年提出,用于解决多进程/线程间的互斥与同步问题。其名称来源于荷兰语中的 Proberen(测试) 和 Verhogen(增加),对应信号量的两种原子操作

PV操作的本质

PV操作基于信号量(Semaphore)实现,信号量是一个整型变量,表示资源的可用数量。通过两个原子操作控制资源访问:

操作行为描述伪代码示例实际函数(POSIX)
P操作申请资源(减操作)wait(sem)sem_wait()
V操作释放资源(加操作)signal(sem)sem_post()
1. P操作(Proberen)
  • 动作:尝试减少信号量的值。

    • 若信号量 sem > 0:将其减1,进程继续执行。

    • 若信号量 sem ≤ 0:进程阻塞,直到信号量变为正数。

  • 原子性:整个操作不可中断,确保并发安全。

  • 用途:进入临界区前获取资源锁。

伪代码实现

void P(semaphore sem) {sem.value--;if (sem.value < 0) {将当前进程加入sem的等待队列;阻塞该进程;}
}
2. V操作(Verhogen)
  • 动作:增加信号量的值。

    • 若信号量的等待队列非空:唤醒一个阻塞的进程。

    • 若等待队列为空:信号量值加1。

  • 原子性:操作不可分割。

  • 用途:退出临界区后释放资源锁。

伪代码实现

void V(semaphore sem) {sem.value++;if (sem.value <= 0) {从sem的等待队列中移除一个进程P;唤醒进程P;}
}

PV操作的经典应用场景

1. 互斥访问(Mutex)

  • 目标:确保同一时刻仅一个线程进入临界区。

实现:初始化信号量为1(二进制信号量)。

sem_t mutex;
sem_init(&mutex, 0, 1);  // 初始值为1// 线程代码
void thread_func() {sem_wait(&mutex);   // P操作// 临界区操作sem_post(&mutex);    // V操作
}

基于环形队列的生产消费模型 

以前在数据结构专栏,有做过环形队列的OJ题

目标:协调生产者和消费者对缓冲区的访问 

环形队列采用数组模拟,用模运算来模拟环状特性。 

环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,当队列为空||为满的时候,head == tail,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置(head == end + 1,当end到head-1的位置的时候不访问),作为满的状态。

但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程

我们要关心的是,多线程如何在环形队列中进行生产和消费

1.单生产,单消费

2.多生产,多消费

生产和消费指向同一个位置时:在局部空间访问资源要具有顺序性,互斥性

1.队列为空,优先让谁先访问

        生产者先生产

2.队列满了,让谁访问?

        让消费者消费,如果生产者再生产,会覆盖原有数据

3.队列不为空 && 队列不为满

        head != tail 允许生产和消费同时进行!

信号量能做到以下几点:

信号量实现互斥与同步

a.不让生产者把消费者套一个圈

b.不能让消费者,超过生产者       

1.队列为空,让生产者来生产

2.队列满了,让消费者来消费

使用信号量实现模型的方案:

消费者所关心的资源是数据资源,对于生产者,所关心的资源是放数据的空间资源

数据资源+空间资源 = 队列大小N

数据资源量:sem_t data_sem = 0;

空间资源量:sem_t space_sem = N;

互斥:保证任意一个时刻只允许一个线程访问临界区

生产者:

P操作申请空间,开始生产,空间资源-1

P(space_sem);

V操作,当生产者生产数据资源之后,将数据资源+1

V(data_sem)

消费者:

P操作申请数据资源,进入环形队列进行消费,数据资源-1

P(data_sem);

V操作,当消费者将数据资源拿走之后,空间资源+1

V(space_sem);

注意看代码中的注释:

代码1.0

封装环形队列1.0

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>template <class T>
class RingQueue
{
private:// 等待信号量void P(sem_t &s){sem_wait(&s);}// 释放信号量void V(sem_t &s){sem_post(&s);}public:RingQueue(int max_cap): _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0){// 信号量需要调用函数初始化://  第二个参数pshared: 0表示线程间共享,非0表示进程间共享sem_init(&_data_sem, 0, 0);sem_init(&_space_sem, 0, max_cap);}// 入队列,生产者调用void Push(const T &in){// 1.申请空间信号量P(_space_sem);// 2.向环形队列中进行生产_ringqueue[_p_step] = in;_p_step++;_p_step %= _max_cap;// 3.释放数据资源V(_data_sem);}// 出队列void Pop(T *out){// 1.申请数据信号量P(_data_sem);// 2.从环形队列中得到数据*out = _ringqueue[_c_step];_c_step++;_c_step %= _max_cap;// 3.释放空间资源V(_space_sem);}~RingQueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);}private:// 1.队列和容量std::vector<T> _ringqueue;int _max_cap; // 最大容量// 2.消费者和生产者对应的位置,下标int _c_step;int _p_step;// 3.数据信号量和空间信号量sem_t _data_sem;sem_t _space_sem;
};

测试代码1.0:

#include <iostream>
#include <string>
#include "RingQueue.hpp"
#include <unistd.h>
#include <pthread.h>
#include <ctime>void *Consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){//sleep(1);int data = 0;// 1.进行消费rq->Pop(&data);// 2.进行数据处理std::cout << "Consumer ->" << data << std::endl;}
}
void *Productor(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){//sleep(1);// 1.构造数据int data = rand() % 10 + 1;// 2.生产数据rq->Push(data);std::cout << "Productor -> " << data << std::endl;}
}int main()
{srand(time(nullptr) ^ getpid());// 构造环形队列RingQueue<int> *rq = new RingQueue<int>(10);pthread_t c, p;pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

让消费者每sleep(1)再消费,生产者一直生产:观察现象,当生产的数据数量满,消费者消费一次,生产者再生产一个。

让生产者每sleep(1)再生产,消费者一直消费:观察现象,生产者每1秒生产一个。消费者才能消费一个:

代码2.0:生产者给消费者派发任务

Task.hpp:

#pragma once#include <iostream>
#include <functional>// typedef std::function<void()> task_t;
using task_t = std::function<void()>;//返回值是void参数为空的函数对象,类似于一个类,因为底层是仿函数// void Download()
// {
//     std::cout << "我是一个下载的任务" << std::endl;
// }class Task
{
public:Task(){}Task(int x, int y) : _x(x), _y(y){}// 做加法void Excute(){_result = _x + _y;}void operator()(){Excute();}std::string debug(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";return msg;}std::string result(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);return msg;}private:int _x;int _y;int _result;
};

测试代码2.0:

#include <iostream>
#include <string>
#include "RingQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <pthread.h>
#include <ctime>void *Consumer(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// sleep(1);// int data = 0;//构建空任务Task t;// 1.进行消费,执行rq->Pop(&t);// 2.进行数据处理t();std::cout << "Consumer -> " << t.result() << std::endl;}
}
void *Productor(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){sleep(1);// 1.构造任务int x = rand() % 10 + 1;usleep(x*1000);int y = rand() % 10 + 1;Task t(x, y);// 2.生产数据rq->Push(t);std::cout << "Productor -> " << t.debug() << std::endl;}
}int main()
{srand(time(nullptr) ^ getpid());// 构造环形队列RingQueue<Task> *rq = new RingQueue<Task>(10);pthread_t c, p;pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

运行结果:

代码3.0:多生产,多消费,加锁 mutex

生产者之间存在互斥关系,消费者之间存在互斥关系,生产者和消费者之间我们已经处理了。

现在要处理的是,多个生产者先自己竞争,多个消费者也自己竞争,因此加两把锁:

private:
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;

实际上,此时已经维护好了,生产者之间存在互斥关系,消费者之间存在互斥关系,生产者与消费者之间存在的同步与互斥的关系:

问题:是先加锁,还是先申请信号量?

例如生产者:

先让生产者全都先竞争(先加锁),再获取信号量:

一个生产者竞争到锁,然后申请信号量,单生产。在他没有释放锁的时候,其他线程在申请锁这里,等待他释放锁。这种情况会导致申请锁和申请信号量一定是串行的。

先让生产者全都申请信号量,再竞争锁(先加锁):(更有效率)

每一个生产者先预定好了票(预定好空间资源),再去争夺锁,谁的优先级更高,谁的竞争力更强,去拥有锁,进来也是安全的。一旦一个锁释放了,下一个争夺到锁的线程直接就生产,而不是先申请信号量再生产。

这就像是,在想要看电影的时候,先预定好票,临界开场就能排队后直接检票进去看,而不是在排队到我的时候,再去买票。 

因此我们选择,先申请信号量(信号量的申请具有原子性),先瓜分资源,再争夺锁:

封装环形队列2.0
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <mutex>template <class T>
class RingQueue
{
private:// 等待信号量void P(sem_t &s){sem_wait(&s);}// 释放信号量void V(sem_t &s){sem_post(&s);}public:RingQueue(int max_cap): _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0){//  1.信号量需要调用函数初始化://  第二个参数pshared: 0表示线程间共享,非0表示进程间共享sem_init(&_data_sem, 0, 0);sem_init(&_space_sem, 0, max_cap);// 2.互斥锁需要初始化pthread_mutex_init(&_c_mutex, nullptr);pthread_mutex_init(&_p_mutex, nullptr);}// 入队列,生产者调用void Push(const T &in){// 0.申请空间信号量P(_space_sem);// 1.生产者线程竞争申请锁:pthread_mutex_lock(&_p_mutex);// 2.向环形队列中进行生产_ringqueue[_p_step] = in;_p_step++;_p_step %= _max_cap;// 3.解锁pthread_mutex_unlock(&_p_mutex);// 4.释放数据资源V(_data_sem);}// 出队列void Pop(T *out){// 0.申请数据信号量P(_data_sem);// 1.消费者线程竞争申请锁pthread_mutex_lock(&_c_mutex);// 2.从环形队列中得到数据*out = _ringqueue[_c_step];_c_step++;_c_step %= _max_cap;// 3.解锁pthread_mutex_unlock(&_c_mutex);// 4.释放空间资源V(_space_sem);}~RingQueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);pthread_mutex_destroy(&_c_mutex);pthread_mutex_destroy(&_p_mutex);}private:// 1.队列和容量std::vector<T> _ringqueue;int _max_cap; // 最大容量// 2.消费者和生产者对应的位置,下标int _c_step;int _p_step;// 3.数据信号量和空间信号量sem_t _data_sem;sem_t _space_sem;// 4.消费者和生产者的互斥锁 pthread_mutex_t _c_mutex;pthread_mutex_t _p_mutex;
};

多生产多消费的意义:让处理数据和获取数据有更好的并发度

生产和消费不能只看,他们从环形队列放数据和拿数据的时间,真实的场景下,放数据和拿数据的时间只占很小的比重,处理数据、获取数据才占更大的比重,因此多线程在根本上是为了解决:

让处理数据和获取数据有更好的并发度

问题:在编写申请信号量处,对资源进行使用,申请,为什么不需要判断一下是否满足条件?

在上一篇博客讲到添加条件变量唤醒线程的时候,都需要先判断一下,阻塞队列是否满,是否为空,为什么这里不需要条件判断

这是因为信号量本身就是判断条件

信号量的本质就是采用原子性,将内部资源的使用状态,让我们在外部就能直接得到,直接判断出。

二元信号量

将资源看做整体,可以使用互斥锁来维持互斥关系,也可以使用信号量,将信号量的初始值设为1,来表征这整个资源,信号量(1,0)这就是二元信号量等同于互斥锁。

线程池

线程池的简单理解:餐厅服务员团队 🍔

想象你开了一家汉堡店,顾客源源不断。如果每来一个顾客,你就现招一个服务员为他服务,会发生什么?

  • 问题:招聘服务员耗时(创建线程开销大),顾客等得不耐烦(响应慢)。

  • 更糟的是:如果瞬间来100个顾客,招100个服务员挤爆厨房(系统资源耗尽)!


解决方案:线程池(固定服务员团队)

  1. 提前准备:雇佣3个固定服务员(线程池大小=3)。

  2. 任务队列:顾客按顺序排成一队(任务队列)。

  3. 分配规则:空闲的服务员从队首接单,服务完继续接下一个。

效果

  • 无需反复招人,服务员复用(线程复用)。

  • 最多3个顾客同时被服务(控制并发)。

  • 新顾客排队等待,秩序井然(任务队列管理)。


以前有写过进程池:进程间通信、进程池

进程池的本质就是生产消费模型,线程池也是生产消费模型。

实际应用场景

  • Web服务器:同时处理上千用户请求。

  • 批量处理数据:比如同时下载100个文件。

  • 游戏服务器:管理大量玩家动作事件。

线程池实现:

1.Thread.hpp()之前所讲过的线程的封装

#pragma once
#include <iostream>
#include <string>
#include <pthread.h>#include<functional>namespace ThreadModle
{// 线程要执行的方法// typedef void (*func_t)(ThreadData *td); // 函数指针类型// 等同于typedef std::function<void()> func_t;using func_t = std::function<void()>;//返回值为void,参数为空的一个函数类型class Thread{public:void Excute() // 调用回调函数的成员方法{std::cout << _name << ",is running" << std::endl;_isrunning = true;_func();_isrunning = false;}// 未来想要创建一个线程,一方面在创建线程的时候传递线程名,传递一个回调方法Thread(const std::string &name, func_t func): _name(name), _func(func){std::cout << "create " << name << " done" << std::endl;}static void *ThreadRoutine(void *args) // 创建的新线程都会执行这个方法{// 执行回调方法,每一个线程都能执行一系列的方法Thread *self = static_cast<Thread *>(args); // 获得了当前对象self->Excute();return nullptr;}bool Start(){                                                                  // 创建线程成功就返回0int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this); // ThreadRoutine线程的固定历程if (n != 0)return false;return true;}std::string Status(){if (_isrunning)return "running";elsereturn "sleep";}void Stop(){if (_isrunning){pthread_cancel(_tid); // 取消线程_isrunning = false;   // 设置状态为false线程停止std::cout << _name << " Stop" << std::endl;}}void Join(){pthread_join(_tid, nullptr);std::cout << _name << " Joined" << std::endl;}std::string Name(){return _name;}~Thread(){// Stop();// Join();}private:std::string _name; // 线程名pthread_t _tid;    // 线程所对应的idbool _isrunning;   // 线程此时是否正在运行func_t _func; // 线程要执行的回调函数};}

ThreadPool.hpp

注意:这里涉及到的知识点,包装器bind的用法:

包装器

这里出错的原因是,我们要求任务队列中的任务都应该是参数为空,返回值为void的函数类型,而HandlerTask属于成员函数,隐含了当前对象this指针,因此不满足函数类型的条件,当然无法传进。因此使用bind绑定来调整参数的个数,相当于构造一个function对象,用类ThreadPool中的方法HandlerTask,和当前对象this构造出这个func对象。func对象的参数在接收方看来就是空的。

ThreadPool.hpp1.0:

#pragma once#include <string>
#include <iostream>
#include <vector>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"
#include "Task.hpp"
#include <unistd.h>
using namespace ThreadModle;static const int gdefaultnum = 5;void test()
{while (true){std::cout << "hello linux" << std::endl;sleep(1);}
}template <class T>
class ThreadPool
{
private://锁住任务队列(保护任务队列)void LockTaskQueue(){pthread_mutex_lock(&_mutex);}void UnLockTaskQueue(){pthread_mutex_unlock(&_mutex);}bool isEmpty(){return _task_queue.empty();}void WakeUp(){pthread_cond_signal(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);//休眠时需要释放掉锁}//处理任务,注意在传这个函数指针的时候要注意,他内部有隐含有this指针void HandlerTask(){while(true){//取任务LockTaskQueue();//访问临界资源while(isEmpty()) //如果任务队列为空,线程就能休眠了,防止伪唤醒的情况这里使用while,不满足情况就一直wait,再判断{_sleep_thread_num++;//任务为空,将休眠线程+1Sleep();_sleep_thread_num--;}//有任务 || 被唤醒,在这里一定是有任务才被唤醒T t = _task_queue.front();_task_queue.pop();UnLockTaskQueue();//处理任务,此处不能/也不能在临界区//因为这个任务一旦被取出来了,就从队列当中被移走了,放在一个临时空间里t(); std::cout << t.result() << std::endl;}}
public:// 实际上对于线程池需要外部传的就是线程的数量ThreadPool(int thread_num = gdefaultnum): _thread_num(thread_num), _isrunning(false),_sleep_thread_num(0)//最开始没有线程在休眠{pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}// 初始化线程池void Init(){//构造一个function线程池类来构造的对象func_t func = std::bind(&ThreadPool::HandlerTask, this);//直接把this传过去(改变参数数量)for (int i = 0; i < _thread_num; i++){std::string threadname = "thread-" + std::to_string(i + 1);_threads.emplace_back(threadname, func);}}// 让线程池中的线程工作void Start(){for (auto &thread : _threads){thread.Start();}}// 让线程池中线程停止工作void Stop(){}// 像线程池推送任务void Equeue(const T& in){LockTaskQueue();_task_queue.push(in);// 唤醒线程,是否有正在休眠的线程_sleep_thread_numif(_sleep_thread_num > 0)//只要有线程在休眠,就唤醒WakeUp(); //没有休眠的线程,就会在这等待UnLockTaskQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:// 1.期望的线程数量int _thread_num;// 2.线程队列std::vector<Thread> _threads;// 3.任务队列 --- 共享资源 --- 需要被保护std::queue<T> _task_queue; // 4.线程池是否正在运行bool _isrunning;// 因为需要线程访问任务队列,因此访问任务队列的代码就是临界资源// 5.加锁保护队列,为什么不用信号量,因为当前队列只能整体使用,pthread_mutex_t _mutex;// 6.条件变量pthread_cond_t _cond;// 7.计数器,休眠线程的数量int _sleep_thread_num;
};

Task.hpp(所用到的就是前面生产者消费者模型当中封装的任务)

Main.cc

#include"ThreadPool.hpp"
#include<memory>
#include "Task.hpp"int main()
{// std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>;//c++14智能指针ThreadPool<Task> *tp = new ThreadPool<Task>();tp->Init();tp->Start();while(true){//不断地像线程池推送任务Task t(1,1);tp->Equeue(t);sleep(1);}tp->Stop();
}

运行结果:

现在我们要处理的问题是,哪一个线程处理的问题,想要打印出来,因此想到在构造线程的时候,function函数对类型当中都传入处理任务的线程名,也就是修改Thread.hpp中的代码:

修改HandlerTask:

运行后就会出现这样的报错:

static assertion failed: Wrong number of arguments for pointer-to-member

出错原因:bind传参数时候出现了问题,需要设定一个占位符,表示在绑定this对象参数后,我们的函数类型内部至少还有一个参数。具体看bind的用法。

func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);//直接把this传过去(改变参数数量)

线程排队的去执行任务:

让线程池中的所有线程,在执行完所有任务后,自动退出

注意在执行任务的时候,每个线程所执行的任务可能不一样,退出的基本条件是:

1.任务队列没有任务

2.线程想退出,所有的线程的状态都是Stop。

因此对于处理任务

1.如果当前没有任务,并且线程池并不退出正在运行,才让线程休眠

2.否则,就需要处理,只需要判定一种情况:任务队列为空了,并且运行状态为非运行,请他情况都让线程继续运行

 // 处理任务,注意在传这个函数指针的时候要注意,他内部有隐含有this指针void HandlerTask(const std::string &name){while (true){// 取任务LockTaskQueue();// 访问临界资源// 如果当前没有任务,并且线程池并不退出正在运行,才让线程休眠while (isEmpty() && _isrunning) // 如果任务队列为空,线程就能休眠了,防止伪唤醒的情况这里使用while,不满足情况就一直wait,再判断{_sleep_thread_num++; // 任务为空,将休眠线程+1Sleep();_sleep_thread_num--;}// 判定一种情况:任务队列为空了,并且运行状态为非运行if (isEmpty() && !_isrunning){std::cout << name << " quit" << std::endl;// 直接解锁任务队列;UnLockTaskQueue();// 直接退出循环:break;}// 有任务 || 被唤醒,在这里一定是有任务才被唤醒T t = _task_queue.front();_task_queue.pop();UnLockTaskQueue();// 处理任务,此处不能/也不能在临界区// 因为这个任务一旦被取出来了,就从队列当中被移走了,放在一个临时空间里t();std::cout << name << " : " << t.result() << std::endl;}}

将线程设置为Stop状态:

    void Stop(){_isrunning = false;}

仅仅将运行状态设为false,有可能所有的线程都在休眠,在判断:

线程无法被唤醒,因此线程无法退出 

因此封装一个唤醒所有线程的代码:

    //唤醒所有的线程,方便让线程退出void WakeUpAll(){pthread_cond_broadcast(&_cond);}

修改Stop()

    void Stop(){//保证他的安全性,带锁LockTaskQueue();_isrunning = false; //仅仅设为false,有可能所有的线程都在休眠,线程无法被唤醒,因此线程无法退出//因此需要唤醒所有的线程WakeUpAll();UnLockTaskQueue();}

如果此时已经将线程Stop了,但是此时上层如果不断的像线程池中push任务,任务队列一直不为空,那么不是还要一直处理任务吗

优化:因此在入队列的时候,要保证线程池的状态是运行状态

    // 像线程池推送任务void Equeue(const T &in){LockTaskQueue();//线程池必须处于运行状态才能push任务if (_isrunning){_task_queue.push(in);// 唤醒线程,是否有正在休眠的线程_sleep_thread_numif (_sleep_thread_num > 0) // 只要有线程在休眠,就唤醒WakeUp();              // 没有休眠的线程,就会在这等待}UnLockTaskQueue();}

测试代码:

#include "ThreadPool.hpp"
#include <memory>#include "Task.hpp"int main()
{// std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>;//c++14智能指针ThreadPool<Task> *tp = new ThreadPool<Task>();tp->Init();tp->Start();int cnt = 10;while (cnt--){// 不断地像线程池推送任务Task t(1, 1);tp->Equeue(t);sleep(1);std::cout << "cnt: " << cnt << std::endl;}tp->Stop();std::cout << "thread pool stop" << std::endl;sleep(10);return 0;
}

运行结果:

继续优化:

实际上在我们运行的代码当中还存在一些问题,运行的结果打印的很不优美

在实际情况当中,我们的多线程,线程池的代码当中,都需要日志信息

下一篇文章将讲解用日志来优化我们的代码,以及单例模式

结语:

       随着这篇关于题目解析的博客接近尾声,我衷心希望我所分享的内容能为你带来一些启发和帮助。学习和理解的过程往往充满挑战,但正是这些挑战让我们不断成长和进步。我在准备这篇文章时,也深刻体会到了学习与分享的乐趣。    

         在此,我要特别感谢每一位阅读到这里的你。是你的关注和支持,给予了我持续写作和分享的动力。我深知,无论我在某个领域有多少见解,都离不开大家的鼓励与指正。因此,如果你在阅读过程中有任何疑问、建议或是发现了文章中的不足之处,都欢迎你慷慨赐教     。

        你的每一条反馈都是我前进路上的宝贵财富。同时,我也非常期待能够得到你的点赞、收藏,关注这将是对我莫大的支持和鼓励。当然,我更期待的是能够持续为你带来有价值的内容。

相关文章:

  • 基于 Elasticsearch 8.12.0 集群热词实现
  • Hello, Dirty page
  • LabVIEW发电机励磁系统远程诊断
  • P8512 [Ynoi Easy Round 2021] TEST_152 Solution
  • conda环境独立管理cudatoolkit
  • vulnhub five86系列靶机合集
  • HTTP:十.cookie机制
  • 2000-2017年各省城市液化石油气供气总量数据
  • 硬件工程师笔记——电子器件汇总大全
  • HTML — 总结
  • LeetCode[225]用队列实现栈
  • LeetCode 每日一题 2563. 统计公平数对的数目
  • WEMOS LOLIN32
  • python之计算平面曲线离散点的曲率
  • vivo把三颗「主摄」放进了手机
  • cpp知识章节
  • SpringAI系列 - MCP篇(一) - 什么是MCP
  • VitePress搭建-接入giscus的注意事项。
  • 第十章 继承与派生
  • PCL库开发入门
  • 北朝时期的甲胄
  • 最大涨幅9800%!金价新高不断,引发期权“末日轮”效应,沪金期权多张合约大涨
  • 外贸50城,谁在“扛大旗”?
  • 山西一国道塌陷致2死后续:地质雷达检测出10处道路病害
  • 周继红当选中国泳协主席,曾为国摘得首枚奥运跳水金牌
  • “小时光:地铁里的阅读”摄影展开幕,嘉宾共话日常生活与阅读