目录
- 1. 项目介绍
- 1.1 这个项目具体功能是什么?
- 1.2 本项目的知识储备
- 2. 什么是内存池
- 2.1 池化技术
- 2.2 内存池主要解决的问题
- 2.3 malloc
- 3. 定长内存池设计
- 4. 高并发内存池整体框架设计
- 4.1 Thread Cache的设计思路
- 4.2 Central Cache的设计思路
- 4.3 Page Cache的设计思路
- 5. 项目具体实现
- 5.1 上述三个模块公共类的实现
- 5.2 定长内存池的实现
- 5.3 Thread cache模块的实现
- 5.4 TLS--thread local storage的实现
- 5.5 Central cache模块的实现
- 5.6 Page cache模块的实现
- 6. 申请内存的流程解析
- 6.1 申请内存和计算对齐大小和所属哈希桶详解
- 6.2 慢反馈算法来获得要需要申请对象的数量
- 6.3 从CentralCache中获得对象
- 6.4 从PageCache中获取对象
- 6.5 切分申请好的span
- 6.6 从span中取batchNum个对象
- 7. 释放内存的流程解析
- 7.1 获取对象到span的映射
- 7.2 将对象插入自由链表桶
- 7.3 释放对象到CentralCache中
- 7.4 释放对象到PageCache
- 8. 代码测试
- 9. 性能分析以及优化
- 9.1 性能分析
- 9.2 性能优化
- 10. 项目总结
1. 项目介绍
1.1 这个项目具体功能是什么?
当前项目是实现⼀个高并发的内存池,他的原型是google的⼀个开源项目tcmalloc,tcmalloc全称 Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内 存分配相关的函数(malloc、free)。
本项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出⼀个自己的高并发内存池,目的就是学习tcamlloc的精华。
1.2 本项目的知识储备
本项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁 等等方面的知识。
2. 什么是内存池
2.1 池化技术
- 池化技术,就是程序先向系统申请过量的资源,然后自己管理,当程序中需要申请内存时,不是直接向操作系统申请,而是直接从内存池中获取,释放内存时也不是将内存返回给操作系统,而是返回内存池中。这就是内存池的主要功能。
- 因为每次申请该资源都有较大的开销,这样提前申请好了,使用时就会非常快捷,能够大大提高程序运行效率。
- 在计算机中有很多使用这种池技术的地方,例如线程池、连接池等。
- 以服务器上的线程池为例,它的主要思想是:先启动若⼲数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
2.2 内存池主要解决的问题
内存池主要解决的肯定就是效率的问题,其次如果作为系统的内存分配器的角度,还需要解决⼀下内存碎片的问题。那么什么是内存碎片呢?
内存碎片分为外碎片和内碎片。外部碎片是⼀些空闲的连续内存区域太小,这些内存空间不连续,以至于合计的内存足够,但是不能满足⼀些的内存分配
申请需求。内部碎片是由于⼀些对齐的需求,导致分配出去的空间中⼀些内存无法被利用。
2.3 malloc
C++中动态申请内存都是通过malloc去申请的,但实际上我们并不是直接去堆中获取内存的,而malloc就是一个内存池。malloc() 相当于向系统 “批发” 了一块较大的内存空间,然后“零售” 给程序使用,当全部使用完或者程序有大量内存需求时,再根据需求向操作系统申请内存。
一文了解,Linux内存管理,malloc、free 实现原理
3. 定长内存池设计
(1)开辟内存:
- 使用malloc开辟一大块内存,让_memory指针指向这个大块内存
- _memory 设置为char* 类型,是为了方便切割时_memory向后移动多少字节数。
(2)申请内存:
- 将_memory强转为对应类型,然后赋值给对方,_memory指针向后移动对应字节数即可。
- 如果有存在已经切割好的小块内存,则优先使用小块内存。
(3)释放内存:
- 用类型链表的结构来进行存储。
- 用当前小块内存的头4字节存储下一个小块内存的地址,最后用_freeList指针指向第一个小块内存的地址(并不是将内存释放给操作系统)
- 所以开辟内存时,开辟的内存大小必须大于或等于一个指针类型的大小。
(4)代码实现:
#pragma once
#include <iostream>
#include <vector>using std::cout;
using std::endl;template<class T>
class ObjectPool
{
public:T* New(){T* obj = nullptr;// 如果⾃由链表有对象,直接取⼀个if(_freeList){obj = (T*)_freeList;_freeList = *((void**)_freeList);}else{if(_leftBytes < sizeof(T)){_leftBytes = 128 * 1024;_memory = (char*)malloc(_leftBytes);if(_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;//根据编译器确定指针类型大小size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objsize;_memory -= objsize;}// 使⽤定位new调⽤T的构造函数初始化new(obj)T;return obj;}void Delete(T* obj){obj->~T();// 头插到freeList*((void**)obj) = _freeList;_freeList = obj;}private:char* _memory = nullptr; // 指向内存块的指针void* _freeList = nullptr; // 管理还回来的头指针size_t _leftBytes = 0; // 内存块中剩余字节大小};//以下是测试性能的代码
struct TreeNode
{int _val;TreeNode *_left;TreeNode *_right;TreeNode(): _val(0), _left(nullptr), _right(nullptr){}
};void TestObjectPool()
{// 申请释放的轮次const size_t Rounds = 3;// 每轮申请释放多少次const size_t N = 100000;size_t begin1 = clock();std::vector<TreeNode *> v1;v1.reserve(N);for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v1.push_back(new TreeNode);}for (int i = 0; i < N; ++i){delete v1[i];}v1.clear();}size_t end1 = clock();ObjectPool<TreeNode> TNPool;size_t begin2 = clock();std::vector<TreeNode *> v2;v2.reserve(N);for (size_t j = 0; j < Rounds; ++j){for (int i = 0; i < N; ++i){v2.push_back(TNPool.New());}for (int i = 0; i < 100000; ++i){TNPool.Delete(v2[i]);}v2.clear();}size_t end2 = clock();cout << "new cost time:" << end1 - begin1 << endl;cout << "object pool cost time:" << end2 - begin2 << endl;
}
(5)多次运行结果:
(6)代码解析:
4. 高并发内存池整体框架设计
(1)现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc 本身其实已经很优秀,那么我们项目的原型 tcmalloc 就是在多线程高并发的场景下更胜一筹,所以我们实现的内存池需要考虑以下几方面的问题。
- 性能问题。
- 内存碎片问题。
- 多线程环境下,锁竞争问题。
(2)高并发内存池主要由如下3个部分组成:
- Thread Cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个 Cache,这也就是这个并发线程池高效的地方。
- Central Cache:中心缓存是所有线程所共享,Thread Cache 是按需从 Central Cache 中获取的对象。Central Cache 合适的时机回收 Thread Cache 中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。Central Cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有 Thread Cache 的没有内存对象时才会找 Central Cache,所以这里竞争不会很激烈。
- Page Cache:页缓存是在 Central Cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,Central Cache没有内存对象时,从 Page Cache 分配出一定数量的 page,并切割成定长大小的小块内存,分配给 Central Cache。当一个 span 的几个跨度页的对象都回收以后,Page Cache 会回收 Central Cache 满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
4.1 Thread Cache的设计思路
(1)Thread Cache 是哈希桶结构,每个桶是一个按桶位置映射对应内存块对象大小的 FreeList 自由链表。采用 TLS 技术使每个线程都有一个 Thread Cache 对象,这样每个线程在这里获取对象和释放对象时是无锁的。
(2)申请内存的过程:
- 当内存申请size<=256KB,先获取到线程本地存储的thread cache对象,计算size映射的哈希桶自由链表下标i。
- 如果自由链表_freeLists[i]中有对象,则直接Pop⼀个内存对象返回。
- 如果_freeLists[i]中没有对象时,则批量从central cache中获取⼀定数量的对象,插⼊到自由链表并返回⼀个对象。
- 线程申请内存时,会有不同规格的内存申请(4字节、5字节等),根据范围划定不同的自由链表,设计多个自由链表管理不同规格的内存小块。实质就相当于使用多个定长内存池的自由链表。
(3)内存对齐规则:
- 每个内存小块采用向上对齐原则(可能会出现多申请内存的情况,这就是内碎片)例如:
- 需要9字节,则开辟一个大小为2个8字节的空间的节点
- 需要100字节,则开辟一个大小为13个8字节的空间的节点。
- 整体控制在最多10%左右的内碎片浪费,总计设计208个桶:
申请的内存数 | 对齐数 | 哈希桶分区 |
---|---|---|
[1, 128] | 8 byte | freelist[0, 16) |
[128+1, 1024] | 16 byte | freelist[16, 72) |
[1024+1, 8*1024] | 128 byte | freelist[72, 128) |
[81024+1, 641024] | 1024 byte | freelist[128, 184) |
[641024+1, 2561024] | 8192 byte | freelist[184, 208) |
- 注意:_freeLists是一个数组,每个元素都是自由链表类型(即存储自由链表的头结点)
(4)释放内存的过程:
- 当释放内存小于256k时将内存释放回thread cache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。
- 具体方法是:用切分好的小块内存的前4字节或8字节来存储下一个小块内存的地址。插入节点时,采用头插的方式。
- 当链表的长度过长,则回收⼀部分内存对象到central cache。
(6)TLS–thread local storage:
- TLS:thread local storage 线程本地存储(linux和Windows下有各自的TLS)
- TLS是一种变量的存储方式,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问,这样就保证了线程的独立性。
- 静态TLS使用方法:static __thread ThreadCache* pTLSThreadCache = nullptr;
- 声明了一个 __thread 类型的变量,会为每一个线程创建一个单独的拷贝。
原理:
- 在x86 CPU上,将为每次引用的静态TLS变量生成3个辅助机器指令
- 如果在进程中创建子线程,那么系统将会捕获它并且自动分配一另一个内存块,以便存放新线程的静态TLS变量。
TLS–thread local storage文章:https://zhuanlan.zhihu.com/p/142418922
(5)Thread Cache代码整体框架:
#pragma once
#include "Common.hpp"
#include "CentralCache.hpp"class ThreadCache
{
public:// 申请内存对象void* Allocate(size_t size);// 释放内存对象void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);// 当释放对象而链表过长时,将回收的内存还给central cachevoid ListTooLong(FreeList& list, size_t size);private:FreeList _freelists[NFREELIST];};// TLS thread local storage
static __thread ThreadCache* pTLSThreadCache = nullptr;
4.2 Central Cache的设计思路
(1)Central cache也是一个哈希桶结构,每个哈希桶位置挂载的是SpanList自由链表结构。Span管理的是以页为单位的大块内存(一页为8kb(32位系统下))。每个Span中的大内存根据映射关系被切成了一个个的小块内存对象,然后挂载在Span上。因为中心缓存是所有线程共享的,只需要定义一个对象,所以这里需要将 Central cache 设计为单例模式(这里采用饿汉模式的设计方法)。
注意:
- span是双向链表,而span下挂载的小块内存对象是单链表。
- 中心缓存需要加桶锁。
- _spanLists 是一个数组,数组中每个元素都是一个span自由链表的_head头指针。
- 每个span又是一个单向自由链表。
(2)申请内存的过程:
- 当 Thread Cache 中没有内存时,就会批量向 Central Cache 申请一些内存对象,这里的批量获取对象的数量使用了类似网络 tcp 协议拥塞控制的慢开始算法;Central Cache 也有一个哈希映射的 SpanList,SpanList 中挂着 span,从span中取出对象给 Thread Cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
- Central Cache 映射的 SpanList 中所有 span 的都没有内存以后,则需要向 Page Cache 申请一个新的 span 对象,拿到 span 以后将 span 管理的内存按大小切好作为自由链表链接到一起。然后从 span 中取对象给 Thread Cache。
- Central Cache 中挂的 span 中 use_count 记录分配了多少个对象出去,分配一个对象给Thread Cache,就 ++use_count。
(3)释放内存的过程:
- 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时就- -use_count。
- 当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
(4)CentralCache 代码整体框架:
#pragma once
#include "Common.hpp"
#include "PageCache.hpp"// 单例模式
class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);// 将⼀定数量的对象释放到span跨度void ReleaseListToSpans(void* start, size_t byte_size);private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;SpanList _spanLists[NFREELIST];};
4.3 Page Cache的设计思路
(1)Page cache中也是哈希桶结构,但是每个节点存储的都是span。因为页缓存是所有线程共享的,只需要定义一个对象,所以这里将 page cache 设计为单例模式(这里采用饿汉模式的设计方法)。
注意:Page cache需要加整体锁(因为是所有线程共享的)。
(2)申请内存的过程:
- 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找⼀个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后⾯没有挂 span,则向后⾯寻找更大的span,假设在10页page位置找到⼀个span,则将10页page span分裂 为⼀个4页page span和⼀个6页page span。
- 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等放式申请128页page span挂在自由链表中,再重复前面的过程。
- 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache⼀样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
(3)释放内存的过程:
- 如果central cache释放回⼀个span,则依次寻找span的前后page id的没有在使用的空闲span, 看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
(4)Page Cache 代码整体框架:
#pragma once
#include "Common.hpp"class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}// 获取一个K页的spanSpan* NewSpan(size_t k);// 获取从对象到span的映射Span* MapObjectToSpan(void* obj);// 释放空闲span回到给page cache,并合并相邻的spanvoid ReleaseSpanToPageCache(Span* span);public:std::mutex _pageMtx;private:SpanList _spanLists[NPAGES];PageCache(){}PageCache(const PageCache&) = delete;std::unordered_map<PAGE_ID, Span*> _idSpanMap;static PageCache _sInst;};
5. 项目具体实现
5.1 上述三个模块公共类的实现
(1)主要实现的功能:
- 申请空间函数。
- 管理切分好的小对象的自由链表。
- Sizeclass类:功能是实现内存对齐以及计算映射到哪个自由链表桶。
- Span类的实现。
- SpanList类用来链接Span类。
(2)具体实现:
#pragma once
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <ctime>
#include <cassert>
#include <unordered_map>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>using std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;
static const size_t NPAGES = 129;
static const size_t PAGE_SHIFT = 13;typedef unsigned long long PAGE_ID;// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#elsevoid *ptr = mmap(NULL, kpage << 13, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
#endifif (ptr == nullptr){throw std::bad_alloc();}return ptr;
}inline static void SystemFree(void* ptr, size_t kpage)
{
#ifdef _WIN32 //windowsVirtualFree(ptr, 0, MEM_RELEASE);
#else //linuxmunmap(ptr, kpage << 13);
#endif
}static void*& NextObj(void* obj)
{return *(void**)obj;
}// 管理切分好的小对象的自由链表
class FreeList
{
public:void push(void* obj){assert(obj);//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;_size++;}void PushRange(void* start, void* end, size_t n){//*(void**)end = _freeList;NextObj(end) = _freeList;_freeList = start;_size += n;}void PopRange(void*& start, void*& end, size_t n){assert(n <= _size);start = _freeList;end = start;for(int i = 0; i < n - 1; i++){end = NextObj(end);}_freeList = NextObj(end);NextObj(end) = nullptr;_size -= n;}void* pop(){assert(_freeList);void* ret = _freeList;_freeList = NextObj(ret);_size--;//_freeList = *(void**)_freeList;return ret;}bool empty(){return _freeList == nullptr;}size_t& GetmaxSize(){return _maxSize;}size_t Getsize(){return _size;}private:void* _freeList = nullptr;size_t _maxSize = 1;size_t _size = 0;};class Sizeclass
{
public:// 整体控制在最多10%左右的内碎片浪费// [1,128] 8byte对齐 freelist[0,16)// [128+1,1024] 16byte对齐 freelist[16,72)// [1024+1,8*1024] 128byte对齐 freelist[72,128)// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static size_t RoundUp(size_t size){if(size <= 128){return _RoundUp(size, 8);}else if(size <= 1024){return _RoundUp(size, 16);}else if(size <= 8 * 1024){return _RoundUp(size, 128);}else if(size <= 64 * 1024){return _RoundUp(size, 1024);}else if(size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{return _RoundUp(size, 1 << PAGE_SHIFT);}}static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链节static int group_array[4] = { 16, 56, 56, 56 };if(bytes <= 128){return _Index(bytes, 3);}else if(bytes <= 1024){return _Index(bytes - 128, 4) + group_array[0];}else if(bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group_array[0] + group_array[1];}else if(bytes <= 64 * 1024){return _Index(bytes - 8 * 1024, 10) + group_array[0] + group_array[1] + group_array[2];}else if(bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + group_array[0] + group_array[1] + group_array[2] + group_array[3];}else{assert(false);return -1;}}// 一次thread cache从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 小对象一次批量上限低int num = MAX_BYTES / size;if (num < 2){num = 2;}if (num > 512){num = 512;}return num;}// 计算一次向系统获取几个页// 单个对象 8byte// ...// 单个对象 256KBstatic size_t NumMovePage(size_t size){size_t num = NumMoveSize(size);size_t npage = num*size;npage >>= PAGE_SHIFT;if (npage == 0){npage = 1;}return npage;}
};// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _next = nullptr; // 双向链表的结构Span* _prev = nullptr;size_t _objSize = 0; // 切好的小对象的大小size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 切好的小块内存的自由链表bool _isUse = false;
};class SpanList
{
public:SpanList(){_head = new Span();_head->_next = _head;_head->_prev = _head;}Span* Begin(){return _head->_next;}Span* End(){return _head;}bool empty(){return _head->_next == _head;}void pushfront(Span* span){insert(Begin(), span);}Span* popfront(){Span* front = _head->_next;Erase(front);return front;}void insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}public:std::mutex _mutex; // 桶锁private:Span* _head;};
5.2 定长内存池的实现
(1)为什么需要定长内存池?
- 因为在本项目中需要用到new这个函数,new本质上还是malloc,所以在本项目当中需要用到new的地方会减少一点性能,所以使用定长内存池可以提高一点性能。
(2)具体实现:
#pragma once
#include "Common.hpp"template<class T>
class ObjectPool
{
public:T* New(){T* obj = nullptr;// 优先把还回来内存块对象,再次重复利用if (_freeList){void* next = *((void**)_freeList);obj = (T*)_freeList;_freeList = next;}else{// 剩余内存不够一个对象大小时,则重新开大块空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;//_memory = (char*)malloc(_remainBytes);_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}// 定位new,显示调用T的构造函数初始化new(obj)T;return obj;}void Delete(T* obj){// 显示调用析构函数清理对象obj->~T();// 头插*(void**)obj = _freeList;_freeList = obj;}private:char* _memory = nullptr; // 指向大块内存的指针size_t _remainBytes = 0; // 大块内存在切分过程中剩余字节数void* _freeList = nullptr; // 还回来过程中链接的自由链表的头指针
};
5.3 Thread cache模块的实现
(1)实现的主要功能:
- 申请和释放内存对象。
- 当空间不足时从中心缓存获取对象。
(2)具体实现:
#pragma once
#include "Common.hpp"
#include "CentralCache.hpp"class ThreadCache
{
public:// 申请内存对象void* Allocate(size_t size){assert(size <= MAX_BYTES);size_t alignSize = Sizeclass::RoundUp(size);size_t index = Sizeclass::Index(size);if(!_freelists[index].empty()){return _freelists[index].pop();}else{return FetchFromCentralCache(index, alignSize);}}// 释放内存对象void Deallocate(void* ptr, size_t size){assert(ptr);assert(size <= MAX_BYTES);// 找对映射的自由链表桶,对象插入进入size_t index = Sizeclass::Index(size);_freelists[index].push(ptr);// 当链表长度大于一次批量申请的内存时就开始还一段list给central cacheif (_freelists[index].Getsize() >= _freelists[index].GetmaxSize()){ListTooLong(_freelists[index], size);}}void ListTooLong(FreeList& list, size_t size){void* start = nullptr;void* end = nullptr;list.PopRange(start, end, list.GetmaxSize());CentralCache::GetInstance()->ReleaseListToSpans(start, size);}// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size){// 慢开始反馈调节算法// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大size_t batchNum = std::min(_freelists[index].GetmaxSize(), Sizeclass::NumMoveSize(size));if(_freelists[index].GetmaxSize() == batchNum){_freelists[index].GetmaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum >= 1);if (actualNum == 1){assert(start == end);return start;}else{_freelists[index].PushRange(NextObj(start), end, actualNum - 1);return start;}}private:FreeList _freelists[NFREELIST];};// TLS thread local storage
static __thread ThreadCache* pTLSThreadCache = nullptr;
5.4 TLS–thread local storage的实现
(1)实现的主要功能:
- 实现不同的线程调用自己专属的Thread cache模块。
(2)具体实现:
#pragma once
#include "Common.hpp"
#include "ThreadCache.hpp"
#include "PageCache.hpp"
#include "ObjectPool.hpp"static void* ConcurrentAlloc(size_t size)
{if(size > MAX_BYTES){size_t alignSize = Sizeclass::RoundUp(size);size_t kpage = alignSize >> PAGE_SHIFT;PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(kpage);span->_objSize = size;PageCache::GetInstance()->_pageMtx.unlock();void* ptr = (void*)(span->_pageId << PAGE_SHIFT);return ptr;}else{if(pTLSThreadCache == nullptr){static ObjectPool<ThreadCache> tcPool;//pTLSThreadCache = new ThreadCache;pTLSThreadCache = tcPool.New();}//cout << std::this_thread::get_id() << " " << ":" << " " << pTLSThreadCache << endl;return pTLSThreadCache->Allocate(size);}
}static void ConcurrentFree(void* ptr)
{Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objSize;if(size > MAX_BYTES){PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();}else{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);}
}
5.5 Central cache模块的实现
(1)实现的主要功能:
- 从中心缓存获取一定数量的对象给Thread cache。
- 获取一个非空的Span,当Span不足时从Page cache中获取。
(2)具体实现:
#pragma once
#include "Common.hpp"
#include "PageCache.hpp"// 单例模式
class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t size){// 查看当前的spanlist中是否有还有未分配对象的spanSpan* iter = list.Begin();while(iter != list.End()){if(iter->_freeList != nullptr){return iter;}else{iter = iter->_next;}}// 先把central cache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞list._mutex.unlock();// 走到这里说没有空闲span了,只能找page cache要PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(Sizeclass::NumMovePage(size));span->_isUse = true;span->_objSize = size;PageCache::GetInstance()->_pageMtx.unlock();// 对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span// 计算span的大块内存的起始地址和大块内存的大小(字节数)char* start = (char*)(span->_pageId << PAGE_SHIFT);//cout << (span->_pageId << PAGE_SHIFT) << endl;size_t bytes = span->_n << PAGE_SHIFT;char* end = start + bytes;// 把大块内存切成自由链表链接起来// 1、先切一块下来去做头,方便尾插span->_freeList = start;start += size;void* tail = span->_freeList;int i = 1;while (start < end){++i;NextObj(tail) = start;tail = NextObj(tail);start += size;}NextObj(tail) = nullptr;// 切好span以后,需要把span挂到桶里面去的时候,再加锁list._mutex.lock();list.pushfront(span);return span;}// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size){size_t index = Sizeclass::Index(size);_spanLists[index]._mutex.lock();Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);// 从span中获取batchNum个对象// 如果不够batchNum个,有多少拿多少start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while(i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;span->_useCount += actualNum;_spanLists[index]._mutex.unlock();return actualNum;}// 将一定数量的对象释放到span跨度void ReleaseListToSpans(void* start, size_t byte_size){size_t index = Sizeclass::Index(byte_size);_spanLists[index]._mutex.lock();while(start){void* next = NextObj(start);Span* span = PageCache::GetInstance()->MapObjectToSpan(start);NextObj(start) = span->_freeList;span->_freeList = start;span->_useCount--;// 说明span的切分出去的所有小块内存都回来了// 这个span就可以再回收给page cache,pagecache可以再尝试去做前后页的合并if(span->_useCount == 0){_spanLists[index].Erase(span);span->_next = nullptr;span->_prev = nullptr;span->_freeList = nullptr;//span->_isUse = false;// 释放span给page cache时,使用page cache的锁就可以了// 这时把桶锁解掉_spanLists[index]._mutex.unlock();PageCache::GetInstance()->_pageMtx.lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->_pageMtx.unlock();_spanLists[index]._mutex.lock();}start = next;}_spanLists[index]._mutex.unlock();}private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;SpanList _spanLists[NFREELIST];};CentralCache CentralCache::_sInst;
5.6 Page cache模块的实现
(1)实现的主要功能:
- 获取一个K页的span给上层。
- 获取从对象到span的映射方便在回收Span的时候进行合并回收,减少碎片的产生。
- 当通过上层回收了Span的时候,需要对回收的Span进行合并。
(2)具体实现:
#pragma once
#include "Common.hpp"
#include "ObjectPool.hpp"
#include "PageMap.hpp"class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}// 获取一个K页的spanSpan* NewSpan(size_t k){assert(k > 0);// 大于128 page的直接向堆申请if (k > NPAGES - 1){void* ptr = SystemAlloc(k);// Span* span = new Span;Span* span = _spanPool.New();span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;span->_n = k;_idSpanMap[span->_pageId] = span;//_idSpanMap.set(span->_pageId, span);return span;}// 先检查第k个桶里面有没有spanif (!_spanLists[k].empty()){Span* kSpan = _spanLists[k].popfront();// 建立id和span的映射,方便central cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageId + i] = kSpan;//_idSpanMap.set(kSpan->_pageId + i, kSpan);}return kSpan;}// 检查一下后面的桶里面有没有span,如果有可以把他它进行切分for(int i = k + 1; i < NPAGES; i++){if(!_spanLists[i].empty()){Span* nSpan = _spanLists[i].popfront();Span* kSpan = _spanPool.New();//Span* kSpan = new Span;// 在nSpan的头部切一个k页下来// k页span返回// nSpan再挂到对应映射的位置kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;//cout << nSpan->_pageId << endl;nSpan->_pageId += k;nSpan->_n -= k;_spanLists[nSpan->_n].pushfront(nSpan);// 存储nSpan的首位页号跟nSpan映射,方便page cache回收内存时// 进行的合并查找_idSpanMap[nSpan->_pageId] = nSpan;_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;//_idSpanMap.set(nSpan->_pageId, nSpan);//_idSpanMap.set(nSpan->_pageId + nSpan->_n - 1, nSpan);// 建立id和span的映射,方便central cache回收小块内存时,查找对应的spanfor (PAGE_ID i = 0; i < kSpan->_n; ++i){_idSpanMap[kSpan->_pageId + i] = kSpan;//_idSpanMap.set(kSpan->_pageId + i, kSpan);}return kSpan;}}// 走到这个位置就说明后面没有大页的span了// 这时就去找堆要一个128页的span//Span* bigSpan = new Span;Span* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;//cout << bigSpan->_pageId << endl;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].pushfront(bigSpan);return NewSpan(k);}// 获取从对象到span的映射Span* MapObjectToSpan(void* obj){PAGE_ID id = ((PAGE_ID)obj >> PAGE_SHIFT);std::unique_lock<std::mutex> lock(_pageMtx);auto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}/*auto ret = (Span*)_idSpanMap.get(id);assert(ret != nullptr);return ret;*/}// 释放空闲span回到Pagecache,并合并相邻的spanvoid ReleaseSpanToPageCache(Span* span){// 大于128 page的直接还给堆if (span->_n > NPAGES - 1){void *ptr = (void *)(span->_pageId << PAGE_SHIFT);SystemFree(ptr, span->_n);// delete span;_spanPool.Delete(span);return;}// 对span前后的页,尝试进行合并,缓解内存碎片问题while (1){PAGE_ID prevId = span->_pageId - 1;auto iter = _idSpanMap.find(prevId);// 前面的页号没有,不合并了if(iter == _idSpanMap.end()){break;}/*auto ret = (Span*)_idSpanMap.get(prevId);if (ret == nullptr){break;}*/Span* prevSpan = iter->second;if(prevSpan->_isUse == true){break;}span->_pageId = prevSpan->_pageId;span->_n += prevSpan->_n;_spanLists[prevSpan->_n].Erase(prevSpan);_spanPool.Delete(prevSpan);//delete prevSpan;}// 向后合并while (1){PAGE_ID nextId = span->_pageId + span->_n;auto iter = _idSpanMap.find(nextId);if(iter == _idSpanMap.end()){break;}/*auto ret = (Span*)_idSpanMap.get(nextId);if (ret == nullptr){break;}*/Span* nextSpan = iter->second;if(nextSpan->_isUse == true){break;}if(nextSpan->_n + span->_n > NPAGES - 1){break;}span->_n += nextSpan->_n;_spanLists[nextSpan->_n].Erase(nextSpan);_spanPool.Delete(nextSpan);//delete nextSpan;}_spanLists[span->_n].pushfront(span);span->_isUse = false;_idSpanMap[span->_pageId] = span;_idSpanMap[span->_pageId+span->_n - 1] = span;//_idSpanMap.set(span->_pageId, span);//_idSpanMap.set(span->_pageId+span->_n - 1, span);}public:std::mutex _pageMtx;private:SpanList _spanLists[NPAGES];PageCache(){}PageCache(const PageCache&) = delete;std::unordered_map<PAGE_ID, Span*> _idSpanMap;//TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;ObjectPool<Span> _spanPool;static PageCache _sInst;};PageCache PageCache::_sInst;
6. 申请内存的流程解析
6.1 申请内存和计算对齐大小和所属哈希桶详解
(1)分配到pTLSThreadCache线程后再去Allocate内存:
(2)内存进行对齐:
(3)Index计算内存在ThreadCache哪个桶中,例如6b大小在第0个桶中:
6.2 慢反馈算法来获得要需要申请对象的数量
(1)刚开始运行项目的时候_freeList[index]为空,需要去Central cache申请内存。_freeLists[index]的maxsize为空,通过慢反馈获得要申请小对象的数量,如果batchNum为maxsize,则申请小对象数量为1。
(2)慢反馈调节,MAX_BYTES为256k,申请字节越大,返回越少:
6.3 从CentralCache中获得对象
(1)定义start和end,通过FetchRangeObj从CentralCache对应哈希桶中获得对象,如果不够则有多少获得多少,数量为actualNum。**大于1的话将多余的对象插入_freeList[index]中。**返回内存起始地址start,一次内存申请完成。
(2)计算size所在的哈希桶,给对应_spanLists[index]加锁,然后获取span对象:
(3)GetOneSpan函数依次判断桶中的各个span有无小对象(也就是FreeList),如果有则返回:
6.4 从PageCache中获取对象
(1)如果CentralCache对应哈希桶中没有小对象,则去PageCache申请一个span大的空间,将span标记为已使用:
(2)慢反馈NumMovePage,size为8b时,通过NumMoveSize慢反馈获得num为512,npage为4096,右移13位为0,申请1页。
(3)NewSpan获取1页的span,先判断PageCache对应的桶中有没有span,如果有则_spanLists[k]头删一个kspan并返回。
(4)如果PageCache的桶中有没有span,检查后面的桶的span,申请一个1页的span,如果有一个10页的span,则切分为一个1页的kSpan和一个9页的nSpan,返回kSpan。如果PageCache中没有Span,则去堆上申请一个128页的内存,放在bigSpan中,将bigSpan头插到第128个_spanLists中,然后再执行一遍NewSpan返回需要的span。
6.5 切分申请好的span
(1)PageCache申请一个span大的空间后将span进行切分,计算起始地址和结束地址,将切分好的小对象依次链接起来:
6.6 从span中取batchNum个对象
(1)从_spanLists[index]中获取了span,start为_freeList头,将end递增到batchNum个对象,之后的内存接到_freeList中,返回实际获得的对象数量actualNum。
7. 释放内存的流程解析
7.1 获取对象到span的映射
(1)获得从对象到span的映射,获到释放对象的字节大小(8b),该线程调用Deallocate释放:
(2)将对象地址强转为PAGE_ID(size_t)类型再右移13位得到对象的id,通过id找到PageCache中分出去对应的span,并返回该span。
7.2 将对象插入自由链表桶
(1)获取Index桶号,将对象Push到对应_freeLists[index]中,释放完成。判断_freeLists[index]大于申请时候的值MaxSize,说明申请的内存都还回来了,则统一释放给CentralCache。当链表过长时释放链表到CentralCache对应_spanLists[index]中。先弹出所有的小对象存到start和end中,再通过start和大小释放给CentralCache。
(2)通过所给的起始地址和个数进行删除:
7.3 释放对象到CentralCache中
(1)通过size获得Index所在的桶,将_spanLists[index]加锁便于插入对象,当start还有对象时,获取下一个对象NextObj,获取start指向的对象的span映射,将start头插到span的_freeList链表中,span分出去的小对象计数_useCount- -,start后移到下一个小对象,然后继续头插。直到将所有小对象插入后解开桶锁。
(2)如果_useCount为0说明所有小对象都回来了,将span从该桶中摘除,指针置空。然后PageCache加锁,将该span还给PageCache。
7.4 释放对象到PageCache
(1)先获取span的页号,减1获得前一个页的页号prevId,通过prevId得到前一个span,如果前一个span没有使用,则与当前的span进行合并,将 span的起始页调整为前一个页段的起始页,并更新页数。然后,从_spanLists中删除prevSpan,在_spanPool中调用delete删除prevSpan。
(2)然后获得下一个span的_pageId,检查下一个span是否没有使用,通过nextId返回对应的Span,合并span的_n页数,将nextSpan在_spanLists中摘除,通过_spanPool调用delete释放nextSpan。
(3)PageCache中的_spanLists中插入还回来的span,标记为未使用状态,建立_pageId和span的映射。
8. 代码测试
(1)测试代码:
#include "ConcurrentAlloc.hpp"
#include <atomic>// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime(0);std::atomic<size_t> free_costtime(0);for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}cout << nworks << "个线程并发执行" << rounds << "u轮次,每轮次malloc " << ntimes << "次: 花费:" << malloc_costtime << " ms" <<endl;cout << nworks << "个线程并发执行" << rounds << "u轮次,每轮次free " << ntimes << "次: 花费:" << free_costtime << " ms" <<endl;cout << nworks << "个线程并发malloc&free " << nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << " ms" <<endl;/*printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime);printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);*/
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime(0);std::atomic<size_t> free_costtime(0);for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(ConcurrentAlloc(16));//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}cout << nworks << "个线程并发执行" << rounds << "u轮次,每轮次concurrent alloc " << ntimes << "次: 花费:" << malloc_costtime << " ms" <<endl;cout << nworks << "个线程并发执行" << rounds << "u轮次,每轮次concurrent dealloc " << ntimes << "次: 花费:" << free_costtime << " ms" <<endl;cout << nworks << "个线程并发concurrent alloc&dealloc " << nworks * rounds * ntimes << "次,总计花费:" << malloc_costtime + free_costtime << " ms" <<endl;/*printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime);printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime);printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);*/
}int main()
{size_t n = 1000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
(2)运行结果:
9. 性能分析以及优化
9.1 性能分析
可以看到测试结果在多次申请固定内存时,我们实现的内存池和系统的 malloc 在效率上还是略有差距。可以使用 Visual Studio 自带的性能检测工具分析代码。
9.2 性能优化
(1)我们发现在获取页号到 Span 的函数里为了保护该临界资源加了锁,这把锁占用了较多的资源,故采用类似基数树的哈希表进行优化,结构如下:
(2)此前,我们使用 unordered_map 或 map 维护 PageId 与 Span* 的映射关系,在对该表进行写时都会对数据结构进行修改,比如哈希表的扩容、红黑树的结点旋转,多线程对这种临界资源的修改就需要加锁。所以我们把unordered_map 存放PageId 与 Span* 的映射关系改成基数树来进行存放即可。
(3)采用类似基数树的哈希表进行优化后,不用再对查表过程加锁,原因如下:
- 对该表进行构造时就开好了空间,二同一个线程的读写是分离的,不会改变整个结构。
- 多线程对表进行读写时,由于外面还有 Page Cache 里的一把大锁的保护,保证了不同线程互斥地对表进行读写的。
(4)基数树代码:
#pragma once
#include <cstring>
#include <cassert>
#include"Common.hpp"// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = Sizeclass::_RoundUp(size, 1<<PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize>>PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v;cout << v << endl;}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);assert(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {assert(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
10. 项目总结
-
本项目实现的是tcmalloc的部分主要代码,并没有全部实现,了解tcmalloc的原理就可以了。
-
更加了解tcmalloc的框架。
-
参考文章:
- 几个内存池库的对比
- tcmalloc源码学习。
- TCMALLOC 源码阅读。
- tcmalloc源代码。
-
本项目源码链接:https://gitee.com/liu-yechi/new_code/tree/master/concurrent_memory_pool。