大纲
9.Netty的内存规格
10.缓存数据结构
11.命中缓存的分配流程
12.Netty里有关内存分配的重要概念
13.Page级别的内存分配
14.SubPage级别的内存分配
15.ByteBuf的回收
13.Page级别的内存分配
(1)Page级别的内存分配的入口
(2)Page级别的内存分配的流程
(3)尝试在现有的PoolChunk上分配
(4)创建一个PoolChunk进行内存分配
(5)初始化PooledByteBuf对象
(1)Page级别的内存分配的入口
下面这3行代码可以用来跟踪进行Page级别内存分配时的调用栈:
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf byteBuf = allocator.heapBuffer(8192);//分配8192B内存
byteBuf.release();
PooledByteBufAllocator的heapBuffer()方法通过其newHeapBuffer()方法执行代码heapArena.allocate()时,首先会调用PoolArena的allocate()方法,然后会调用PoolArena的allocateNormal()方法,接着会调用PoolChunk的allocate()方法,并最终调用到PoolChunk的allocateRun()方法进行Page级别的内存分配。进行Page级别的内存分配时只会分配多个Page而不会分配一个Page的一部分。
注意:真正要分配的内存其实就是byte[]或者ByteBuffer,所以实际的分配就是得到一个数值handle进行定位以方便后续的写。
public class PooledByteBufAllocator extends AbstractByteBufAllocator {private final PoolThreadLocalCache threadCache;private final PoolArena<byte[]>[] heapArenas;//一个线程会和一个PoolArena绑定private final PoolArena<ByteBuffer>[] directArenas;//一个线程会和一个PoolArena绑定...//从下面的代码可知:真正要分配的内存其实就是byte[] 或者 ByteBuffer//所以实际的分配就是得到一个数值handle进行定位@Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {PoolThreadCache cache = threadCache.get();PoolArena<byte[]> heapArena = cache.heapArena;ByteBuf buf;if (heapArena != null) {//分配堆内存buf = heapArena.allocate(cache, initialCapacity, maxCapacity);} else {buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}return toLeakAwareBuffer(buf);}@Overrideprotected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {PoolThreadCache cache = threadCache.get();PoolArena<ByteBuffer> directArena = cache.directArena;ByteBuf buf;if (directArena != null) {buf = directArena.allocate(cache, initialCapacity, maxCapacity);} else {if (PlatformDependent.hasUnsafe()) {buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);} else {buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);}}return toLeakAwareBuffer(buf);}...
}abstract class PoolArena<T> implements PoolArenaMetric {final PooledByteBufAllocator parent;final int pageSize;//默认8192 = 8Kprivate final int maxOrder;//默认11final int pageShifts;//默认13final int chunkSize;//默认16Mprivate final PoolChunkList<T> qInit;private final PoolChunkList<T> q000;private final PoolChunkList<T> q025;private final PoolChunkList<T> q050;private final PoolChunkList<T> q075;private final PoolChunkList<T> q100;//Metrics for allocations and deallocationsprivate long allocationsNormal;...protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {this.parent = parent;this.pageSize = pageSize;this.maxOrder = maxOrder;this.pageShifts = pageShifts;this.chunkSize = chunkSize;...qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize);q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize);q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize);q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize);q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize);q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize);qInit.prevList(qInit);q000.prevList(null);q025.prevList(q000);q050.prevList(q025);q075.prevList(q050);q100.prevList(q075);...}PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {PooledByteBuf<T> buf = newByteBuf(maxCapacity);//创建ByteBuf对象allocate(cache, buf, reqCapacity);//基于PoolThreadCache对ByteBuf对象进行内存分配return buf;}private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {//1.根据reqCapacity进行分段规格化final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) {//capacity < pageSize,需要分配的内存小于8K...}if (normCapacity <= chunkSize) {//需要分配的内存大于8K,但小于16M//2.进行缓存分配if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {//was able to allocate out of the cache so move onreturn;}allocateNormal(buf, reqCapacity, normCapacity);} else {//需要分配的内存大于16M//Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, reqCapacity);}}private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {//1.尝试在现有的PoolChunk上分配if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||q075.allocate(buf, reqCapacity, normCapacity)) {++allocationsNormal;return;}//2.创建一个PoolChunk并进行内存分配PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);//由handle指向PoolChunk里的一块连续内存long handle = c.allocate(normCapacity);++allocationsNormal;assert handle > 0;//3.初始化PooledByteBuf对象c.initBuf(buf, handle, reqCapacity);//4.将新建的PoolChunk添加到PoolArena的qInit这个PoolChunkList中qInit.add(c);}protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);...
}
(2)Page级别的内存分配的流程
一.尝试在现有的PoolChunk上分配
一个PoolArena里会有一个由多个PoolChunkList连接起来的双向链表,每个PoolChunkList代表了某种内存使用率的PoolChunk列表。PoolArena的allocateNormal()方法首先会尝试从某使用率的PoolChunkList中获取一个PoolChunk来分配,如果没法从某使用率的PoolChunkList中获取一个PoolChunk进行分配,那么就新创建一个PoolChunk。
二.创建一个PoolChunk并进行内存分配
也就是通过PoolArena的newChunk()方法去创建一个PoolChunk,然后调用该PoolChunk的allocate()方法进行内存分配,最后会调用到PoolChunk的allocateRun()方法去确定PoolChunk里的一块连续内存,这块连续内存会由一个long型的变量名为handle来指向。
三.初始化PooledByteBuf对象
在拿到一个PoolChunk的一块连续内存后(即allocateRun()方法返回的handle标记),需要执行PoolChunk的initBuf()方法去把handle标记设置到PooledByteBuf对象上。
四.将新建的PoolChunk添加PoolChunkList
也就是将新建的PoolChunk添加到PoolArena的qInit这个PoolChunkList中。
(3)尝试在现有的PoolChunk上分配
在PoolChunkList的allocate()方法中:首先会从PoolChunkList的head节点开始往下遍历,然后对每一个PoolChunk都尝试调用PoolChunk.allocate()方法进行分配。如果PoolChunk.allocate()方法返回的handle大于0,接下来会调用PoolChunk.initBuf()方法对PooledByteBuf进行初始化,并且如果当前PoolChunk的使用率大于所属PoolChunkList的最大使用率,那么需要将当前PoolChunk从所在的PoolChunkList中移除并加入到下一个PoolChunkList中。
final class PoolChunkList<T> implements PoolChunkListMetric {private PoolChunk<T> head;...//reqCapacity为需要分配的内存大小,normCapacity为已规格化的内存大小boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {if (head == null || normCapacity > maxCapacity) {//Either this PoolChunkList is empty or the requested capacity is larger then the capacity //which can be handled by the PoolChunks that are contained in this PoolChunkList.return false;}//从PoolChunkList的head节点开始往下遍历for (PoolChunk<T> cur = head;;) {//对每一个PoolChunk都尝试调用PoolChunk.allocate()方法进行分配long handle = cur.allocate(normCapacity);if (handle < 0) {cur = cur.next;if (cur == null) {return false;}} else {//调用PoolChunk.initBuf()方法对PooledByteBuf进行初始化cur.initBuf(buf, handle, reqCapacity);//如果此时的PoolChunk的使用率大于所属PoolChunkList的最大使用率if (cur.usage() >= maxUsage) {//将当前PoolChunk从所在的PoolChunkList中移除remove(cur);//将当前PoolChunk加入到下一个PoolChunkList中nextList.add(cur);}return true;}}}...
}
(4)创建一个PoolChunk进行内存分配
一.创建PoolChunk时的入参和构造方法
PoolArena的allocateNormal()方法在分配Page级别的内存时,会调用newChunk(pageSize, maxOrder, pageShifts, chunkSize),其中pageSize = 8K、maxOrder = 11、pageShifts = 13、chunkSize = 16M。newChunk()方法会由PoolArena的内部类兼子类DirectArena和HeapArena实现。
在HeapArena和DirectArena的newChunk()方法中,会new一个PoolChunk。而在PoolChunk的构造方法中,除了设置newChunk()方法传入的参数外,还会初始化两个数组memoryMap和depthMap来表示不同规格的连续内存使用分配情况。其中的memoryMap是一个有4096个元素的字节数组,depthMap在初始化时和memoryMap完全一样。
abstract class PoolArena<T> implements PoolArenaMetric {...private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {//1.尝试在现有的PoolChunk上分配if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||q075.allocate(buf, reqCapacity, normCapacity)) {++allocationsNormal;return;}//2.创建一个PoolChunk并进行内存分配,pageSize = 8K、maxOrder = 11、pageShifts = 13、chunkSize = 16MPoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);//由handle指向PoolChunk里的一块连续内存long handle = c.allocate(normCapacity);++allocationsNormal;assert handle > 0;//3.初始化PooledByteBuf对象c.initBuf(buf, handle, reqCapacity);//4.将新建的PoolChunk添加到PoolArena的qInit这个PoolChunkList中qInit.add(c);}protected abstract PoolChunk<T> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize);static final class HeapArena extends PoolArena<byte[]> {...//pageSize = 8K、maxOrder = 11、pageShifts = 13、chunkSize = 16M@Overrideprotected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {//真正的内存new byte[chunkSize],会存放到PoolChunk的memory变量中return new PoolChunk<byte[]>(this, new byte[chunkSize], pageSize, maxOrder, pageShifts, chunkSize);}...}static final class DirectArena extends PoolArena<ByteBuffer> {...//pageSize = 8K、maxOrder = 11、pageShifts = 13、chunkSize = 16M@Overrideprotected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {//真正的内存ByteBuffer.allocateDirect(chunkSize),会存放到PoolChunk的memory变量中return new PoolChunk<ByteBuffer>(this, allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize);}private static ByteBuffer allocateDirect(int capacity) {return PlatformDependent.useDirectBufferNoCleaner() ?PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);}...}...
}final class PoolChunk<T> implements PoolChunkMetric {final PoolArena<T> arena;final T memory;//内存//一个Page的大小,比如8Kprivate final int pageSize;//4096个元素的字节数组,表示不同规格的连续内存使用分配情况,用二叉树理解private final byte[] memoryMap;private final byte[] depthMap;//2048个元素的数组,表示Chunk里哪些Page是以SubPage方式存在的//由于一个PoolChunk是16M,会以8K为标准划分一个个的Page,所以会有16 * 1024 / 8 = 2048个Pageprivate final PoolSubpage<T>[] subpages;//Used to mark memory as unusableprivate final byte unusable;//默认是12...//pageSize = 8K、maxOrder = 11、pageShifts = 13、chunkSize = 16MPoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {unpooled = false;this.arena = arena;this.memory = memory;this.pageSize = pageSize;//默认是8Kthis.pageShifts = pageShifts;//默认是13this.maxOrder = maxOrder;//默认是11this.chunkSize = chunkSize;//默认是16Munusable = (byte) (maxOrder + 1);//默认是12log2ChunkSize = log2(chunkSize);//默认是24subpageOverflowMask = ~(pageSize - 1);freeBytes = chunkSize;assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;maxSubpageAllocs = 1 << maxOrder;//Generate the memory map.memoryMap = new byte[maxSubpageAllocs << 1];depthMap = new byte[memoryMap.length];int memoryMapIndex = 1;for (int d = 0; d <= maxOrder; ++ d) {//move down the tree one level at a timeint depth = 1 << d;for (int p = 0; p < depth; ++ p) {//in each level traverse left to right and set value to the depth of subtreememoryMap[memoryMapIndex] = (byte) d;depthMap[memoryMapIndex] = (byte) d;memoryMapIndex ++;}}subpages = newSubpageArray(maxSubpageAllocs);}...
}
二.PoolChunk的memoryMap属性的数据结构
首先PoolChunk是以Page的方式去组织内存的,然后memoryMap属性中的每个结点都表示一块连续内存是否已被分配,以及depthMap属性中的每个结点都表示其所在树的深度或层高。memoryMap数组中下标为n的元素对应于一棵高度为12的连续内存的平衡二叉树的第n个结点。比如0-4M这个连续内存结点是整棵连续内存的平衡二叉树中的第4个结点,其所在树的深度是2。注意:0-16M对应在memoryMap数组中的索引是1,memoryMap数组中索引为0的元素是空的。
三.在新创建的PoolChunk上分配内存
PoolArena的allocateNormal()方法在创建完一个PoolChunk后,就要从这个PoolChunk上分配一块内存,于是会调用PoolChunk的allocate()方法。由于进行的是Page级别的内存分配,所以最终会调用PoolChunk的allocateRun()方法。
PoolChunk的allocateRun()方法首先根据需要分配的内存大小算出将被分配的连续内存结点在平衡二叉树中的深度,然后再根据PoolChunk的allocateNode()方法取得将被分配的连续内存结点在memoryMap数组中的下标id。
也就是说,执行allocateNode(d)会获得:在平衡二叉树中的 + 深度为d的那一层中 + 还没有被使用过的 + 一个连续内存结点的索引。获得这个索引之后,便会设置memoryMap数组在这个索引位置的元素的值为unusable = 12表示不可用,以及逐层往上标记结点不可用。其中,allocateNode()方法会通过异或运算实现 + 1。
final class PoolChunk<T> implements PoolChunkMetric {...long allocate(int normCapacity) {if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize//Page级别的内存分配return allocateRun(normCapacity);} else {//SubPage级别的内存分配return allocateSubpage(normCapacity);}}//Allocate a run of pages (>=1)//@param normCapacity normalized capacity//@return index in memoryMapprivate long allocateRun(int normCapacity) {//maxOrder = 11、pageShifts = 13//比如要分配的normCapacity = 8K,则d = 11 - (13 - 13) int d = maxOrder - (log2(normCapacity) - pageShifts);int id = allocateNode(d);if (id < 0) {return id;}freeBytes -= runLength(id);return id;}//Algorithm to allocate an index in memoryMap when we query for a free node at depth d//@param d depth//@return index in memoryMapprivate int allocateNode(int d) {int id = 1;int initial = - (1 << d);//has last d bits = 0 and rest all = 1//取出memoryMap[id]的值byte val = value(id);if (val > d) {//val = unusable = 12return -1;}//val < d表示当前结点可用while (val < d || (id & initial) == 0) {//id & initial == 1 << d for all ids at depth d, for < d it is 0id <<= 1;//每循环一次乘以2val = value(id);if (val > d) {//如果当前id对应的结点不可用id ^= 1;//通过异或运算实现对id加1val = value(id);}}byte value = value(id);assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d", value, id & initial, d);setValue(id, unusable);//mark as unusable = 12updateParentsAlloc(id);//逐层往上标记结点不可用return id;}private byte value(int id) {return memoryMap[id];}//Update method used by allocate.//This is triggered only when a successor is allocated and all its predecessors need to update their state.//The minimal depth at which subtree rooted at id has some free space.private void updateParentsAlloc(int id) {while (id > 1) {int parentId = id >>> 1;byte val1 = value(id);byte val2 = value(id ^ 1);byte val = val1 < val2 ? val1 : val2;setValue(parentId, val);id = parentId;}}...
}
(5)初始化PooledByteBuf对象
在PoolChunk的initBuf()方法中,首先会根据handle计算出memoryMapIdx和bitMapIdx。由于这里进行的是Page级别的内存分配,所以bitMapIdx为0,于是接下来会调用buf.init()方法进行PooledByteBuf的初始化。
在PooledByteBuf的构造函数中:首先会设置PoolChunk,因为要拿到一块可写的内存首先需要拿到一个PoolChunk。然后设置handle,表示指向的是PoolChunk中的哪一块连续内存,也就是平衡二叉树中的第几个结点,或者是memoryMap中的第几个元素。接着设置memory,表示分配PoolChunk时是通过Heap还是Direct方式进行申请的。以及设置offset为0,因为这里进行的是Page级别的内存分配,所以没有偏移量。最后设置length,表示实际这次内存分配到底分配了多少内存。
final class PoolChunk<T> implements PoolChunkMetric {...void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {int memoryMapIdx = memoryMapIdx(handle);int bitmapIdx = bitmapIdx(handle);if (bitmapIdx == 0) {byte val = value(memoryMapIdx);assert val == unusable : String.valueOf(val);buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else {initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);}}private static int memoryMapIdx(long handle) {return (int) handle;}private static int bitmapIdx(long handle) {return (int) (handle >>> Integer.SIZE);//即 handle >>> 32}...
}abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;protected PoolChunk<T> chunk;protected long handle;protected T memory;protected int offset;protected int length;int maxLength;PoolThreadCache cache;private ByteBuffer tmpNioBuf;@SuppressWarnings("unchecked")protected PooledByteBuf(Recycler.Handle<? extends PooledByteBuf<T>> recyclerHandle, int maxCapacity) {super(maxCapacity);this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;}void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {assert handle >= 0;assert chunk != null;this.chunk = chunk;this.handle = handle;this.memory = chunk.memory;this.offset = offset;this.length = length;this.maxLength = maxLength;this.tmpNioBuf = null;this.cache = cache;}...
}
14.SubPage级别的内存分配
(1)SubPage级别的内存分配的入口
(2)SubPage级别的内存分配的流程
(3)定位一个SubPage对象
(4)初始化SubPage对象
(5)调用subpage.allocate()进行分配
(1)SubPage级别的内存分配的入口
下面这3行代码可以用来跟踪进行SubPage级别内存分配时的调用栈:
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
ByteBuf byteBuf = allocator.directBuffer(16);//分配16B内存
byteBuf.release();
PooledByteBufAllocator的directBuffer()方法通过其newDirectBuffer()方法执行代码directArena.allocate()时,首先会调用PoolArena的allocate()方法,然后会调用PoolArena的allocateNormal()方法,接着会调用PoolChunk的allocate()方法,并最终调用到PoolChunk的allocateSubpage()方法进行SubPage级别的内存分配。进行SubPage级别的内存分配时不会分配多个Page而只会分配一个Page的一部分。
在PoolArena的allocate()方法中,首先会通过tinyIdx(16)拿到tableIdx,此时tableIdx = 16 >>> 4 = 1。然后从PoolArena的tinySubpagePools数组中取出下标为tableIdx的一个PoolSubpage元素赋值给table。tinySubpagePools默认和MemoryRegionCache的缓存一样,也有32个元素:16B、32B、48B、...、480、496B。其中第n个元素表示大小为(n - 1) * 16B的专属SubPage。接着通过PoolArena.allocateNormal()方法调用到PoolChunk.allocateSubpage()方法。
abstract class PoolArena<T> implements PoolArenaMetric {//有32个元素:16B、32B、48B、...、480、496Bprivate final PoolSubpage<T>[] tinySubpagePools;//有4个元素:512B、1K、2K、4Kprivate final PoolSubpage<T>[] smallSubpagePools;... PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {PooledByteBuf<T> buf = newByteBuf(maxCapacity);//创建ByteBuf对象allocate(cache, buf, reqCapacity);//基于PoolThreadCache对ByteBuf对象进行内存分配return buf;}private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {//1.根据reqCapacity进行分段规格化final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) {//capacity < pageSize,需要分配的内存小于8Kint tableIdx;PoolSubpage<T>[] table;boolean tiny = isTiny(normCapacity);if (tiny) {//capacity < 512if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {//was able to allocate out of the cache so move onreturn;}//根据规格化后的需要分配的内存大小normCapacity,获取tableIdxtableIdx = tinyIdx(normCapacity);table = tinySubpagePools;} else {if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {//was able to allocate out of the cache so move onreturn;}//根据规格化后的需要分配的内存大小normCapacity,获取tableIdxtableIdx = smallIdx(normCapacity);table = smallSubpagePools;}final PoolSubpage<T> head = table[tableIdx];//Synchronize on the head. //This is needed as PoolChunk#allocateSubpage(int) and PoolChunk#free(long) may modify the doubly linked list as well. synchronized (head) {final PoolSubpage<T> s = head.next;if (s != head) {//PoolArena的tinySubpagePools数组中有可以分配内存的PoolSubpageassert s.doNotDestroy && s.elemSize == normCapacity;//调用SubPage的allocate()方法进行内存分配long handle = s.allocate();assert handle >= 0;s.chunk.initBufWithSubpage(buf, handle, reqCapacity);if (tiny) {allocationsTiny.increment();} else {allocationsSmall.increment();}return;}}allocateNormal(buf, reqCapacity, normCapacity);return;}...}static int tinyIdx(int normCapacity) {return normCapacity >>> 4;}static int smallIdx(int normCapacity) {int tableIdx = 0;int i = normCapacity >>> 10;while (i != 0) {i >>>= 1;tableIdx ++;}return tableIdx;}private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {//1.尝试在现有的PoolChunk上分配if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||q075.allocate(buf, reqCapacity, normCapacity)) {++allocationsNormal;return;}//2.创建一个PoolChunk并进行内存分配PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);//由handle指向PoolChunk里的一块连续内存long handle = c.allocate(normCapacity);++allocationsNormal;assert handle > 0;//3.初始化PooledByteBuf对象c.initBuf(buf, handle, reqCapacity);//4.将新建的PoolChunk添加到PoolArena的qInit这个PoolChunkList中qInit.add(c);}...
}final class PoolChunk<T> implements PoolChunkMetric {...long allocate(int normCapacity) {if ((normCapacity & subpageOverflowMask) != 0) {//normCapacity >= pageSize//Page级别的内存分配return allocateRun(normCapacity);} else {//SubPage级别的内存分配return allocateSubpage(normCapacity);}}...
}
(2)SubPage级别的内存分配的流程
PoolChunk.allocateSubpage()方法的主要操作:
一.定位一个SubPage对象
二.初始化SubPage对象
三.调用SubPage的allocate()方法进行分配
final class PoolChunk<T> implements PoolChunkMetric {final PoolArena<T> arena;final T memory;//内存//一个Page的大小,比如8Kprivate final int pageSize;//4096个元素的字节数组,表示不同规格的连续内存使用分配情况,用二叉树理解private final byte[] memoryMap;private final byte[] depthMap;//2048个元素的数组,表示Chunk里哪些Page是以SubPage方式存在的//由于一个PoolChunk是16M,会以8K为标准划分一个个的Page,所以会有16 * 1024 / 8 = 2048个Pageprivate final PoolSubpage<T>[] subpages;//Used to mark memory as unusableprivate final byte unusable;//默认是12...//Create/ initialize a new PoolSubpage of normCapacity//Any PoolSubpage created/ initialized here is added to subpage pool in the PoolArena that owns this PoolChunk//@param normCapacity normalized capacity//@return index in memoryMapprivate long allocateSubpage(int normCapacity) {//Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.//This is need as we may add it back and so alter the linked-list structure.//1.定位一个SubPage对象//PoolArena的findSubpagePoolHead()方法会通过除以16,来获取用来存放16B的PoolSubpage节点PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);synchronized (head) {//只能从层高为11的memoryMap中获取SubPage,因为只有这一层的每个结点都是8Kint d = maxOrder;//subpages are only be allocated from pages i.e., leaves//1.定位一个SubPage对象:在平衡二叉树的第11层上分配一个结点//即通过allocateNode(d)找到一个Page在PoolChunk中的下标idxint id = allocateNode(d);if (id < 0) {return id;}final PoolSubpage<T>[] subpages = this.subpages;final int pageSize = this.pageSize;freeBytes -= pageSize;//1.定位一个SubPage对象:确定PoolChunk中第几个Page会以SubPage方式存在int subpageIdx = subpageIdx(id);PoolSubpage<T> subpage = subpages[subpageIdx];//2.初始化SubPage对象if (subpage == null) {subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);subpages[subpageIdx] = subpage;} else {subpage.init(head, normCapacity);}//3.调用SubPage的allocate()方法进行分配return subpage.allocate();}}...
}
(3)定位一个SubPage对象
SubPage是基于一个Page进行划分的,不管是从一个现有的SubPage对象中分配,还是在没有SubPage对象时创建一个SubPage,第一个步骤都是定位一个SubPage对象。
在PoolChunk的allocateSubpage()方法中:首先会通过PoolArena的findSubpagePoolHead()方法去找到在PoolArena的tinySubpagePools中用于存放16B的PoolSubpage结点。然后在连续内存的平衡二叉树的第11层上分配一个Page结点,即通过allocateNode(d)找到一个Page在PoolChunk中的下标id。接着通过subpageIdx(id)确定PoolChunk中第subpageIdx个Page会以SubPage方式存在,从而定位到在PoolChunk的subpages数组中下标为subpageIdx对应的PoolSubpage元素可用来进行SubPage级别的内存分配。
注意:在PoolChunk的subpages数组中,如果某个下标对应的PoolSubpage元素为空,则说明这个下标对应的PoolChunk中的某个Page已经用来进行了Page级别的内存分配或者还没被分配。
abstract class PoolArena<T> implements PoolArenaMetric {//有32个元素:16B、32B、48B、...、480、496Bprivate final PoolSubpage<T>[] tinySubpagePools;//有4个元素:512B、1K、2K、4Kprivate final PoolSubpage<T>[] smallSubpagePools;...//找到在PoolArena的tinySubpagePools中用于存放16B的PoolSubpage结点PoolSubpage<T> findSubpagePoolHead(int elemSize) {int tableIdx;PoolSubpage<T>[] table;if (isTiny(elemSize)) {//< 512tableIdx = elemSize >>> 4;table = tinySubpagePools;} else {tableIdx = 0;elemSize >>>= 10;while (elemSize != 0) {elemSize >>>= 1;tableIdx ++;}table = smallSubpagePools;}return table[tableIdx];}...
}final class PoolChunk<T> implements PoolChunkMetric {...//Algorithm to allocate an index in memoryMap when we query for a free node at depth d//@param d depth//@return index in memoryMapprivate int allocateNode(int d) {int id = 1;int initial = - (1 << d);//has last d bits = 0 and rest all = 1//取出memoryMap[id]的值byte val = value(id);if (val > d) {//val = unusable = 12return -1;}//val < d表示当前结点可用while (val < d || (id & initial) == 0) {//id & initial == 1 << d for all ids at depth d, for < d it is 0id <<= 1;//每循环一次乘以2val = value(id);if (val > d) {//如果当前id对应的结点不可用id ^= 1;//通过异或运算实现对id加1val = value(id);}}byte value = value(id);assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d", value, id & initial, d);setValue(id, unusable);//mark as unusable = 12updateParentsAlloc(id);//逐层往上标记结点不可用return id;}private int subpageIdx(int memoryMapIdx) {return memoryMapIdx ^ maxSubpageAllocs;//remove highest set bit, to get offset}...
}
(4)初始化SubPage对象
如果PoolChunk的subpages数组中下标为subpageIdx的PoolSubpage元素为空,那么就会创建一个PoolSubpage对象并对其进行初始化。初始化的过程就是去一个PoolChunk里寻找一个Page,然后按照SubPage大小将该Page进行划分。当完成PoolSubpage对象的初始化之后,就可以通过它的allocate()方法来进行内存分配了。具体来说就是把内存信息设置到PooledByteBuf对象中。
final class PoolSubpage<T> implements PoolSubpageMetric {final PoolChunk<T> chunk;private final int memoryMapIdx;private final int runOffset;private final int pageSize;private final long[] bitmap;...PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {this.chunk = chunk;this.memoryMapIdx = memoryMapIdx;this.runOffset = runOffset;this.pageSize = pageSize;bitmap = new long[pageSize >>> 10];//pageSize / 16 / 64init(head, elemSize);}void init(PoolSubpage<T> head, int elemSize) {doNotDestroy = true;this.elemSize = elemSize;if (elemSize != 0) {maxNumElems = numAvail = pageSize / elemSize;nextAvail = 0;bitmapLength = maxNumElems >>> 6;if ((maxNumElems & 63) != 0) {bitmapLength ++;}for (int i = 0; i < bitmapLength; i ++) {bitmap[i] = 0;}}//将当前PoolSubpage对象添加到PoolArena的tinySubpagePools数组中用于存放//可以分配16B内存的PoolSubpage链表中addToPool(head);}private void addToPool(PoolSubpage<T> head) {assert prev == null && next == null;prev = head;next = head.next;next.prev = this;head.next = this;}...
}
PoolSubpage的构造方法调用init()方法的处理:
一.设置elemSize用于表示一个SubPage的大小,比如规格化后所申请的内存大小:16B、32B等。
二.设置bitmap用于标识把一个Page划分成多个SubPage后哪一个SubPage已被分配,0表示未分配,1表示已分配。
三.通过addToPool()方法把当前PoolSubpage对象添加到PoolArena的tinySubpagePools数组中可以分配某种规格大小内存的PoolSubpage链表里。
(5)调用SubPage的allocate()方法进行分配
首先从位图bitmap里寻找一个未被使用的SubPage。如果可用的SubPage的数量为0,则直接把该PoolSubpage对象从PoolArena的tinySubpagePools数组的某种规格的结点中移除。接着将代表未使用SubPage的bitmapIdx转换成handle,也就是拼接成64位 + bitmapIdx变成高32位 + memoryMapIdx变成低32位,所得的handle便表示一个PoolChunk里第几个Page的第几个SubPage,从而可以拿到连续内存给PooledByteBuf进行初始化。
final class PoolSubpage<T> implements PoolSubpageMetric {int elemSize;private final long[] bitmap;private int numAvail;private final int memoryMapIdx;...//Returns the bitmap index of the subpage allocation.long allocate() {if (elemSize == 0) {return toHandle(0);}if (numAvail == 0 || !doNotDestroy) {return -1;}//1.先从位图bitmap中寻找一个未被使用的SubPagefinal int bitmapIdx = getNextAvail();int q = bitmapIdx >>> 6;int r = bitmapIdx & 63;assert (bitmap[q] >>> r & 1) == 0;bitmap[q] |= 1L << r;//如果可用的SubPage为0if (-- numAvail == 0) {//把该PoolSubpage对象从PoolArena的tinySubpagePools数组的某种规格的结点中移除removeFromPool();}//2.将bitmapIdx转换成handlereturn toHandle(bitmapIdx);}private long toHandle(int bitmapIdx) {//拼接成64位,bitmapIdx变成高32位,memoryMapIdx变成低32位return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;}...
}
15.ByteBuf的回收
(1)池化的内存如何释放
(2)将连续内存的区段加到缓存
(3)标记连续内存的区段为未使用
(4)将ByteBuf对象添加到对象池
(1)池化的内存如何释放
比如byteBuf.release()会调用到PooledByteBuf的deallocate()方法。该方法首先会清空PooledByteBuf对象的handle、chunk、memory变量值。然后调用PoolArena的free()方法去释放对应PoolChunk在handle处的内存,也就是将连续内存的区段添加到缓存 + 标记连续内存的区段为未使用。接着调用PooledByteBuf的recycle()方法去复用PooledByteBuf对象。
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {...//byteBuf执行release()方法释放内存@Overridepublic boolean release() {return release0(1);}private boolean release0(int decrement) {for (;;) {int refCnt = this.refCnt;if (refCnt < decrement) {throw new IllegalReferenceCountException(refCnt, -decrement);}if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {if (refCnt == decrement) {deallocate();return true;}return false;}}}protected abstract void deallocate();...
}abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {...protected PoolChunk<T> chunk;protected long handle;protected T memory;...@Overrideprotected final void deallocate() {if (handle >= 0) {//1.清空PooledByteBuf对象的handle、chunk、memory变量值final long handle = this.handle;this.handle = -1;memory = null;//2.调用PoolArena的free()方法去释放内存//也就是将连续内存的区段添加到缓存 + 标记连续内存的区段为未使用;chunk.arena.free(chunk, handle, maxLength, cache);//3.调用PooledByteBuf的recycle()方法,将PooledByteBuf对象添加到对象池recycle();}}
}
(2)将连续内存的区段加到缓存
进行内存分配时,第一个步骤就是从缓存里寻找是否有对应大小的连续内存区段,如果有就直接取出来进行分配。
如果释放内存时,将连续内存的区段添加到缓存成功了,那么下次进行内存分配时,对于相同大小的PooledByteBuf,就可以从缓存中直接取出来进行使用了。
如果释放内存时,将连续内存的区段添加到缓存不成功,比如缓存队列已经满了就会不成功,那么就标记该PooledByteBuf对应的连续内存区段为未使用。
在PoolArena的free()方法中,首先会调用PoolThreadCache的add()方法将释放的连续内存区段添加到缓存,然后调用PoolArena的freeChunk()方法标记连续内存的区段为未使用。
abstract class PoolArena<T> implements PoolArenaMetric {private final PoolSubpage<T>[] tinySubpagePools;private final PoolSubpage<T>[] smallSubpagePools;private final PoolChunkList<T> qInit;private final PoolChunkList<T> q000;private final PoolChunkList<T> q025;private final PoolChunkList<T> q050;private final PoolChunkList<T> q075;private final PoolChunkList<T> q100;...void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {if (chunk.unpooled) {int size = chunk.chunkSize();destroyChunk(chunk);activeBytesHuge.add(-size);deallocationsHuge.increment();} else {//判断要释放的内存大小属于哪一种规格SizeClass sizeClass = sizeClass(normCapacity);//1.调用PoolThreadCache的add()方法将释放的连续内存区段添加到缓存if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {//cached so not free it.return;}//2.调用PoolArena的freeChunk()方法释放内存,也就是标记连续内存的区段为未使用freeChunk(chunk, handle, sizeClass);}}private SizeClass sizeClass(int normCapacity) {if (!isTinyOrSmall(normCapacity)) {return SizeClass.Normal;}return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;}...
}final class PoolThreadCache {private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;private final MemoryRegionCache<byte[]>[] normalHeapCaches;private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;...//Add PoolChunk and handle to the cache if there is enough room.//Returns true if it fit into the cache false otherwise.@SuppressWarnings({ "unchecked", "rawtypes" })boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);if (cache == null) {return false;}//将PoolChunk的连续内存区段添加到缓存return cache.add(chunk, handle);}private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {switch (sizeClass) {case Normal:return cacheForNormal(area, normCapacity);case Small:return cacheForSmall(area, normCapacity);case Tiny:return cacheForTiny(area, normCapacity);default:throw new Error();}}private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {int idx = PoolArena.tinyIdx(normCapacity);if (area.isDirect()) {return cache(tinySubPageDirectCaches, idx);}return cache(tinySubPageHeapCaches, idx);}private MemoryRegionCache<?> cacheForSmall(PoolArena<?> area, int normCapacity) {int idx = PoolArena.smallIdx(normCapacity);if (area.isDirect()) {return cache(smallSubPageDirectCaches, idx);}return cache(smallSubPageHeapCaches, idx);}private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {if (area.isDirect()) {int idx = log2(normCapacity >> numShiftsNormalDirect);return cache(normalDirectCaches, idx);}int idx = log2(normCapacity >> numShiftsNormalHeap);return cache(normalHeapCaches, idx);}private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {if (cache == null || idx > cache.length - 1) {return null;}return cache[idx];}...private abstract static class MemoryRegionCache<T> {private final Queue<Entry<T>> queue;...//Add to cache if not already full.//将PoolChunk的连续内存区段添加到缓存@SuppressWarnings("unchecked")public final boolean add(PoolChunk<T> chunk, long handle) {Entry<T> entry = newEntry(chunk, handle);boolean queued = queue.offer(entry);if (!queued) {//If it was not possible to cache the chunk, immediately recycle the entryentry.recycle();}return queued;}...}
}
(3)标记连续内存的区段为未使用
标记方式会根据Page级别和SubPage级别进行标记,其中Page级别是根据二叉树来进行标记,SubPage级别是通过位图进行标记。
abstract class PoolArena<T> implements PoolArenaMetric {...void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {final boolean destroyChunk;synchronized (this) {switch (sizeClass) {case Normal:++deallocationsNormal;break;case Small:++deallocationsSmall;break;case Tiny:++deallocationsTiny;break;default:throw new Error();}//调用PoolChunk的parent也就是PoolChunkList的free()方法释放PoolChunkdestroyChunk = !chunk.parent.free(chunk, handle);}if (destroyChunk) {//destroyChunk not need to be called while holding the synchronized lock.destroyChunk(chunk);}}...
}final class PoolChunkList<T> implements PoolChunkListMetric {private PoolChunk<T> head;private PoolChunkList<T> prevList;...boolean free(PoolChunk<T> chunk, long handle) {//标记PoolChunk中连续内存区段为未使用chunk.free(handle);//如果要释放的PoolChunk的使用率小于当前PoolChunkList的最小使用率if (chunk.usage() < minUsage) {//从当前PoolChunkList中移除PoolChunkremove(chunk);//将PoolChunk添加到当前PoolChunkList的下一个PoolChunkList中return move0(chunk);}return true;}private void remove(PoolChunk<T> cur) {if (cur == head) {head = cur.next;if (head != null) {head.prev = null;}} else {PoolChunk<T> next = cur.next;cur.prev.next = next;if (next != null) {next.prev = cur.prev;}}}//Moves the PoolChunk down the PoolChunkList linked-list so it will end up in the right PoolChunkList //that has the correct minUsage / maxUsage in respect to PoolChunk#usage().private boolean move0(PoolChunk<T> chunk) {if (prevList == null) {//There is no previous PoolChunkList so return false which result in having the PoolChunk destroyed and//all memory associated with the PoolChunk will be released.assert chunk.usage() == 0;return false;}return prevList.move(chunk);}...
}final class PoolChunk<T> implements PoolChunkMetric {final PoolArena<T> arena;private final PoolSubpage<T>[] subpages;...//Free a subpage or a run of pages //When a subpage is freed from PoolSubpage, it might be added back to subpage pool of the owning PoolArena//If the subpage pool in PoolArena has at least one other PoolSubpage of given elemSize, //we can completely free the owning Page so it is available for subsequent allocations//@param handle handle to freevoid free(long handle) {int memoryMapIdx = memoryMapIdx(handle);int bitmapIdx = bitmapIdx(handle);if (bitmapIdx != 0) { // free a subpagePoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];assert subpage != null && subpage.doNotDestroy;//Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.//This is need as we may add it back and so alter the linked-list structure.PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);synchronized (head) {//2.SubPage级别通过位图进行标记if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {return;}}}//1.Page级别根据二叉树来进行标记freeBytes += runLength(memoryMapIdx);setValue(memoryMapIdx, depth(memoryMapIdx));updateParentsFree(memoryMapIdx);}...
}
(4)将ByteBuf对象添加到对象池
一开始时,对象池是没有PooledByteBuf对象的,当PooledByteBuf对象被释放时不会被立即销毁,而是会加入到对象池里。
这样当Netty每次去拿一个PooledByteBuf对象时,就可以先从对象池里获取,取出对象之后就可以进行内存分配以及初始化了。
考虑到PooledByteBuf对象会经常被申请和释放,如果QPS非常高,可能会产生很多PooledByteBuf对象,而且频繁创建和释放PooledByteBuf对象也会比较耗费资源和降低性能。
所以Netty便使用了对象池来减少GC:当申请PooledByteBuf对象时,就可以尽可能从对象池里去取。当释放PooledByteBuf对象时,则可以将对象添加到对象池,从而实现对象复用。
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;...private void recycle() {recyclerHandle.recycle(this);}...
}