Redis实现-跳跃表

Redis实现-跳跃表

一、什么是跳跃表

跳跃表(skiplist)是一种有序数据结构,它通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。

跳跃表支持平均O(logN)、最坏O(N)复杂度

在大部分情况下,跳跃表的效率可以和平衡树相媲美,并且因为跳跃表的实现比平衡树要更简单。

跳跃表实例

1
2
3
4
5
1. 一个跳跃表应该有若干个层(Level)链表组成
2. 跳跃表中最底层的链表包含所有数据; 每一层链表中的数据都是有序的
3. 如果一个元素 X 出现在第i层,那么编号比 i 小的层都包含元素 X
4. 第 i 层的元素通过一个指针指向下一层拥有相同值的元素
5. 头指针(head)指向最高一层的第一个元素

二、跳跃表在Redis中的使用

Redis使用跳跃表作为有序集合键的底层实现之一,如果一个有序集合包含的元素数量比较多,或者有序集合中的元素的成员是比较长的字符串时,Redis就会使用跳跃表来作为有序集合键的底层实现。

1
2
3
4
5
6
7
8
127.0.0.1:6379> ZADD runoobket 1 redis
(integer) 1
127.0.0.1:6379> ZADD runoobket 2 mongodb
(integer) 1
127.0.0.1:6379> ZADD runoobket 3 mysql
(integer) 1
127.0.0.1:6379> ZADD runoobket 4 mysql
(integer) 0

三、跳跃表的实现

在Redis中,跳跃表有两个结构来实现。zskiplistNode和zskiplist

zskiplist结构用于保存跳跃表节点的相关信息。

zskiplistNode用于表示跳跃表的节点。

跳跃表节点

1、跳跃表节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typedef struct zskiplistNode {
// 成员对象
robj *obj;

// 分值
double score;

// 后退指针
struct zskiplistNode *backward;

// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode *forward;
// 跨度
unsigned int span;
} level[];

} zskiplistNode;

节点

  • 节点中的层是一个level[]数组。包含一个指向其他节点的指针。

    上图中展示了节点分别为1层和3层。

  • 前进指针

    每一层都有一个指向表尾的前进指针(level[n].forward属性),用于从表头向表尾方法访问节点,实际就是这个前进指针就是遍历跳跃表的所有节点的路径。

  • 跨度

    层的跨度(level[n].span属性)用于记录两个节点之间的距离。

    1
    2
    1. 两个节点之间的跨度越大,它们相距就越远
    2. 前进指针是NULL时,跨度为0,因为它们没有连向任何节点

    跨度实际上是用来计算排位(rank)的:在查找某个节点的过程中,将沿途访问过的所有层的跨度累计起来,得到的结果就是目标节点在跳跃表的排位。

  • 后退指针

    节点中的后退指针(backword属性)用于从表尾向表头方向访问节点。

    每个节点有多层,就有多个前进指针。但一个节点只有后退指针,而且每次只能后退到当前节点的前一个结点。

  • 分值

    节点的分值(score属性)是一个double类型的浮点数,跳跃表中的所有结点都按分值来排序。

  • 成员

    节点的成员(obj属性)是一个指针,指向一个字符串对象,这个字符串对象保存一个SDS值。

    在同一个跳跃表中,各个节点保存的成员对象必须是唯一的。但是节点的分值可以是相同的。

    如果分值相同,那么节点将按照成员对象在字典序中的大小来进行排序。

2、跳跃表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
* 跳跃表
*/
typedef struct zskiplist {

// 表头节点和表尾节点
struct zskiplistNode *header, *tail;

// 表中节点的数量
unsigned long length;

// 表中层数最大的节点的层数
int level;

} zskiplist;

多个跳跃表节点通过前进指针和后退指针进行连接。

我们通过zskiplist结构来持有这些节点。

header和tail指针分别指向跳跃表的表头和表尾节点,通过这两个节点,程序可以快速定位表头节点和表尾节点,时间复杂度为o(1)

length属性来记录节点的数量,表头结点不计算在内。

level属性,记录最大的层数。

redis中的跳跃表

四、源码

创建一个层数为 level 的跳跃表节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* 创建一个层数为 level 的跳跃表节点,
* 并将节点的成员对象设置为 obj ,分值设置为 score 。
*
* 返回值为新创建的跳跃表节点
*
* T = O(1)
*/
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {

// 分配空间
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));

// 设置属性
zn->score = score;
zn->obj = obj;

return zn;
}

创建并返回一个新的跳跃表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* 创建并返回一个新的跳跃表
*
* T = O(1)
*/
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;

// 分配空间
zsl = zmalloc(sizeof(*zsl));

// 设置高度和起始层数
zsl->level = 1;
zsl->length = 0;

// 初始化表头节点
// T = O(1)
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;

// 设置表尾
zsl->tail = NULL;

return zsl;
}

将新节点插入到跳跃表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
* 创建一个成员为 obj ,分值为 score 的新节点,
* 并将这个新节点插入到跳跃表 zsl 中。
*
* 函数的返回值为新节点。
*
* T_wrost = O(N^2), T_avg = O(N log N)
*/
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

redisAssert(!isnan(score));

// 在各个层查找节点的插入位置
// T_wrost = O(N^2), T_avg = O(N log N)
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {

/* store rank that is crossed to reach the insert position */
// 如果 i 不是 zsl->level-1 层
// 那么 i 层的起始 rank 值为 i+1 层的 rank 值
// 各个层的 rank 值一层层累积
// 最终 rank[0] 的值加一就是新节点的前置节点的排位
// rank[0] 会在后面成为计算 span 值和 rank 值的基础
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

// 沿着前进指针遍历跳跃表
// T_wrost = O(N^2), T_avg = O(N log N)
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
// 比对分值
(x->level[i].forward->score == score &&
// 比对成员, T = O(N)
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {

// 记录沿途跨越了多少个节点
rank[i] += x->level[i].span;

// 移动至下一指针
x = x->level[i].forward;
}
// 记录将要和新节点相连接的节点
update[i] = x;
}

/*
*
* zslInsert() 的调用者会确保同分值且同成员的元素不会出现,
* 所以这里不需要进一步进行检查,可以直接创建新元素。
*/

// 获取一个随机值作为新节点的层数
// T = O(N)
level = zslRandomLevel();

// 如果新节点的层数比表中其他节点的层数都要大
// 那么初始化表头节点中未使用的层,并将它们记录到 update 数组中
// 将来也指向新节点
if (level > zsl->level) {

// 初始化未使用层
// T = O(1)
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}

// 更新表中节点最大层数
zsl->level = level;
}

// 创建新节点
x = zslCreateNode(level,score,obj);

// 将前面记录的指针指向新节点,并做相应的设置
// T = O(1)
for (i = 0; i < level; i++) {

// 设置新节点的 forward 指针
x->level[i].forward = update[i]->level[i].forward;

// 将沿途记录的各个节点的 forward 指针指向新节点
update[i]->level[i].forward = x;

/* update span covered by update[i] as x is inserted here */
// 计算新节点跨越的节点数量
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);

// 更新新节点插入之后,沿途节点的 span 值
// 其中的 +1 计算的是新节点
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
// 未接触的节点的 span 值也需要增一,这些节点直接从表头指向新节点
// T = O(1)
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

// 设置新节点的后退指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;

// 跳跃表的节点计数增一
zsl->length++;

return x;
}

从跳跃表删除给定节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/* 
* 从跳跃表 zsl 中删除包含给定节点 score 并且带有指定对象 obj 的节点。
*
* T_wrost = O(N^2), T_avg = O(N log N)
*/
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;

// 遍历跳跃表,查找目标节点,并记录所有沿途节点
// T_wrost = O(N^2), T_avg = O(N log N)
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {

// 遍历跳跃表的复杂度为 T_wrost = O(N), T_avg = O(log N)
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
// 比对分值
(x->level[i].forward->score == score &&
// 比对对象,T = O(N)
compareStringObjects(x->level[i].forward->obj,obj) < 0)))

// 沿着前进指针移动
x = x->level[i].forward;

// 记录沿途节点
update[i] = x;
}

/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object.
*
* 检查找到的元素 x ,只有在它的分值和对象都相同时,才将它删除。
*/
x = x->level[0].forward;
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
// T = O(1)
zslDeleteNode(zsl, x, update);
// T = O(1)
zslFreeNode(x);
return 1;
} else {
return 0; /* not found */
}

return 0; /* not found */
}

释放给定跳跃表,以及表中的所有节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
* 释放给定跳跃表,以及表中的所有节点
*
* T = O(N)
*/
void zslFree(zskiplist *zsl) {
zskiplistNode *node = zsl->header->level[0].forward, *next;
// 释放表头
zfree(zsl->header);
// 释放表中所有节点
// T = O(N)
while(node) {
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
// 释放跳跃表结构
zfree(zsl);
}

Redis实现-跳跃表
https://johnjoyjzw.github.io/2021/09/10/Redis实现-跳跃表/
Author
John Joy
Posted on
September 10, 2021
Licensed under