【高并发内存池】从零到一的项目之高并发内存池整体框架设计及thread cache设计
个人主页 : zxctscl
专栏 【C++】、 【C语言】、 【Linux】、 【数据结构】、 【算法】
如有转载请先通知
文章目录
- 前言
- 1. 高并发内存池整体框架设计
- 2. 高并发内存池--thread cache
- 2.1 定长内存池的问题
- 2.2 整体框架
- 2.3 自由链表
- 2.4 thread cache哈希桶的对齐规则
- 2.5 ThreadCache类的设计
- 2.5.1 Allocate
- 2.5.2 Deallocate
- 2.6 无锁访问
- 2.7 测试TLS
- 3. 附代码
- 3.1 Common.h
- 3.2 ObjectPool.h
- 3.3 ThreadCache.h
- 3.4 ConcurrentAlloc.h
- 3.5 ThreadCache.cpp
- 3.6 test.cpp
前言
接上回的项目 【高并发内存池】从零到一的项目:项目介绍、内存池及定长内存池的设计继续分享项目创做过程及代码。
1. 高并发内存池整体框架设计
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题。
- 性能问题。
- 多线程环境下,锁竞争问题。
- 内存碎片问题。
concurrent memory pool
主要由以下3个部分构成:
-
thread cache
:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
解决了大部分情况下线程锁竞争的情况。 -
central cache
:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
居中调度 -
page cache
:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache
会回收central cache
满足条件的span
对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
2. 高并发内存池–thread cache
2.1 定长内存池的问题
在上一次的分享中,做了一个定长内存池,有一个自由链表,它用来管理好切好的小块内存。需要内存的时候首先在自由链表中找,然后头删,释放的内存就头插到自由链表里。
但这里解决的是定长内存池,指定解决某一个大小。
要想解决各种需求下的内存分配,可能是8byte、10byte、16byte等等,这是就考虑利用多个自由链表来实现。
但是如果每一个字节数都挂一个自由链表,就像1字节、2字节这样就麻烦了。此时就用一些平衡的牺牲。
如果需求小于等于8字节的就在8字节这里切,要8-16字节的就在16字节那里切。
如果此时需要5字节,却分配了8字节;需要6字节,却分配了2字节,那么就会导致碎片化的内存用不上,这里的碎片被称为内碎片。会选择一些值去对齐,后面会提到。
对齐就会产生一些碎片化的内存用不上,利用不上的这些内存就是内碎片。
而外碎片,是一块连续的空间被切成了很多块分出去,只有部分还回来了,它们不连续,导致有足够的空间,却申请不了大块的空间。
2.2 整体框架
thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
申请内存:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
- 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
- 当链表的长度过长,则回收一部分内存对象到central cache。
2.3 自由链表
用void* Allocate(size_t size);
来存不同定长哈希桶自由链表大小。
每个桶下面都挂自由链表,为了方便控制,用一个类去封装一下, 这个类FreeList
管理切分好的小对象的自由链表。
这个类里面成员变量只有一个指针用来指向链表,提供简单的头删、头插接口。
头插:
void Push(void* obj){assert(obj);*(void**)obj = _freeList;_freeList = obj;}
再提供一个公共函数NextObj()
,给一个对象去取这个对象头上的4个字节或者8个字节。
static void*& NextObj(void* obj)
{return *(void**)obj;
}
头删:
取它头上的4个或者8个字节指向下一个
void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}
static void*& NextObj(void* obj)
{return *(void**)obj;
}class FreeList
{
public:void Push(void* obj){assert(obj);// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;}void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}
private:void* _freeList = nullptr;
};
2.4 thread cache哈希桶的对齐规则
给一个定长内存池,该切多大一个内存块分配呢?
就得制定一定的规则。
用静态成员变量来记录最大的内存大小。
static const size_t MAX_BYTES = 256 * 1024;
此时在thread cache设计中,它哈希桶的size不能大于MAX_BYTES。
assert(size <= MAX_BYTES);
那么映射对应对象的桶怎么选择?这里就有一个对齐规则,计算一个申请这个空间大小的内存,该对应哪一个哈希桶。
此时用一个类SizeClass
专门来管理这个映射规则:如果所有的都以8字节来对齐,1-8映射8,9-16对应第二个桶,17-24对应第三个桶一直这样分配,此时256*1024=262144都以8字节平分,就除8,就得建32768个自由链表个桶,就很多。
此时就给了一个简化的规则:
整体控制在最多10%左右的内碎片浪费:
[1,128] 8byte对齐 freelist[0,16)[128+1,1024] 16byte对齐 freelist[16,72)[1024+1,8*1024] 128byte对齐 freelist[72,128)[8*1024+1,64*1024] 1024byte对齐 freelist[128,184)[64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)
框架就是:
size_t RoundUp(size_t size){if (size <= 128){}else if (size <= 1024){}else if (size <= 8 * 1024){}else if (size <= 64 * 1024){}else if (size <= 256 * 1024){}else{assert(false);return -1;}}
怎么去算对齐呢?
此时给一个子函数size_t _RoundUp()
,用来算对齐数。
如果size模8等于0就不需要处理,对齐的就是8的倍数;如果不等于0就得用(size/8+1)*8,这时候得出来的就是8的倍数。
通用就是这样:
size_t _RoundUp(size_t size, size_t alignNum){size_t alignSize;if (size % alignNum != 0){alignSize = (size / alignNum + 1)*alignNum;}else{alignSize = size;}return alignSize;
专业给出的是,用8字节来计算一下,8+8-1=15,再7去翻,7二进制就是000111,~7
就是111000,再用15&~7,此时低三位无论是什么都与000,15就是001111,001111&111000就是001000,就是8
static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}
此时对齐规则完整就是:
static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{assert(false);return -1;}}
计算映射的哪一个自由链表桶,求他对应的下标:
size_t _Index(size_t bytes, size_t alignNum){if (bytes % alignNum == 0){return bytes / alignNum - 1;}else{return bytes / alignNum;}}
专业的计算映射的哪一个自由链表桶,如果是1-8之间,那么就整体加上一个8-1=7,此时就变到8-15,然后再右移3位,相当于除8,最后再减1就算出它下标。
static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}
2.5 ThreadCache类的设计
ThreadCache类的成员变量是一个存储208个自由链表的数组,成员函数包括申请内存对象,释放内存对象和从中心缓存获取对象。
总的哈希桶就有208个,用一个静态const成员来记录:
static const size_t NFREELIST = 208;
class ThreadCache
{
public:// 申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};
2.5.1 Allocate
有了对齐规则之后,申请内存大小就比较容易。
此时空间,每一个ThreadCache都有一个哈希映射的自由链表,如果这个链表不为空,就弄一个出去。
如果这个自由链表下面对应的没有空的,就找下一层中心缓存去获取对象,获取alignSize大小对象。
void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}
2.5.2 Deallocate
释放内存对象,释放后的内存对象,找对映射的自由链表桶,将对象插入进入。
void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 找对映射的自由链表桶,对象插入进入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);
}
2.6 无锁访问
TLS–thread local storage:
linux gcc下 tls
// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
通过静态的TLS 每个线程无锁的获取自己的专属的ThreadCache对象
如果pTLSThreadCache 为空,申请一个ThreadCache,获取这个对象内存
std::this_thread::get_id()可以获取pTLSThreadCache线程号
static void* ConcurrentAlloc(size_t size)
{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}
释放ThreadCache,断言一下是否为空,不为空就调用Deallocate(),而Deallocate()要有释放内存大小
static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}
2.7 测试TLS
用C++11里面提供的线程相关的,在构造的时候给运行对象和可变参数。
创建多线程场景,申请字节
void Alloc1()
{for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(6);}
}void Alloc2()
{for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(7);}
}void TLSTest()
{std::thread t1(Alloc1);//创建一个线程t1.join();std::thread t2(Alloc2);t2.join();
}int main()
{TLSTest();return 0;
}
并行监视:
3. 附代码
3.1 Common.h
#pragma once#include <iostream>
#include <vector>
#include <thread>
#include <time.h>
#include <assert.h>
using std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;static void*& NextObj(void* obj)
{return *(void**)obj;
}// 管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;}void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}
private:void* _freeList = nullptr;
};// 计算对象大小的对齐映射规则
class SizeClass
{
public:// 整体控制在最多10%左右的内碎片浪费// [1,128] 8byte对齐 freelist[0,16)// [128+1,1024] 16byte对齐 freelist[16,72)// [1024+1,8*1024] 128byte对齐 freelist[72,128)// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)/*size_t _RoundUp(size_t size, size_t alignNum){size_t alignSize;if (size % alignNum != 0){alignSize = (size / alignNum + 1)*alignNum;}else{alignSize = size;}return alignSize;}*/// 1-8 static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{assert(false);return -1;}}/*size_t _Index(size_t bytes, size_t alignNum){if (bytes % alignNum == 0){return bytes / alignNum - 1;}else{return bytes / alignNum;}}*/// 1 + 7 8// 2 9// ...// 8 15// 9 + 7 16// 10// ...// 16 23static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}};
3.2 ObjectPool.h
#include "Common.h"#ifdef _WIN32
#include<windows.h>
#else
//
#endif// 定长内存池
//template<size_t N>
//class ObjectPool
//{};// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}template<class T>
class ObjectPool
{
public:T* New(){T* obj = nullptr;// 优先把还回来内存块对象,再次重复利用if (_freeList){void* next = *((void**)_freeList);obj = (T*)_freeList;_freeList = next;}else{// 剩余内存不够一个对象大小时,则重新开大块空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;//_memory = (char*)malloc(_remainBytes);_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}// 定位new,显示调用T的构造函数初始化new(obj)T;return obj;}void Delete(T* obj){// 显示调用析构函数清理对象obj->~T();// 头插*(void**)obj = _freeList;_freeList = obj;}private:char* _memory = nullptr; // 指向大块内存的指针size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针
};struct TreeNode
{int _val;TreeNode* _left;TreeNode* _right;TreeNode():_val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 5;// 每轮申请释放多少次const size_t N = 100000;std::vector<TreeNode*> v1;v1.reserve(N);size_t begin1 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();std::vector<TreeNode*> v2;v2.reserve(N);ObjectPool<TreeNode> TNPool;size_t begin2 = clock();for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < N; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}
3.3 ThreadCache.h
#pragma once#include "Common.h"class ThreadCache
{
public:// 申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
3.4 ConcurrentAlloc.h
#pragma once#include "Common.h"
#include "ThreadCache.h"static void* ConcurrentAlloc(size_t size)
{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);
}static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}
3.5 ThreadCache.cpp
#include "ThreadCache.h"void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// ...return nullptr;
}void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 找对映射的自由链表桶,对象插入进入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);
}
3.6 test.cpp
#include "ObjectPool.h"
#include "ConcurrentAlloc.h"void Alloc1()
{for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(6);}
}void Alloc2()
{for (size_t i = 0; i < 5; ++i){void* ptr = ConcurrentAlloc(7);}
}void TLSTest()
{std::thread t1(Alloc1);t1.join();std::thread t2(Alloc2);t2.join();
}int main()
{//TestObjectPool();TLSTest();return 0;
}
有问题请指出,大家一起进步!!!