当前位置: 首页 > news >正文

【高并发内存池】从零到一的项目之高并发内存池整体框架设计及thread cache设计

个人主页 : zxctscl
专栏 【C++】、 【C语言】、 【Linux】、 【数据结构】、 【算法】
如有转载请先通知

文章目录

  • 前言
  • 1. 高并发内存池整体框架设计
  • 2. 高并发内存池--thread cache
    • 2.1 定长内存池的问题
    • 2.2 整体框架
    • 2.3 自由链表
    • 2.4 thread cache哈希桶的对齐规则
    • 2.5 ThreadCache类的设计
      • 2.5.1 Allocate
      • 2.5.2 Deallocate
    • 2.6 无锁访问
    • 2.7 测试TLS
  • 3. 附代码
    • 3.1 Common.h
    • 3.2 ObjectPool.h
    • 3.3 ThreadCache.h
    • 3.4 ConcurrentAlloc.h
    • 3.5 ThreadCache.cpp
    • 3.6 test.cpp

前言

接上回的项目 【高并发内存池】从零到一的项目:项目介绍、内存池及定长内存池的设计继续分享项目创做过程及代码。

1. 高并发内存池整体框架设计

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。

  1. 性能问题。
  2. 多线程环境下,锁竞争问题。
  3. 内存碎片问题。

concurrent memory pool主要由以下3个部分构成:

  1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方
    解决了大部分情况下线程锁竞争的情况。

  2. central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈
    居中调度

  3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题

在这里插入图片描述

2. 高并发内存池–thread cache

2.1 定长内存池的问题

在上一次的分享中,做了一个定长内存池,有一个自由链表,它用来管理好切好的小块内存。需要内存的时候首先在自由链表中找,然后头删,释放的内存就头插到自由链表里。
但这里解决的是定长内存池,指定解决某一个大小。
在这里插入图片描述
要想解决各种需求下的内存分配,可能是8byte、10byte、16byte等等,这是就考虑利用多个自由链表来实现。
但是如果每一个字节数都挂一个自由链表,就像1字节、2字节这样就麻烦了。此时就用一些平衡的牺牲。

如果需求小于等于8字节的就在8字节这里切,要8-16字节的就在16字节那里切。
在这里插入图片描述

如果此时需要5字节,却分配了8字节;需要6字节,却分配了2字节,那么就会导致碎片化的内存用不上,这里的碎片被称为内碎片。会选择一些值去对齐,后面会提到。

在这里插入图片描述
对齐就会产生一些碎片化的内存用不上,利用不上的这些内存就是内碎片。

而外碎片,是一块连续的空间被切成了很多块分出去,只有部分还回来了,它们不连续,导致有足够的空间,却申请不了大块的空间。

2.2 整体框架

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
在这里插入图片描述
申请内存:

  1. 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
  2. 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
  3. 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。

释放内存:

  1. 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
  2. 当链表的长度过长,则回收一部分内存对象到central cache。

2.3 自由链表

void* Allocate(size_t size);来存不同定长哈希桶自由链表大小。

每个桶下面都挂自由链表,为了方便控制,用一个类去封装一下, 这个类FreeList管理切分好的小对象的自由链表。
这个类里面成员变量只有一个指针用来指向链表,提供简单的头删、头插接口。

头插:
在这里插入图片描述

  void Push(void* obj){assert(obj);*(void**)obj = _freeList;_freeList = obj;}

再提供一个公共函数NextObj(),给一个对象去取这个对象头上的4个字节或者8个字节。

static void*& NextObj(void* obj)
{return *(void**)obj;
}

头删:
取它头上的4个或者8个字节指向下一个
在这里插入图片描述

	void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}
static void*& NextObj(void* obj)
{return *(void**)obj;
}class FreeList
{
public:void Push(void* obj){assert(obj);// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;}void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}
private:void* _freeList = nullptr;
};

2.4 thread cache哈希桶的对齐规则

给一个定长内存池,该切多大一个内存块分配呢?
就得制定一定的规则。

用静态成员变量来记录最大的内存大小。

static const size_t MAX_BYTES = 256 * 1024;

此时在thread cache设计中,它哈希桶的size不能大于MAX_BYTES。

assert(size <= MAX_BYTES);

那么映射对应对象的桶怎么选择?这里就有一个对齐规则,计算一个申请这个空间大小的内存,该对应哪一个哈希桶。

此时用一个类SizeClass专门来管理这个映射规则:如果所有的都以8字节来对齐,1-8映射8,9-16对应第二个桶,17-24对应第三个桶一直这样分配,此时256*1024=262144都以8字节平分,就除8,就得建32768个自由链表个桶,就很多。
在这里插入图片描述
此时就给了一个简化的规则:

整体控制在最多10%左右的内碎片浪费:

  [1,128]	                8byte对齐	    freelist[0,16)[128+1,1024]			 	16byte对齐	    freelist[16,72)[1024+1,8*1024]			128byte对齐	    freelist[72,128)[8*1024+1,64*1024]		1024byte对齐     freelist[128,184)[64*1024+1,256*1024]		8*1024byte对齐   freelist[184,208)

框架就是:

	size_t RoundUp(size_t size){if (size <= 128){}else if (size <= 1024){}else if (size <= 8 * 1024){}else if (size <= 64 * 1024){}else if (size <= 256 * 1024){}else{assert(false);return -1;}}

怎么去算对齐呢?
此时给一个子函数size_t _RoundUp(),用来算对齐数。
如果size模8等于0就不需要处理,对齐的就是8的倍数;如果不等于0就得用(size/8+1)*8,这时候得出来的就是8的倍数。
通用就是这样:

	size_t _RoundUp(size_t size, size_t alignNum){size_t alignSize;if (size % alignNum != 0){alignSize = (size / alignNum + 1)*alignNum;}else{alignSize = size;}return alignSize;

专业给出的是,用8字节来计算一下,8+8-1=15,再7去翻,7二进制就是000111,~7就是111000,再用15&~7,此时低三位无论是什么都与000,15就是001111,001111&111000就是001000,就是8

	static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}

此时对齐规则完整就是:

	static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{assert(false);return -1;}}

计算映射的哪一个自由链表桶,求他对应的下标:

size_t _Index(size_t bytes, size_t alignNum){if (bytes % alignNum == 0){return bytes / alignNum - 1;}else{return bytes / alignNum;}}

专业的计算映射的哪一个自由链表桶,如果是1-8之间,那么就整体加上一个8-1=7,此时就变到8-15,然后再右移3位,相当于除8,最后再减1就算出它下标。

	static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}

2.5 ThreadCache类的设计

ThreadCache类的成员变量是一个存储208个自由链表的数组,成员函数包括申请内存对象,释放内存对象和从中心缓存获取对象。

总的哈希桶就有208个,用一个静态const成员来记录:

static const size_t NFREELIST = 208;
class ThreadCache
{
public:// 申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};

2.5.1 Allocate

有了对齐规则之后,申请内存大小就比较容易。

此时空间,每一个ThreadCache都有一个哈希映射的自由链表,如果这个链表不为空,就弄一个出去。

如果这个自由链表下面对应的没有空的,就找下一层中心缓存去获取对象,获取alignSize大小对象。

void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}

2.5.2 Deallocate

释放内存对象,释放后的内存对象,找对映射的自由链表桶,将对象插入进入。

void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 找对映射的自由链表桶,对象插入进入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);
}

2.6 无锁访问

TLS–thread local storage:
linux gcc下 tls

// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

通过静态的TLS 每个线程无锁的获取自己的专属的ThreadCache对象

如果pTLSThreadCache 为空,申请一个ThreadCache,获取这个对象内存

std::this_thread::get_id()可以获取pTLSThreadCache线程号

static void* ConcurrentAlloc(size_t size)
{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}

释放ThreadCache,断言一下是否为空,不为空就调用Deallocate(),而Deallocate()要有释放内存大小

static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}

2.7 测试TLS

用C++11里面提供的线程相关的,在构造的时候给运行对象和可变参数。
在这里插入图片描述

创建多线程场景,申请字节

void Alloc1()
{for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(6);}
}void Alloc2()
{for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(7);}
}void TLSTest()
{std::thread t1(Alloc1);//创建一个线程t1.join();std::thread t2(Alloc2);t2.join();
}int main()
{TLSTest();return 0;
}

并行监视:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

3. 附代码

3.1 Common.h

#pragma once#include <iostream>
#include <vector>
#include <thread>
#include <time.h>
#include <assert.h>
using std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;static void*& NextObj(void* obj)
{return *(void**)obj;
}// 管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;}void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}
private:void* _freeList = nullptr;
};// 计算对象大小的对齐映射规则
class SizeClass
{
public:// 整体控制在最多10%左右的内碎片浪费// [1,128]					8byte对齐	    freelist[0,16)// [128+1,1024]				16byte对齐	    freelist[16,72)// [1024+1,8*1024]			128byte对齐	    freelist[72,128)// [8*1024+1,64*1024]		1024byte对齐     freelist[128,184)// [64*1024+1,256*1024]		8*1024byte对齐   freelist[184,208)/*size_t _RoundUp(size_t size, size_t alignNum){size_t alignSize;if (size % alignNum != 0){alignSize = (size / alignNum + 1)*alignNum;}else{alignSize = size;}return alignSize;}*/// 1-8 static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{assert(false);return -1;}}/*size_t _Index(size_t bytes, size_t alignNum){if (bytes % alignNum == 0){return bytes / alignNum - 1;}else{return bytes / alignNum;}}*/// 1 + 7  8// 2      9// ...// 8      15// 9 + 7 16// 10// ...// 16    23static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}};

3.2 ObjectPool.h

#include "Common.h"#ifdef _WIN32
#include<windows.h>
#else
// 
#endif// 定长内存池
//template<size_t N>
//class ObjectPool
//{};// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}template<class T>
class ObjectPool
{
public:T* New(){T* obj = nullptr;// 优先把还回来内存块对象,再次重复利用if (_freeList){void* next = *((void**)_freeList);obj = (T*)_freeList;_freeList = next;}else{// 剩余内存不够一个对象大小时,则重新开大块空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;//_memory = (char*)malloc(_remainBytes);_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}// 定位new,显示调用T的构造函数初始化new(obj)T;return obj;}void Delete(T* obj){// 显示调用析构函数清理对象obj->~T();// 头插*(void**)obj = _freeList;_freeList = obj;}private:char* _memory = nullptr; // 指向大块内存的指针size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针
};struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 5;// 每轮申请释放多少次const size_t N = 100000;std::vector<TreeNode*> v1;v1.reserve(N);size_t begin1 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();std::vector<TreeNode*> v2;v2.reserve(N);ObjectPool<TreeNode> TNPool;size_t begin2 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < N; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}

3.3 ThreadCache.h

#pragma once#include "Common.h"class ThreadCache
{
public:// 申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

3.4 ConcurrentAlloc.h

#pragma once#include "Common.h"
#include "ThreadCache.h"static void* ConcurrentAlloc(size_t size)
{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}

3.5 ThreadCache.cpp

#include "ThreadCache.h"void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// ...return nullptr;
}void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 找对映射的自由链表桶,对象插入进入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);
}

3.6 test.cpp

#include "ObjectPool.h"
#include "ConcurrentAlloc.h"void Alloc1()
{for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(6);}
}void Alloc2()
{for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(7);}
}void TLSTest()
{std::thread t1(Alloc1);t1.join();std::thread t2(Alloc2);t2.join();
}int main()
{//TestObjectPool();TLSTest();return 0;
}

有问题请指出,大家一起进步!!!

相关文章:

  • 基于TCP的协议
  • 深度学习--卷积神经网络保存最优模型
  • mcp 客户端sse远程调用服务端与本地大模型集成实例
  • Python 基础
  • ABAQUS多晶体材料断裂模型
  • 百度搜索 API 相比于爬虫的效率提升、价格及如何注册使用
  • Kubernetes Docker 部署达梦8数据库
  • 【EasyPan】文件上传、文件秒传、文件转码、文件合并、异步转码、视频切割分析
  • MySQL索引知识点(笔记)
  • 《大模型+Agent 企业应用实践》的大纲
  • 网络基础概念(下)
  • 驱动开发硬核特训 · Day 17:深入掌握中断机制与驱动开发中的应用实战
  • MYSQL的binlog
  • 《棒球规则》全明星比赛规则·棒球1号位
  • 爱普生FC1610BN晶体在健康监测手环的应用
  • 使用Python设置excel单元格的字体(font值)
  • JavaScript 扩展Array类方法实现数组求和
  • 【网络应用程序设计】实验一:本地机上的聊天室
  • 代码随想录训练营38天 || 322. 零钱兑换 279. 完全平方数 139. 单词拆分
  • 从零开始学习MySQL的系统学习大纲
  • 特朗普激发加拿大爱国热情:大选提前投票人数创纪录,魁北克分离情绪被冲淡
  • 都市文化·商业演剧·海派艺术:早期上海话剧商演发展新探索
  • 九江市人大常委会原党组成员、副主任戴晓慧主动交代问题,正接受审查调查
  • 四川苍溪警方通报一男子离家出走:遗体被打捞上岸,排除刑案
  • 南京信息工程大学商学院讲师李玮玮逝世,终年45岁
  • 女子伸腿阻止高铁关门等待同行人员,相关部门已介入调查