目录
为什么使用快速列表quicklist
对比双向链表
对比压缩列表ziplist
quicklist结构
节点结构quicklistNode
quicklist
管理ziplist信息的结构quicklistEntry
迭代器结构quicklistIter
quicklist的API
1.创建快速列表
2.创建快速列表节点
3.头插quicklistPushHead 和尾插quicklistPushTail
4.特定位置插入元素(不是节点)
5.删除元素
6.查找元素
总结 quicklist的特性
为什么使用快速列表quicklist
在 Redis 的早期设计中,如果列表类型的对象中元素的长度较小或数量比较少的,就采用压缩列表来存储,反之则使用双向链表。
对比双向链表
双向链表便于在链表的两端进行插入和删除操作,在插入节点上复杂度很低,但是它的内存开销比较大,每个节点除了要保存数据之外,还要额外保存两个指针,并且双向链表的各个节点是单独的内存块,地址不连续,容易产生内存碎片。
对比压缩列表ziplist
压缩列表存储在一段连续的内存上,所以存储效率高。但是,它每次变更的时间复杂度都比较高,插入和删除操作需要频繁的申请和释放内存,如果压缩列表长度很长,一次 realloc
可能会导致大批量的数据拷贝。
如何保留ziplist的空间高效性,又能不让其更新复杂度过高?
Redis 在 3.2 版本之后引入了快速列表,列表类型的对象其底层都是由快速列表实现。快速列表是双向链表和压缩列表的混合体,它将双向链表按段切分,每一段都使用压缩列表来紧凑存储,多个压缩列表之间使用双向指针关联起来。
quicklist结构
quicklist是个双端链表,节点结构是quicklistNode,节点的zl字段指向压缩列表。
节点结构quicklistNode
quicklistNode是快速列表的节点。
typedef struct quicklistNode {struct quicklistNode *prev;struct quicklistNode *next;unsigned char *zl; //指向ziplistunsigned int sz; /* ziplist size in bytes */unsigned int count : 16; /* count of items in ziplist */unsigned int encoding : 2; /* RAW==1 or LZF==2 */unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */unsigned int recompress : 1; /* was this node previous compressed? */unsigned int attempted_compress : 1; /* node can't compress; too small */unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
- prev:前驱节点指针。
- next:后驱节点指针。
- zl:数据指针,如果当前节点的数据没有压缩,则指向一个 ziplist 结构,否则指向一个 quicklistLZF 结构。
- sz:表示zl指向的数据总大小。注意,若数据被压缩,其表示压缩前的数据长度大小。
- count:占16bit,表示当前节点的ziplist的entry的个数
- encoding:占2bit,表示当前节点的数据是否被压缩。1表示没有压缩;2是压缩,用的是LZF算法。
- container:是一个预留字段,本来设计是用来表明一个quicklist节点下面是直接存数据,还是使用ziplist存数据,或者用其它的结构来存数据(用作数据容器,所以叫container)。目前这个值是一个固定的值2,表示使用 ziplist 作为数据容器。
recompress
:当我们使用类似lindex
这样的命令查看了某一项本来压缩的数据时,需要把数据暂时解压,这时就设置recompress=1
做一个标记,等有机会再把数据重新压缩。extra
:其它扩展字段。目前Redis的实现里也没用上。
quicklist
快速列表的结构,从其结构可以看出其是一个链表,保存了头尾节点。
#if UINTPTR_MAX == 0xffffffff
/* 32-bit */
# define QL_FILL_BITS 14
# define QL_COMP_BITS 14
# define QL_BM_BITS 4
#elif UINTPTR_MAX == 0xffffffffffffffff
/* 64-bit */
# define QL_FILL_BITS 16
# define QL_COMP_BITS 16
# define QL_BM_BITS 4 /* we can encode more, but we rather limit the usersince they cause performance degradation. */
#else
# error unknown arch bits count
#endif
//上面的表示:QL_FILL_BITS值在32位机器上是14,64位机器上是16typedef struct quicklist {quicklistNode *head; //头节点quicklistNode *tail; //尾结点unsigned long count; /* total count of all entries in all ziplists */unsigned long len; /* number of quicklistNodes */int fill : QL_FILL_BITS; /* fill factor for individual nodes */unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */unsigned int bookmark_count: QL_BM_BITS;quicklistBookmark bookmarks[];
} quicklist;//当指定使用lzf压缩算法压缩ziplist的entry节点时,quicklistNode结构的zl成员指向quicklistLZF结构
typedef struct quicklistLZF {//表示被LZF算法压缩后的ziplist的大小unsigned int sz; /* LZF size in bytes*///保存压缩后的ziplist的数组,柔性数组char compressed[];
} quicklistLZF;/* quicklist node encodings */
#define QUICKLIST_NODE_ENCODING_RAW 1
#define QUICKLIST_NODE_ENCODING_LZF 2/* quicklist compression disable */
#define QUICKLIST_NOCOMPRESS 0/* quicklist container formats */
#define QUICKLIST_NODE_CONTAINER_NONE 1
#define QUICKLIST_NODE_CONTAINER_ZIPLIST 2#define quicklistNodeIsCompressed(node) \((node)->encoding == QUICKLIST_NODE_ENCODING_LZF)
- count:所有压缩列表的节点数量之和
- len:快速类别的节点数量
- fill:存放
list-max-ziplist-size
参数的值,用于设置每个quicklistnode的压缩列表的所有entry的总长度大小。其值默认是-2,表示每个quicklistNode节点的ziplist所占字节数不能超过8kb。若是任意正数:,表示ziplist结构所最多包含的entry个数,最大为215215。 - compress:存放
list-compress-depth
参数的值,用于设置压缩深度。快速列表默认的压缩深度为 0,即不压缩。为了支持快速的 push/pop 操作,快速列表的首尾两个节点不压缩,此时压缩深度就是1。若压缩深度为2,表示快速列表的首尾第一个及第二个节点都不压缩。 bookmark_count
:占 4 bit,bookmarks
数组的长度。bookmarks
:这是一个可选字段,快速列表重新分配内存时使用,不使用时不占用空间。
管理ziplist信息的结构quicklistEntry
和压缩列表一样,entry结构在储存时是一连串的内存块,需要将其每个entry节点的信息读取到管理该信息的结构体中,以便操作。
//管理quicklist中quicklistNode节点中ziplist信息的结构
typedef struct quicklistEntry {const quicklist *quicklist; //指向所属的quicklist的指针quicklistNode *node; //指向所属的quicklistNode节点的指针unsigned char *zi; //指向当前ziplist结构的指针unsigned char *value; //查找到的元素如果是字符串,则存在value字段long long longval; //查找到的元素如果是整数,则存在longval字段unsigned int sz; //保存当前元素的长度int offset; //保存查找到的元素距离压缩列表头部/尾部隔了多少个节点
} quicklistEntry;
迭代器结构quicklistIter
在Redis的quicklist结构中,实现了自己的迭代器,用于遍历节点。
//quicklist的迭代器结构
typedef struct quicklistIter {const quicklist *quicklist; //指向所属的quicklist的指针quicklistNode *current; //指向当前迭代的quicklist节点的指针unsigned char *zi; //指向当前quicklist节点中迭代的ziplistlong offset; //当前ziplist结构中的偏移量 int direction; //迭代方向
} quicklistIter;
quicklist的API
1.创建快速列表
创建快速列表,快速列表的成员变量都使用默认值
/* Create a new quicklist.* Free with quicklistRelease(). */
//创建快速列表,并对各个字段进行初始化
quicklist *quicklistCreate(void) {struct quicklist *quicklist;quicklist = zmalloc(sizeof(*quicklist));quicklist->head = quicklist->tail = NULL;quicklist->len = 0;quicklist->count = 0;quicklist->compress = 0;quicklist->fill = -2;quicklist->bookmark_count = 0;return quicklist;
}
创建列表,传入自己设置的参数
//设置压缩深度
#define COMPRESS_MAX ((1 << QL_COMP_BITS)-1)
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {if (compress > COMPRESS_MAX) {compress = COMPRESS_MAX;} else if (compress < 0) {compress = 0;}quicklist->compress = compress;
}//设置压缩列表的大小(成员fill),即是每个压缩列表的总entry的总长度大小
#define FILL_MAX ((1 << (QL_FILL_BITS-1))-1)
void quicklistSetFill(quicklist *quicklist, int fill) {if (fill > FILL_MAX) {fill = FILL_MAX;} else if (fill < -5) {fill = -5;}quicklist->fill = fill;
}//设置快速列表的参数
void quicklistSetOptions(quicklist *quicklist, int fill, int depth) {quicklistSetFill(quicklist, fill);quicklistSetCompressDepth(quicklist, depth);
}//通过一些默认参数创建快速列表,就是调用了前面这些封装好的函数
quicklist *quicklistNew(int fill, int compress) {quicklist *quicklist = quicklistCreate();quicklistSetOptions(quicklist, fill, compress);return quicklist;
}
2.创建快速列表节点
REDIS_STATIC quicklistNode *quicklistCreateNode(void) {quicklistNode *node;node = zmalloc(sizeof(*node));node->zl = NULL;node->count = 0;node->sz = 0;node->next = node->prev = NULL;node->encoding = QUICKLIST_NODE_ENCODING_RAW; //默认不压缩node->container = QUICKLIST_NODE_CONTAINER_ZIPLIST; //默认使用压缩列表结构来存数据node->recompress = 0;return node;
}
3.头插quicklistPushHead
和尾插quicklistPushTail
/* 头部插入* 如果在已存在节点插入,返回0* 如果是在新的头结点插入,返回1 */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {quicklistNode *orig_head = quicklist->head;//判断头节点的空间是否足够容纳要添加的元素if (likely(_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {quicklist->head->zl =ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); // 在头结点对应的ziplist中插入 quicklistNodeUpdateSz(quicklist->head);} else { // 否则新建一个头结点,然后插入数据 quicklistNode *node = quicklistCreateNode();node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);quicklistNodeUpdateSz(node);//新增元素添加进这个新的快速列表节点里_quicklistInsertNodeBefore(quicklist, quicklist->head, node);}quicklist->count++;quicklist->head->count++;return (orig_head != quicklist->head);
}/* 尾部插入。* 如果在已存在节点插入,返回0* 如果是在新的头结点插入,返回1 */
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) {quicklistNode *orig_tail = quicklist->tail;if (likely(_quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) {quicklist->tail->zl =ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL);quicklistNodeUpdateSz(quicklist->tail);} else {quicklistNode *node = quicklistCreateNode();node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL);quicklistNodeUpdateSz(node);_quicklistInsertNodeAfter(quicklist, quicklist->tail, node);}quicklist->count++;quicklist->tail->count++;return (orig_tail != quicklist->tail);
}
头插和尾插都是先调用了_quicklistNodeAllowInsert来判断能否在当前头/尾节点插入。如果能插入就直接插入到对应的ziplist中,否则就需要新建一个新节点再进行操作。
前面讲解过的quicklist结构的fill字段,其实_quicklistNodeAllowInsert就是根据fill的值来判断是否已经超过最大容量的。
其中使用到函数_quicklistInsertNodeBefore 和 _quicklistInsertNodeBefore,这两个就是在指定位置插入元素。
4.特定位置插入元素(不是节点)
注意:我们使用Redis,接触到的快速列表插入的都是插入元素,不是插入快速列表的节点。
插入元素会使用到结构体quicklistEntry。
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry,void *value, const size_t sz) {_quicklistInsert(quicklist, entry, value, sz, 0);
}void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry,void *value, const size_t sz) {_quicklistInsert(quicklist, entry, value, sz, 1);
}/* 在一个已经存在的entry前面或者后面插入一个新的entry * 如果after==1表示插入到后面,否则是插入到前面 */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,void *value, const size_t sz, int after) {int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;int fill = quicklist->fill;//1. 获取插入位置的quicklist的节点,通过entry的node字段quicklistNode *node = entry->node;quicklistNode *new_node = NULL;assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) *///2. 该节点不存在,也只有当快速列表的头或尾节点存在才会进入这个if条件if (!node) {/* we have no reference node, so let's create only node in the list */D("No node given!");new_node = quicklistCreateNode();//将添加的元素插入快速列表节点对应的压缩列表的节点头部new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);__quicklistInsertNode(quicklist, NULL, new_node, after);new_node->count++;quicklist->count++;return;}//3. 判断node节点对应的压缩列表是否能够容纳得下要添加的元素,即node节点是否已满if (!_quicklistNodeAllowInsert(node, fill, sz)) {D("Current node is full with count %d with requested fill %lu",node->count, fill);//表示当前节点的数量已满full = 1;}//4. 判断是否需要是在尾部添加if (after && (entry->offset == node->count)) {//表示在尾部添加at_tail = 1;if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {D("Next node is full too.");//表示下一quicklistNode的ziplist的entry已满了full_next = 1;}}//5.判断是否在头部添加if (!after && (entry->offset == 0)) {D("At Head");at_head = 1;if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {D("Prev node is full too.");//表示前一节点已满full_prev = 1;}}//未完待续..........,后面再讲解
}
- 1.通过通过entry的node字段获取插入位置的quicklist的节点
- 2.判断该节点是否存在,若不存在,就创建节点,并创建ziplist,插入该元素到ziplist
- 3.判断node节点对应的压缩列表是否能够容纳得下要添加的元素,即node节点是否已满,用来设置变量full
- 4.判断是否在该quicklistNode的ziplist的尾部添加,并判断该quicklistNode的下一节点的ziplist是否已满
- 5.判断是否在该quicklistNode的ziplist的头部添加,并判断该quicklistNode的前驱节点的ziplist是否已满
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,void *value, const size_t sz, int after) {//........................................../* Now determine where and how to insert the new element */if (!full && after) {//6. 当前节点的zipList不满,并且是在当前位置的后面插入D("Not full, inserting after current position.");quicklistDecompressNodeForUse(node); //当前节点解压缩//entry->zi是ziplist的一个entry,返回entry->zi的下一个ziplist的entryunsigned char *next = ziplistNext(node->zl, entry->zi);if (next == NULL) {node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);} else {node->zl = ziplistInsert(node->zl, next, value, sz);}node->count++;quicklistNodeUpdateSz(node);//添加完元素后再根据node->recompress判断是否对压缩列表进行压缩quicklistRecompressOnly(quicklist, node);} else if (!full && !after) {//7. 当前节点的ziplist不满,在当前entry的前面插入D("Not full, inserting before current position.");quicklistDecompressNodeForUse(node);node->zl = ziplistInsert(node->zl, entry->zi, value, sz);node->count++;quicklistNodeUpdateSz(node);quicklistRecompressOnly(quicklist, node);} else if (full && at_tail && node->next && !full_next && after) {/* If we are: at tail, next has free space, and inserting after:* - insert entry at head of next node. *///8. D("Full and tail, but next isn't full; inserting next node head");new_node = node->next;quicklistDecompressNodeForUse(new_node);new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);new_node->count++;quicklistNodeUpdateSz(new_node);quicklistRecompressOnly(quicklist, new_node);} else if (full && at_head && node->prev && !full_prev && !after) {/* If we are: at head, previous has free space, and inserting before:* - insert entry at tail of previous node. *///9. D("Full and head, but prev isn't full, inserting prev node tail");new_node = node->prev;quicklistDecompressNodeForUse(new_node);new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);new_node->count++;quicklistNodeUpdateSz(new_node);quicklistRecompressOnly(quicklist, new_node);} else if (full && ((at_tail && node->next && full_next && after) ||(at_head && node->prev && full_prev && !after))) {/* If we are: full, and our prev/next is full, then:* - create new node and attach to quicklist *///10. D("\tprovisioning new node...");new_node = quicklistCreateNode();new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);new_node->count++;quicklistNodeUpdateSz(new_node);__quicklistInsertNode(quicklist, node, new_node, after);} else if (full) {/* else, node is full we need to split it. *//* covers both after and !after cases */D("\tsplitting node...");//11quicklistDecompressNodeForUse(node);new_node = _quicklistSplitNode(node, entry->offset, after);new_node->zl = ziplistPush(new_node->zl, value, sz,after ? ZIPLIST_HEAD : ZIPLIST_TAIL);new_node->count++;quicklistNodeUpdateSz(new_node);__quicklistInsertNode(quicklist, node, new_node, after);_quicklistMergeNodes(quicklist, node);}quicklist->count++;
}
这部分主要是分了几种情况来插入:
- 6.当前节点的ziplist没满,并在当前entry的后面插入
- 7.当前节点的ziplist没满,并在当前entry的前面插入
- 8.当前节点的ziplist已满、要添加在尾部、并且后移节点是存在的、后一节点的ziplist没满,那就添加到后一节点对应的ziplist的第一个entry的前面
- 9.当前节点的ziplist已满、要添加在头部、并且前一节点存在、前一节点的ziplist没满,就添加到前一节点的ziplist的尾部。
- 10.当前节点的ziplist已满、插入的位置是在头/尾的、并且当前节点的前/后节点的ziplist已满,则需要创建新的quicklistNode来存放要放的元素。
- 11.当前节点的ziplist已满,但是插入的位置不在两端的,则要从插入位置把当前节点分裂成两个节点
5.删除元素
快速列表删除元素有两个函数 quicklistDelEntry
和 quicklistDelRange
,分别是删除单个元素和删除某个区间的元素。
删除元素使用到了迭代器结构quicklistIter,需要更新迭代器对应的节点等信息。
/* Delete one element represented by 'entry'*/
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {quicklistNode *prev = entry->node->prev;quicklistNode *next = entry->node->next;//删除元素,返回值 deleted_node 表示当前节点是否要删除。1表示该节点已删除int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,entry->node, &entry->zi);/* after delete, the zi is now invalid for any future usage. */iter->zi = NULL;/* If current node is deleted, we must update iterator node and offset. */if (deleted_node) {if (iter->direction == AL_START_HEAD) {iter->current = next;iter->offset = 0;} else if (iter->direction == AL_START_TAIL) {iter->current = prev;iter->offset = -1;}}
}REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,unsigned char **p) {int gone = 0;//删除 node 节点对应的压缩列表 p 位置的entry,返回新的zipListnode->zl = ziplistDelete(node->zl, p);node->count--;if (node->count == 0) {gone = 1;__quicklistDelNode(quicklist, node);//当前节点的ziplist的entry个数为0,就删除该节点} else {quicklistNodeUpdateSz(node); //更新该节点的ziplist的总长度大小}quicklist->count--;/* If we deleted the node, the original node is no longer valid */return gone ? 1 : 0;
}
6.查找元素
Redis中没有提供直接查找元素的API。查找元素是通过遍历查找的,这就需要通过上面所讲的迭代器quicklistIter。
quicklistIter *iter = quicklistGetIterator(ql, AL_START_HEAD);
quicklistEntry entry;
int i = 0;
while (quicklistNext(iter, &entry)) { //获取迭代器中的下一个元素//比较当前的压缩列表节点存储的元素与所要查找的是否相同if (quicklistCompare(entry.zi, (unsigned char *)"bar", 3)) {//进行一些操作...........}i++;
}
总结 quicklist的特性
- 其是一个节点为ziplist的双端链表
- 节点采用了ziplist,解决了传统链表的内存占用和易产生内存碎片问题
- 对比单个ziplist,quicklist使用了多个ziplist,那每个ziplist的entry个数就可以控制的比较小,解决了连续空间申请效率和ziplist变更的时间复杂度过于大的问题
- 中间节点可以压缩,进一步节省了内存