【Linux】清晰思路讲解:POSIX信号量、基于环形队列的生产消费模型、线程池。
📚 博主的专栏
🐧 Linux | 🖥️ C++ | 📊 数据结构 | 💡C++ 算法 | 🌐 C 语言
上篇文章: 条件变量、基于阻塞队列的生产者消费者模型
下篇文章: linux多线程终章、单例模式
目录
POSIX信号量
POSIX信号量类型
核心API接口
1. 未命名信号量
2. 命名信号量
PV操作
PV操作的本质
1. P操作(Proberen)
2. V操作(Verhogen)
PV操作的经典应用场景
1. 互斥访问(Mutex)
基于环形队列的生产消费模型
信号量能做到以下几点:
使用信号量实现模型的方案:
代码1.0
代码2.0:生产者给消费者派发任务
代码3.0:多生产,多消费,加锁 mutex
问题:是先加锁,还是先申请信号量?
封装环形队列2.0
多生产多消费的意义:让处理数据和获取数据有更好的并发度
问题:在编写申请信号量处,对资源进行使用,申请,为什么不需要判断一下是否满足条件?
这是因为信号量本身就是判断条件
二元信号量
线程池
线程池的简单理解:餐厅服务员团队 🍔
解决方案:线程池(固定服务员团队)
实际应用场景
线程池实现:
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。在Linux中,POSIX信号量是一种用于进程或线程间同步的机制,它比System V信号量更简洁高效。以下是其核心接口及使用方法的详细说明:
POSIX信号量类型
命名信号量:通过名称标识,可用于不相关进程间的同步。
未命名信号量(基于内存的信号量):通常用于同一进程内的线程间或相关进程间的同步。
核心API接口
1. 未命名信号量
#include <semaphore.h>// 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
// pshared: 0表示线程间共享,非0表示进程间共享
// value: 信号量初始值// 等待信号量(P操作)
int sem_wait(sem_t *sem); // 阻塞直到信号量>0
int sem_trywait(sem_t *sem); // 非阻塞版本
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);// 释放信号量(V操作)
int sem_post(sem_t *sem);// 销毁信号量
int sem_destroy(sem_t *sem);
2. 命名信号量
#include <semaphore.h>
#include <fcntl.h>// 创建/打开命名信号量
sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value);
// name: 格式如"/mysem",必须以斜杠开头
// oflag: O_CREAT、O_EXCL等标志
// mode: 权限位(如0644)
// value: 初始值// 关闭信号量
int sem_close(sem_t *sem);// 删除系统中的信号量
int sem_unlink(const char *name);
上一篇博客的:生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量):
首先我们先再次熟悉PV操作:
PV操作
PV操作是操作系统和并发编程中的核心同步原语,由荷兰计算机科学家Edsger Dijkstra在1965年提出,用于解决多进程/线程间的互斥与同步问题。其名称来源于荷兰语中的 Proberen(测试) 和 Verhogen(增加),对应信号量的两种原子操作。
PV操作的本质
PV操作基于信号量(Semaphore)实现,信号量是一个整型变量,表示资源的可用数量。通过两个原子操作控制资源访问:
操作 | 行为描述 | 伪代码示例 | 实际函数(POSIX) |
---|---|---|---|
P操作 | 申请资源(减操作) | wait(sem) | sem_wait() |
V操作 | 释放资源(加操作) | signal(sem) | sem_post() |
1. P操作(Proberen)
动作:尝试减少信号量的值。
若信号量
sem > 0
:将其减1,进程继续执行。若信号量
sem ≤ 0
:进程阻塞,直到信号量变为正数。原子性:整个操作不可中断,确保并发安全。
用途:进入临界区前获取资源锁。
伪代码实现:
void P(semaphore sem) {sem.value--;if (sem.value < 0) {将当前进程加入sem的等待队列;阻塞该进程;}
}
2. V操作(Verhogen)
动作:增加信号量的值。
若信号量的等待队列非空:唤醒一个阻塞的进程。
若等待队列为空:信号量值加1。
原子性:操作不可分割。
用途:退出临界区后释放资源锁。
伪代码实现:
void V(semaphore sem) {sem.value++;if (sem.value <= 0) {从sem的等待队列中移除一个进程P;唤醒进程P;}
}
PV操作的经典应用场景
1. 互斥访问(Mutex)
-
目标:确保同一时刻仅一个线程进入临界区。
实现:初始化信号量为1(二进制信号量)。
sem_t mutex;
sem_init(&mutex, 0, 1); // 初始值为1// 线程代码
void thread_func() {sem_wait(&mutex); // P操作// 临界区操作sem_post(&mutex); // V操作
}
基于环形队列的生产消费模型
以前在数据结构专栏,有做过环形队列的OJ题
目标:协调生产者和消费者对缓冲区的访问
环形队列采用数组模拟,用模运算来模拟环状特性。
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,当队列为空||为满的时候,head == tail,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置(head == end + 1,当end到head-1的位置的时候不访问),作为满的状态。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程
我们要关心的是,多线程如何在环形队列中进行生产和消费
1.单生产,单消费
2.多生产,多消费
生产和消费指向同一个位置时:在局部空间访问资源要具有顺序性,互斥性
1.队列为空,优先让谁先访问
生产者先生产
2.队列满了,让谁访问?
让消费者消费,如果生产者再生产,会覆盖原有数据
3.队列不为空 && 队列不为满
head != tail 允许生产和消费同时进行!
信号量能做到以下几点:
信号量实现互斥与同步
a.不让生产者把消费者套一个圈
b.不能让消费者,超过生产者
1.队列为空,让生产者来生产
2.队列满了,让消费者来消费
使用信号量实现模型的方案:
消费者所关心的资源是数据资源,对于生产者,所关心的资源是放数据的空间资源
数据资源+空间资源 = 队列大小N
数据资源量:sem_t data_sem = 0;
空间资源量:sem_t space_sem = N;
互斥:保证任意一个时刻只允许一个线程访问临界区
生产者:
P操作申请空间,开始生产,空间资源-1
P(space_sem);
V操作,当生产者生产数据资源之后,将数据资源+1
V(data_sem)
消费者:
P操作申请数据资源,进入环形队列进行消费,数据资源-1
P(data_sem);
V操作,当消费者将数据资源拿走之后,空间资源+1
V(space_sem);
注意看代码中的注释:
代码1.0
封装环形队列1.0
#pragma once #include <iostream> #include <string> #include <vector> #include <semaphore.h>template <class T> class RingQueue { private:// 等待信号量void P(sem_t &s){sem_wait(&s);}// 释放信号量void V(sem_t &s){sem_post(&s);}public:RingQueue(int max_cap): _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0){// 信号量需要调用函数初始化:// 第二个参数pshared: 0表示线程间共享,非0表示进程间共享sem_init(&_data_sem, 0, 0);sem_init(&_space_sem, 0, max_cap);}// 入队列,生产者调用void Push(const T &in){// 1.申请空间信号量P(_space_sem);// 2.向环形队列中进行生产_ringqueue[_p_step] = in;_p_step++;_p_step %= _max_cap;// 3.释放数据资源V(_data_sem);}// 出队列void Pop(T *out){// 1.申请数据信号量P(_data_sem);// 2.从环形队列中得到数据*out = _ringqueue[_c_step];_c_step++;_c_step %= _max_cap;// 3.释放空间资源V(_space_sem);}~RingQueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);}private:// 1.队列和容量std::vector<T> _ringqueue;int _max_cap; // 最大容量// 2.消费者和生产者对应的位置,下标int _c_step;int _p_step;// 3.数据信号量和空间信号量sem_t _data_sem;sem_t _space_sem; };
测试代码1.0:
#include <iostream> #include <string> #include "RingQueue.hpp" #include <unistd.h> #include <pthread.h> #include <ctime>void *Consumer(void *args) {RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){//sleep(1);int data = 0;// 1.进行消费rq->Pop(&data);// 2.进行数据处理std::cout << "Consumer ->" << data << std::endl;} } void *Productor(void *args) {RingQueue<int> *rq = static_cast<RingQueue<int> *>(args);while (true){//sleep(1);// 1.构造数据int data = rand() % 10 + 1;// 2.生产数据rq->Push(data);std::cout << "Productor -> " << data << std::endl;} }int main() {srand(time(nullptr) ^ getpid());// 构造环形队列RingQueue<int> *rq = new RingQueue<int>(10);pthread_t c, p;pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0; }
让消费者每sleep(1)再消费,生产者一直生产:观察现象,当生产的数据数量满,消费者消费一次,生产者再生产一个。
让生产者每sleep(1)再生产,消费者一直消费:观察现象,生产者每1秒生产一个。消费者才能消费一个:
代码2.0:生产者给消费者派发任务
Task.hpp:
#pragma once#include <iostream>
#include <functional>// typedef std::function<void()> task_t;
using task_t = std::function<void()>;//返回值是void参数为空的函数对象,类似于一个类,因为底层是仿函数// void Download()
// {
// std::cout << "我是一个下载的任务" << std::endl;
// }class Task
{
public:Task(){}Task(int x, int y) : _x(x), _y(y){}// 做加法void Excute(){_result = _x + _y;}void operator()(){Excute();}std::string debug(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";return msg;}std::string result(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);return msg;}private:int _x;int _y;int _result;
};
测试代码2.0:
#include <iostream>
#include <string>
#include "RingQueue.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <pthread.h>
#include <ctime>void *Consumer(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){// sleep(1);// int data = 0;//构建空任务Task t;// 1.进行消费,执行rq->Pop(&t);// 2.进行数据处理t();std::cout << "Consumer -> " << t.result() << std::endl;}
}
void *Productor(void *args)
{RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);while (true){sleep(1);// 1.构造任务int x = rand() % 10 + 1;usleep(x*1000);int y = rand() % 10 + 1;Task t(x, y);// 2.生产数据rq->Push(t);std::cout << "Productor -> " << t.debug() << std::endl;}
}int main()
{srand(time(nullptr) ^ getpid());// 构造环形队列RingQueue<Task> *rq = new RingQueue<Task>(10);pthread_t c, p;pthread_create(&c, nullptr, Consumer, rq);pthread_create(&p, nullptr, Productor, rq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}
运行结果:
代码3.0:多生产,多消费,加锁 mutex
生产者之间存在互斥关系,消费者之间存在互斥关系,生产者和消费者之间我们已经处理了。
现在要处理的是,多个生产者先自己竞争,多个消费者也自己竞争,因此加两把锁:
private: pthread_mutex_t _c_mutex; pthread_mutex_t _p_mutex;
实际上,此时已经维护好了,生产者之间存在互斥关系,消费者之间存在互斥关系,生产者与消费者之间存在的同步与互斥的关系:
问题:是先加锁,还是先申请信号量?
例如生产者:
先让生产者全都先竞争(先加锁),再获取信号量:
一个生产者竞争到锁,然后申请信号量,单生产。在他没有释放锁的时候,其他线程在申请锁这里,等待他释放锁。这种情况会导致申请锁和申请信号量一定是串行的。
先让生产者全都申请信号量,再竞争锁(先加锁):(更有效率)
每一个生产者先预定好了票(预定好空间资源),再去争夺锁,谁的优先级更高,谁的竞争力更强,去拥有锁,进来也是安全的。一旦一个锁释放了,下一个争夺到锁的线程直接就生产,而不是先申请信号量再生产。
这就像是,在想要看电影的时候,先预定好票,临界开场就能排队后直接检票进去看,而不是在排队到我的时候,再去买票。
因此我们选择,先申请信号量(信号量的申请具有原子性),先瓜分资源,再争夺锁:
封装环形队列2.0
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <semaphore.h>
#include <mutex>template <class T>
class RingQueue
{
private:// 等待信号量void P(sem_t &s){sem_wait(&s);}// 释放信号量void V(sem_t &s){sem_post(&s);}public:RingQueue(int max_cap): _ringqueue(max_cap), _max_cap(max_cap), _c_step(0), _p_step(0){// 1.信号量需要调用函数初始化:// 第二个参数pshared: 0表示线程间共享,非0表示进程间共享sem_init(&_data_sem, 0, 0);sem_init(&_space_sem, 0, max_cap);// 2.互斥锁需要初始化pthread_mutex_init(&_c_mutex, nullptr);pthread_mutex_init(&_p_mutex, nullptr);}// 入队列,生产者调用void Push(const T &in){// 0.申请空间信号量P(_space_sem);// 1.生产者线程竞争申请锁:pthread_mutex_lock(&_p_mutex);// 2.向环形队列中进行生产_ringqueue[_p_step] = in;_p_step++;_p_step %= _max_cap;// 3.解锁pthread_mutex_unlock(&_p_mutex);// 4.释放数据资源V(_data_sem);}// 出队列void Pop(T *out){// 0.申请数据信号量P(_data_sem);// 1.消费者线程竞争申请锁pthread_mutex_lock(&_c_mutex);// 2.从环形队列中得到数据*out = _ringqueue[_c_step];_c_step++;_c_step %= _max_cap;// 3.解锁pthread_mutex_unlock(&_c_mutex);// 4.释放空间资源V(_space_sem);}~RingQueue(){sem_destroy(&_data_sem);sem_destroy(&_space_sem);pthread_mutex_destroy(&_c_mutex);pthread_mutex_destroy(&_p_mutex);}private:// 1.队列和容量std::vector<T> _ringqueue;int _max_cap; // 最大容量// 2.消费者和生产者对应的位置,下标int _c_step;int _p_step;// 3.数据信号量和空间信号量sem_t _data_sem;sem_t _space_sem;// 4.消费者和生产者的互斥锁 pthread_mutex_t _c_mutex;pthread_mutex_t _p_mutex;
};
多生产多消费的意义:让处理数据和获取数据有更好的并发度
生产和消费不能只看,他们从环形队列放数据和拿数据的时间,真实的场景下,放数据和拿数据的时间只占很小的比重,处理数据、获取数据才占更大的比重,因此多线程在根本上是为了解决:
让处理数据和获取数据有更好的并发度
问题:在编写申请信号量处,对资源进行使用,申请,为什么不需要判断一下是否满足条件?
在上一篇博客讲到添加条件变量唤醒线程的时候,都需要先判断一下,阻塞队列是否满,是否为空,为什么这里不需要条件判断
这是因为信号量本身就是判断条件
信号量的本质就是采用原子性,将内部资源的使用状态,让我们在外部就能直接得到,直接判断出。
二元信号量
将资源看做整体,可以使用互斥锁来维持互斥关系,也可以使用信号量,将信号量的初始值设为1,来表征这整个资源,信号量(1,0)这就是二元信号量等同于互斥锁。
线程池
线程池的简单理解:餐厅服务员团队 🍔
想象你开了一家汉堡店,顾客源源不断。如果每来一个顾客,你就现招一个服务员为他服务,会发生什么?
问题:招聘服务员耗时(创建线程开销大),顾客等得不耐烦(响应慢)。
更糟的是:如果瞬间来100个顾客,招100个服务员挤爆厨房(系统资源耗尽)!
解决方案:线程池(固定服务员团队)
提前准备:雇佣3个固定服务员(线程池大小=3)。
任务队列:顾客按顺序排成一队(任务队列)。
分配规则:空闲的服务员从队首接单,服务完继续接下一个。
效果:
无需反复招人,服务员复用(线程复用)。
最多3个顾客同时被服务(控制并发)。
新顾客排队等待,秩序井然(任务队列管理)。
以前有写过进程池:进程间通信、进程池
进程池的本质就是生产消费模型,线程池也是生产消费模型。
实际应用场景
Web服务器:同时处理上千用户请求。
批量处理数据:比如同时下载100个文件。
游戏服务器:管理大量玩家动作事件。
线程池实现:
1.Thread.hpp()之前所讲过的线程的封装
#pragma once #include <iostream> #include <string> #include <pthread.h>#include<functional>namespace ThreadModle {// 线程要执行的方法// typedef void (*func_t)(ThreadData *td); // 函数指针类型// 等同于typedef std::function<void()> func_t;using func_t = std::function<void()>;//返回值为void,参数为空的一个函数类型class Thread{public:void Excute() // 调用回调函数的成员方法{std::cout << _name << ",is running" << std::endl;_isrunning = true;_func();_isrunning = false;}// 未来想要创建一个线程,一方面在创建线程的时候传递线程名,传递一个回调方法Thread(const std::string &name, func_t func): _name(name), _func(func){std::cout << "create " << name << " done" << std::endl;}static void *ThreadRoutine(void *args) // 创建的新线程都会执行这个方法{// 执行回调方法,每一个线程都能执行一系列的方法Thread *self = static_cast<Thread *>(args); // 获得了当前对象self->Excute();return nullptr;}bool Start(){ // 创建线程成功就返回0int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this); // ThreadRoutine线程的固定历程if (n != 0)return false;return true;}std::string Status(){if (_isrunning)return "running";elsereturn "sleep";}void Stop(){if (_isrunning){pthread_cancel(_tid); // 取消线程_isrunning = false; // 设置状态为false线程停止std::cout << _name << " Stop" << std::endl;}}void Join(){pthread_join(_tid, nullptr);std::cout << _name << " Joined" << std::endl;}std::string Name(){return _name;}~Thread(){// Stop();// Join();}private:std::string _name; // 线程名pthread_t _tid; // 线程所对应的idbool _isrunning; // 线程此时是否正在运行func_t _func; // 线程要执行的回调函数};}
ThreadPool.hpp
注意:这里涉及到的知识点,包装器bind的用法:
包装器
这里出错的原因是,我们要求任务队列中的任务都应该是参数为空,返回值为void的函数类型,而HandlerTask属于成员函数,隐含了当前对象this指针,因此不满足函数类型的条件,当然无法传进。因此使用bind绑定来调整参数的个数,相当于构造一个function对象,用类ThreadPool中的方法HandlerTask,和当前对象this构造出这个func对象。func对象的参数在接收方看来就是空的。
ThreadPool.hpp1.0:
#pragma once#include <string> #include <iostream> #include <vector> #include <pthread.h> #include <queue> #include "Thread.hpp" #include "Task.hpp" #include <unistd.h> using namespace ThreadModle;static const int gdefaultnum = 5;void test() {while (true){std::cout << "hello linux" << std::endl;sleep(1);} }template <class T> class ThreadPool { private://锁住任务队列(保护任务队列)void LockTaskQueue(){pthread_mutex_lock(&_mutex);}void UnLockTaskQueue(){pthread_mutex_unlock(&_mutex);}bool isEmpty(){return _task_queue.empty();}void WakeUp(){pthread_cond_signal(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);//休眠时需要释放掉锁}//处理任务,注意在传这个函数指针的时候要注意,他内部有隐含有this指针void HandlerTask(){while(true){//取任务LockTaskQueue();//访问临界资源while(isEmpty()) //如果任务队列为空,线程就能休眠了,防止伪唤醒的情况这里使用while,不满足情况就一直wait,再判断{_sleep_thread_num++;//任务为空,将休眠线程+1Sleep();_sleep_thread_num--;}//有任务 || 被唤醒,在这里一定是有任务才被唤醒T t = _task_queue.front();_task_queue.pop();UnLockTaskQueue();//处理任务,此处不能/也不能在临界区//因为这个任务一旦被取出来了,就从队列当中被移走了,放在一个临时空间里t(); std::cout << t.result() << std::endl;}} public:// 实际上对于线程池需要外部传的就是线程的数量ThreadPool(int thread_num = gdefaultnum): _thread_num(thread_num), _isrunning(false),_sleep_thread_num(0)//最开始没有线程在休眠{pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}// 初始化线程池void Init(){//构造一个function线程池类来构造的对象func_t func = std::bind(&ThreadPool::HandlerTask, this);//直接把this传过去(改变参数数量)for (int i = 0; i < _thread_num; i++){std::string threadname = "thread-" + std::to_string(i + 1);_threads.emplace_back(threadname, func);}}// 让线程池中的线程工作void Start(){for (auto &thread : _threads){thread.Start();}}// 让线程池中线程停止工作void Stop(){}// 像线程池推送任务void Equeue(const T& in){LockTaskQueue();_task_queue.push(in);// 唤醒线程,是否有正在休眠的线程_sleep_thread_numif(_sleep_thread_num > 0)//只要有线程在休眠,就唤醒WakeUp(); //没有休眠的线程,就会在这等待UnLockTaskQueue();}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:// 1.期望的线程数量int _thread_num;// 2.线程队列std::vector<Thread> _threads;// 3.任务队列 --- 共享资源 --- 需要被保护std::queue<T> _task_queue; // 4.线程池是否正在运行bool _isrunning;// 因为需要线程访问任务队列,因此访问任务队列的代码就是临界资源// 5.加锁保护队列,为什么不用信号量,因为当前队列只能整体使用,pthread_mutex_t _mutex;// 6.条件变量pthread_cond_t _cond;// 7.计数器,休眠线程的数量int _sleep_thread_num; };
Task.hpp(所用到的就是前面生产者消费者模型当中封装的任务)
Main.cc
#include"ThreadPool.hpp" #include<memory> #include "Task.hpp"int main() {// std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>;//c++14智能指针ThreadPool<Task> *tp = new ThreadPool<Task>();tp->Init();tp->Start();while(true){//不断地像线程池推送任务Task t(1,1);tp->Equeue(t);sleep(1);}tp->Stop(); }
运行结果:
现在我们要处理的问题是,哪一个线程处理的问题,想要打印出来,因此想到在构造线程的时候,function函数对类型当中都传入处理任务的线程名,也就是修改Thread.hpp中的代码:
修改HandlerTask:
运行后就会出现这样的报错:
static assertion failed: Wrong number of arguments for pointer-to-member
出错原因:bind传参数时候出现了问题,需要设定一个占位符,表示在绑定this对象参数后,我们的函数类型内部至少还有一个参数。具体看bind的用法。
func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);//直接把this传过去(改变参数数量)
线程排队的去执行任务:
让线程池中的所有线程,在执行完所有任务后,自动退出
注意在执行任务的时候,每个线程所执行的任务可能不一样,退出的基本条件是:
1.任务队列没有任务
2.线程想退出,所有的线程的状态都是Stop。
因此对于处理任务
1.如果当前没有任务,并且线程池并不退出正在运行,才让线程休眠
2.否则,就需要处理,只需要判定一种情况:任务队列为空了,并且运行状态为非运行,请他情况都让线程继续运行
// 处理任务,注意在传这个函数指针的时候要注意,他内部有隐含有this指针void HandlerTask(const std::string &name){while (true){// 取任务LockTaskQueue();// 访问临界资源// 如果当前没有任务,并且线程池并不退出正在运行,才让线程休眠while (isEmpty() && _isrunning) // 如果任务队列为空,线程就能休眠了,防止伪唤醒的情况这里使用while,不满足情况就一直wait,再判断{_sleep_thread_num++; // 任务为空,将休眠线程+1Sleep();_sleep_thread_num--;}// 判定一种情况:任务队列为空了,并且运行状态为非运行if (isEmpty() && !_isrunning){std::cout << name << " quit" << std::endl;// 直接解锁任务队列;UnLockTaskQueue();// 直接退出循环:break;}// 有任务 || 被唤醒,在这里一定是有任务才被唤醒T t = _task_queue.front();_task_queue.pop();UnLockTaskQueue();// 处理任务,此处不能/也不能在临界区// 因为这个任务一旦被取出来了,就从队列当中被移走了,放在一个临时空间里t();std::cout << name << " : " << t.result() << std::endl;}}
将线程设置为Stop状态:
void Stop(){_isrunning = false;}
仅仅将运行状态设为false,有可能所有的线程都在休眠,在判断:
线程无法被唤醒,因此线程无法退出
因此封装一个唤醒所有线程的代码:
//唤醒所有的线程,方便让线程退出void WakeUpAll(){pthread_cond_broadcast(&_cond);}
修改Stop()
void Stop(){//保证他的安全性,带锁LockTaskQueue();_isrunning = false; //仅仅设为false,有可能所有的线程都在休眠,线程无法被唤醒,因此线程无法退出//因此需要唤醒所有的线程WakeUpAll();UnLockTaskQueue();}
如果此时已经将线程Stop了,但是此时上层如果不断的像线程池中push任务,任务队列一直不为空,那么不是还要一直处理任务吗
优化:因此在入队列的时候,要保证线程池的状态是运行状态
// 像线程池推送任务void Equeue(const T &in){LockTaskQueue();//线程池必须处于运行状态才能push任务if (_isrunning){_task_queue.push(in);// 唤醒线程,是否有正在休眠的线程_sleep_thread_numif (_sleep_thread_num > 0) // 只要有线程在休眠,就唤醒WakeUp(); // 没有休眠的线程,就会在这等待}UnLockTaskQueue();}
测试代码:
#include "ThreadPool.hpp" #include <memory>#include "Task.hpp"int main() {// std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>;//c++14智能指针ThreadPool<Task> *tp = new ThreadPool<Task>();tp->Init();tp->Start();int cnt = 10;while (cnt--){// 不断地像线程池推送任务Task t(1, 1);tp->Equeue(t);sleep(1);std::cout << "cnt: " << cnt << std::endl;}tp->Stop();std::cout << "thread pool stop" << std::endl;sleep(10);return 0; }
运行结果:
继续优化:
实际上在我们运行的代码当中还存在一些问题,运行的结果打印的很不优美
在实际情况当中,我们的多线程,线程池的代码当中,都需要日志信息
下一篇文章将讲解用日志来优化我们的代码,以及单例模式
结语:
随着这篇关于题目解析的博客接近尾声,我衷心希望我所分享的内容能为你带来一些启发和帮助。学习和理解的过程往往充满挑战,但正是这些挑战让我们不断成长和进步。我在准备这篇文章时,也深刻体会到了学习与分享的乐趣。
在此,我要特别感谢每一位阅读到这里的你。是你的关注和支持,给予了我持续写作和分享的动力。我深知,无论我在某个领域有多少见解,都离不开大家的鼓励与指正。因此,如果你在阅读过程中有任何疑问、建议或是发现了文章中的不足之处,都欢迎你慷慨赐教 。
你的每一条反馈都是我前进路上的宝贵财富。同时,我也非常期待能够得到你的点赞、收藏,关注这将是对我莫大的支持和鼓励。当然,我更期待的是能够持续为你带来有价值的内容。