一、什么是内存池
内存池(Memory Pool) 是一种动态内存分配与管理技术。
通常情况下,程序员习惯直接使用new、delete、malloc、free 等API申请分配和释放内存,这样导致的后果是:当程序长时间运行时,由于所申请内存块的大小不定,频繁使用时会造成大量的内存碎片从而降低程序和操作系统的性能。
内存池则是在真正使用内存之前,先申请分配一大块内存(内存池)留作备用,当程序员申请内存时,从池中取出一块动态分配,当程序员释放内存时,将释放的内存再放入池内,再次申请池可以 再取出来使用,并尽量与周边的空闲内存块合并。若内存池不够时,则自动扩大内存池,从操作系统中申请更大的内存池。
由于现在硬件条件已经很成熟,大多数运行环境都是多核的,为了提高效率,则高并发这一情况应运而生,对于高并发内存池,则是基于多线程并发申请使用的一个内存池称为高并发内存池。
二、项目介绍
本项目参考了谷歌 tcmalloc 设计模式,设计实现了高并发的内存池。
基于 Windows10 环境 VS2022,采用 C++进行编程,池化技术、多线程、TLS、单例模式、互斥锁、链表、哈希等数据结构。
该项目利用了 thread cache、central、cache、page cache 三级缓存结构,基于多线程申请释放内存的场景,最大程度提高了效率,解决了绝大部分内存碎片问题。
三、项目实现
3.1 设计目标
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。所以这次我们实现的内存池需要考虑以下几方面的问题。
- 内存碎片问题。
- 性能问题。
- 多核多线程环境下,锁竞争问题。
3.2 项目框架
concurrent memory pool主要由线程缓存(threadcache)、中心缓存(centralcache)、页缓存(pagecache)3个部分构成,如下图
thread cache
为了保证效率,我们使用线程局部存储(thread local storage,TLS)技术保存每个线程本地的ThreadCache的指针,这样大部分情况下申请释放内存是不需要锁的,线程缓存是每个线程独有的,用于小于256k的内存的分配。线程从这里申请内存不需要加锁,每个线程独享一个cache,这也就是这个并发线程池高效的地方。
class ThreadCache
{
public:// 申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);// 释放对象时,链表过⻓时,回收内存回到中⼼缓存void ListTooLong(FreeList& list, size_t size);
private:FreeList _freeLists[NFREELIST];
};// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
该结构每个位置存放一个FreeList,每个自由链表下都可以挂自己的内存块。
申请内存:
- 当内存申请size<=64k时在thread cache中申请内存,计算size在自由链表中的位置,如果自由链表中有内存对象时,直接从FistList[i]中Pop一下对象,时间复杂度是O(1),且没有锁竞争。
- 当FreeList[i]中没有对象时,则批量从central cache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
- 当释放内存小于64k时将内存释放回thread cache,计算size在自由链表中的位置,将对象Push到FreeList[i].
- 当链表的长度过长,则回收一部分内存对象到central cache。
central cache
中心缓存是所有线程共享,本质是由一个哈希映射的span对象自由链表构成thread cache是按需从central cache中获取的对象。
central cache周期性的回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧。达到内存分配在多个线程中更均衡的按需调度的目的。
central cache是存在竞争的,所以从这里取内存对象是需要加锁,不过一般情况下在这里取内存对象的效率非常高,所以这里竞争不会很激烈。
这里要注意,要保证centralcache是全局唯一的,这里我们需要将centralcache类设计成单例模式。
// 单例模式——饿汉模式
class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}// 获取⼀个非空的spanSpan* GetOneSpan(SpanList& list, size_t size);// 从中⼼缓存获取⼀定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);// 将⼀定数量的对象释放到span跨度void ReleaseListToSpans(void* start, size_t size);
private:SpanList _spanLists[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
申请内存:
- 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,central cache也有一个哈希映射的freelist,freelist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的。
- central cache中没有非空的span时,则将空的span链在一起,向page cache申请一个span对象,span对象中是一些以页为单位的内存,切成需要的内存大小,并链接起来,挂到span中。
- central cache的span中有一个use_count,分配一个对象给thread cache,就use_count++。
释放内存:
- 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时use_count--。当use_count减到0时则表示所有对象都回到了span,则将span释放回pagecache,page cache中会对前后相邻的空闲页进行合并。
page cache
页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。
page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}// 获取从对象到span的映射Span* MapObjectToSpan(void* obj);// 释放空闲span回到Pagecache,并合并相邻的spanvoid ReleaseSpanToPageCache(Span* span);// 获取一个k页的spanSpan* NewSpan(size_t k);std::mutex _pageMtx;
private:SpanList _spanLists[NPAGES];ObjectPool<Span> _spanPool;//std::unordered_map<PageID, Span*> _idSpanMap;//std::map<PageID, Span*> _idSpanMap;TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;PageCache(){}PageCache(const PageCache&) = delete;static PageCache _sInst;
};
申请内存:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4page,4page后面没有挂span,则向后面寻找更大的span,假设在10page位置找到一个span,则将10page span分裂为一个4page span和一个6page span。
- 如果找到128 page都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128page span挂在自由链表中,再重复1中的过程。
- 需要注意的是central cache和page cache 的核⼼结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache⼀样的⼤⼩对⻬关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成⼩块内存的⾃由链表。⽽page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i⻚内存。
释放内存:
- 如果central cache释放回一个span,则依次寻找span的前后page id的span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
映射关系、FreeList、SpanList
// 自由链表的哈希桶跟对象大小的映射关系
// 小于等于MAX_BYTES,就找thread cache申请
// 大于MAX_BYTES,就直接找page cache或者系统堆申请
static const size_t MAX_BYTES = 256 * 1024;
// thread cache 和 central cache自由链表哈希桶的表大小
static const size_t NFREELIST = 208;
// page cache 管理span list哈希表⼤⼩
static const size_t NPAGES = 129;
// 页大小转换偏移,即一页定义为2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;// ⻚编号类型,32位下是4byte类型,64位下是8byte类型
#ifdef _WIN64
typedef unsigned long long PageID;
#elif _WIN32
typedef size_t PageID;
#endif// 直接去堆上按⻚申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage * (1 << 12), MEM_COMMIT | MEM_RESERVE,PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr){throw std::bad_alloc();}return ptr;
}inline static void SystemFree(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else// sbrk unmmap等
#endif
}// 获取内存对象中存储的头4 or 8字节值,即链接的下⼀个对象的地址
static void*& NextObj(void* obj)
{return *(void**)obj;
}
// 管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;_size++;}void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);_size--;return obj;}void PushRange(void* start, void* end,size_t n){NextObj(end) = _freeList;_freeList = start;_size += n;}void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);start = _freeList;end = start;for (size_t i = 0; i < n - 1; i++){end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}size_t Size(){return _size;}private:void* _freeList = nullptr;size_t _maxSize = 1;size_t _size = 0;
};// 管理对齐和映射等关系
class SizeClass
{
public:// 整体控制在最多10%左右的内存碎片浪费// [1,128] 8byte对⻬ freelist[0,16)// [128+1,1024] 16byte对⻬ freelist[16,72)// [1024+1,8*1024] 128byte对⻬ freelist[72,128)// [8*1024+1,64*1024] 1024byte对⻬ freelist[128,184)// [64*1024+1,256*1024] 8*1024byte对⻬ freelist[184,208)/*size_t _RoundUp(size_t bytes, size_t align){size_t alignSize;if (bytes % align != 0){alignSize = (bytes / align + 1) * align;}else{alignSize = bytes;}return alignSize;}*/static inline size_t _RoundUp(size_t bytes, size_t align){return ((bytes+align - 1) & ~(align - 1));}// 对齐大小计算static inline size_t RoundUp(size_t bytes){if (bytes <= 128){return _RoundUp(bytes, 8);}else if (bytes <= 1024){return _RoundUp(bytes, 16);}else if (bytes <= 8 * 1024){return _RoundUp(bytes, 128);}else if (bytes <= 64 * 1024){return _RoundUp(bytes, 1024);}else if (bytes <= 256 * 1024){return _RoundUp(bytes, 8 * 1024);}else{return _RoundUp(bytes, 1 << PAGE_SHIFT);}}/*static inline size_t _Index(size_t bytes, size_t align_shift){if (bytes % align_shift == 0){return bytes / align_shift - 1;}else{return bytes / align_shift;}}*/static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (static_cast<unsigned long long>(1) << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16,56,56,56 };if (bytes <= 128){return _Index(bytes, 3);}else if (bytes <= 1024){return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024){return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else{assert(false);}return -1;}// ⼀次thread cache从中⼼缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],⼀次批量移动多少个对象的(慢启动)上限值// ⼩对象⼀次批量上限⾼// ⼩对象⼀次批量上限低size_t num = MAX_BYTES / size;if (num < 2){num = 2;}if (num > 512){num = 512;}return num;}// 计算⼀次向系统获取⼏个⻚// 单个对象 8byte// ...// 单个对象 256KBstatic size_t NumMovePage(size_t size){size_t num = NumMoveSize(size);size_t npage = num * size;npage >>= PAGE_SHIFT;if (npage == 0){npage = 1;}return npage;}
};// Span管理一个跨度的大块内存
// 管理以页为单位的大块内存
// 管理多个连续页大块内存跨度结构
struct Span
{PageID _pageId = 0; // ⼤块内存起始⻚的⻚号size_t _n = 0; // ⻚的数量Span* _next = nullptr; // 双向链表的结构Span* _prev = nullptr;size_t _objSize = 0; // 切好的小对象的大小size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 切好的小块内存的自由链表bool _isUse = false; // 是否在被使用
};// 带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head;}void PushFront(Span* span){Insert(Begin(), span);}Span* PopFront(){Span* front = _head->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
四、项目测试
测试时一定要确保编译器在release情况下,而不是debug!!!
五、项目总结
【优点】
- 增加动态申请的效率
- 减少陷入内核的次数
- 减少系统内存碎片
- 提升内存使用率
- 尽量减少锁竞争
- 应用于多核多线程场景
【不足】
- 当前实现的项目中并没有完全脱离malloc,比如在内存池自身数据结构的管理中,如SpanList中的span等结构,还是使用的new Span,new的底层使用的是malloc,所以还不足以替换malloc,因为我们本身没有完全脱离它。
- 平台及兼容性。linux等系统下面,需要将VirtualAlloc替换为brk等。
六、项目源码
ConcurrentMemoryPool/ConcurrentMemoryPool · 胡家豪/项目 - 码云 - 开源中国 (gitee.com)