Redis源码解析-skiplist跳跃表

短命女 2022-12-31 07:21 204阅读 0赞

前言


跳跃表是一种有序数据结构,查找和插入操作的平均时间复杂度都是O(log n)。与常用的自平衡搜索树相比,例如红黑树,跳跃表通过多层链表实现,其结构简单易于实现,其查询删除效率通常堪比红黑树。本篇文章会对跳跃表简要说明,并重点分析Redis跳跃表核心源码

其他文章:Redis简单动态字符串SDS源码解析

正文


1. 跳跃表

跳跃表是通过多层有序链表实现的。如下图的有序链表,假设需要查找等于7的元素,必须从头结点(元素为1)依次遍历链表,共经过7次查询比较,找出目标结点。也就是其查询时间复杂度为O(1),删除同理

997f2c9048a4c4b94874609397aca557.png

假如给单个有序链表加索引会怎么样?

如下图,基于单个有序列表,又添加了几层有序列表,可以理解为索引。那么其查询过程为

  • 第一次,1<7,next=9>7,降级
  • 第二次,1.next=5<7,5.next=9>7,降级
  • 第三次,5.next=7,查找到目标结果

可以看到,只需经过四次查询,三次比较即可找出结果,一个设计良好的跳跃表,其时间复杂度能达到O(logn)。这就是跳跃表,道理就是这么简单。那么下面会详细介绍Redis跳跃表的具体实现,其细节还是比较多的

0d23dbe4a98a750b5263d6b0d84dd2f0.png

2. Redis跳跃表数据结构

2.1 跳跃表整体结构

Redis跳跃表的结构如下图,对照着示意图,先对其结构进行简要说明,再分析源码

  • header: 指向表头的指针
  • tail:指向表尾的指针
  • level:层数最大的结点,不包含表头,在图中也就是包含后三个结点
  • BW:backward,后退指针,指向后一个结点
  • score:分值,就是我们用ZADD命令插入时设置的score,1.0,2.0…等
  • sds指针:对应图中o1,o2…,其数据结构为sds,可参考前言

df736bb2b34c93127a470004139e3c22.png

2.2 跳跃表结点源码

zskiplistNode为跳跃表的结点,定义在server.h文件中,其中

  • ele为sds类型字符串,其作用是存储数据
  • level为层级,定义为数组,其中每个元素包含前进指针,和span。注意:层级不包含头节点,头节点再Redis5.0创建跳跃表时会初始化为最大值,也就是64
  • span为当前层当前结点与下一个结点之前跨过的元素个数,span的目的是为了计算rank(排位),也就是第几个元素。例如前面说查询元素7,那么怎么知道7是第几个元素?,每个层结点维护了一个span变量,所以即使跨元素查找,也能通过计算得出rank

    / ZSETs use a specialized version of Skiplists /
    typedef struct zskiplistNode {

    1. sds ele;
    2. double score;
    3. struct zskiplistNode *backward;
    4. struct zskiplistLevel {
    5. struct zskiplistNode *forward;
    6. unsigned long span;
    7. } level[];

    } zskiplistNode;

    //在头文件引入sds数据结构
    typedef char *sds;

2.3 跳跃表结构源码

有了节点的定义,需要一个结构管理这些节点,Redis定义zskiplist跳跃表结构。如下图,就像上面说明那样,其维护了两个指针,分别指向表头,表尾;还有跳表的长度和层高

  1. typedef struct zskiplist {
  2. struct zskiplistNode *header, *tail;
  3. unsigned long length;
  4. int level;
  5. } zskiplist;

3. 创建跳跃表

3.1 创建跳跃表结构

可以看到,创建跳跃表结构分为以下几个步骤

  • 声明结构指针,分配空间
  • 初始化结构层级为1(跳表最大层级),长度初始化为0(跳表长度)
  • 创建表头节点,初始化表头层级为ZSKIPLIST_MAXLEVEL=64
  • 将表头的每层的forward指针初始化为null,将span初始化为0
  • 最后将后退指针,以及表尾指针初始化为null

    / Create a new skiplist. /
    zskiplist *zslCreate(void) {

    1. int j;
    2. zskiplist *zsl;
    3. zsl = zmalloc(sizeof(*zsl));
    4. zsl->level = 1;
    5. zsl->length = 0;
    6. zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    7. for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
    8. zsl->header->level[j].forward = NULL;
    9. zsl->header->level[j].span = 0;
    10. }
    11. zsl->header->backward = NULL;
    12. zsl->tail = NULL;
    13. return zsl;

    }

3.2 zslCreateNode创建节点方法

该方法会创建一个指定层级,指定分数,指定字符串元素的节点

  • 可以看到首先声明zn指针,并指向一块分配好的内存块,通过malloc函数分配,其内存块大小为在zn指针基础上,偏量为移层数 x 每层大小。然后给其元素赋值并返回指针

    / Create a skiplist node with the specified number of levels. The SDS string ‘ele’ is referenced by the node after the call. /
    zskiplistNode
    zslCreateNode(int level, double score, sds ele) {

    1. zskiplistNode *zn =
    2. zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    3. zn->score = score;
    4. zn->ele = ele;
    5. return zn;

    }


4. 插入节点

我们知道在单链表中,插入一个结点n需要维护前一个结点信息,以便改变指针实现插入。Redis跳表同样的道理,不过它维护的是一个长度为64的数组update[],还需要维护一个rank[]数组用来计算排位。其源码如下

zslInsert方法看起,它传入zskiplist结构指针,代表向哪个跳跃表插入元素,需要插入元素的分数,和字符串值。首先声明两个数组就是上面说的。可以看到首先定义一个zskiplistNode指针类型x,从头部依次遍历

4.1 查找要插入的位置

  • 意思为最高层开始,每次循环降低一层
  • 重点关注其中的while循环,如果forward指向的下个元素分数小于等于当前分数,则向后继续向后遍历
  • while循环同时还会计算排位,可以看到rank排位是通过当前层的span累加得出
  • 最后赋值update

    for (i = zsl->level-1; i >= 0; i—) {

    1. /* store rank that is crossed to reach the insert position */
    2. rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
    3. while (x->level[i].forward &&
    4. (x->level[i].forward->score < score ||
    5. (x->level[i].forward->score == score &&
    6. sdscmp(x->level[i].forward->ele,ele) < 0)))
    7. {
    8. rank[i] += x->level[i].span;
    9. x = x->level[i].forward;
    10. }
    11. update[i] = x;
    12. }

4.2 生成随机层高

这里的算法类似幂次定律,层高概率为,越高的层概率越小。其中ZSKIPLIST_P=0.25。整个算法过程为

  • 生成随机数取低16位,与0.25 x 0xffff比较,来判断是否level+1。

    / Returns a random level for the new skiplist node we are going to create. The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL (both inclusive), with a powerlaw-alike distribution where higher levels are less likely to be returned. */
    int zslRandomLevel(void) {

    1. int level = 1;
    2. while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
    3. level += 1;
    4. return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;

    }

4.3 调整最大层级

当插入的结点level大于跳表最大层高,需要更新level

  1. if (level > zsl->level) {
  2. for (i = zsl->level; i < level; i++) {
  3. rank[i] = 0;
  4. update[i] = zsl->header;
  5. update[i]->level[i].span = zsl->length;
  6. }
  7. zsl->level = level;
  8. }

4.4 插入节点

其方式跟单链表插入大同小异

  • 把x当前层的forward指针指向update的当前层下一个结点
  • 再将update当前层的forward指向x
  • 最后是更新span的值

    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {

    1. x->level[i].forward = update[i]->level[i].forward;
    2. update[i]->level[i].forward = x;
    3. /* update span covered by update[i] as x is inserted here */
    4. x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    5. update[i]->level[i].span = (rank[0] - rank[i]) + 1;

    }

4.5 收尾工作

  • 更新上层span值
  • 调整backward指针,update[0]相当于第一层,用来存储backward指针

    / increment span for untouched levels /

    1. for (i = level; i < zsl->level; i++) {
    2. update[i]->level[i].span++;
    3. }
    4. x->backward = (update[0] == zsl->header) ? NULL : update[0];
    5. if (x->level[0].forward)
    6. x->level[0].forward->backward = x;
    7. else
    8. zsl->tail = x;
    9. zsl->length++;

完整的插入代码如下

  1. //插入整体代码
  2. /* Returns a random level for the new skiplist node we are going to create. * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL * (both inclusive), with a powerlaw-alike distribution where higher * levels are less likely to be returned. */
  3. int zslRandomLevel(void) {
  4. int level = 1;
  5. while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
  6. level += 1;
  7. return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
  8. }
  9. zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
  10. zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
  11. unsigned int rank[ZSKIPLIST_MAXLEVEL];
  12. int i, level;
  13. serverAssert(!isnan(score));
  14. x = zsl->header;
  15. for (i = zsl->level-1; i >= 0; i--) {
  16. /* store rank that is crossed to reach the insert position */
  17. rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
  18. while (x->level[i].forward &&
  19. (x->level[i].forward->score < score ||
  20. (x->level[i].forward->score == score &&
  21. sdscmp(x->level[i].forward->ele,ele) < 0)))
  22. {
  23. rank[i] += x->level[i].span;
  24. x = x->level[i].forward;
  25. }
  26. update[i] = x;
  27. }
  28. /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */
  29. level = zslRandomLevel();
  30. if (level > zsl->level) {
  31. for (i = zsl->level; i < level; i++) {
  32. rank[i] = 0;
  33. update[i] = zsl->header;
  34. update[i]->level[i].span = zsl->length;
  35. }
  36. zsl->level = level;
  37. }
  38. x = zslCreateNode(level,score,ele);
  39. for (i = 0; i < level; i++) {
  40. x->level[i].forward = update[i]->level[i].forward;
  41. update[i]->level[i].forward = x;
  42. /* update span covered by update[i] as x is inserted here */
  43. x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
  44. update[i]->level[i].span = (rank[0] - rank[i]) + 1;
  45. }
  46. /* increment span for untouched levels */
  47. for (i = level; i < zsl->level; i++) {
  48. update[i]->level[i].span++;
  49. }
  50. x->backward = (update[0] == zsl->header) ? NULL : update[0];
  51. if (x->level[0].forward)
  52. x->level[0].forward->backward = x;
  53. else
  54. zsl->tail = x;
  55. zsl->length++;
  56. return x;
  57. }

5. 删除结点

根据分值来删除结点的代码如下

  • 首先第一个for跟创建结点前的查找逻辑相似
  • 然后比较分值,如果相同则通过zslDeleteNode删除结点

    int zslDelete(zskiplist zsl, double score, sds ele, zskiplistNode *node) {

    1. zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    2. int i;
    3. x = zsl->header;
    4. for (i = zsl->level-1; i >= 0; i--) {
    5. while (x->level[i].forward &&
    6. (x->level[i].forward->score < score ||
    7. (x->level[i].forward->score == score &&
    8. sdscmp(x->level[i].forward->ele,ele) < 0)))
    9. {
    10. x = x->level[i].forward;
    11. }
    12. update[i] = x;
    13. }
    14. /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */
    15. x = x->level[0].forward;
    16. if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
    17. zslDeleteNode(zsl, x, update);
    18. if (!node)
    19. zslFreeNode(x);
    20. else
    21. *node = x;
    22. return 1;
    23. }
    24. return 0; /* not found */

    }

zslDeleteNode为真正删除结点的函数,源代码如下

  • 删除结点前,首先更新前一个结点每层的的span
  • 然后将每层的前一个结点的forward指向被删除结点只想的结点(基础的链表删除元素操作)
  • else语句说明update[i]大于当前层高,因此删除结点需要更新当前层span值,减1
  • 最后是调整被删除元素下一个元素的backward指针

    / Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank /
    void zslDeleteNode(zskiplist zsl, zskiplistNode x, zskiplistNode **update) {

    1. int i;
    2. for (i = 0; i < zsl->level; i++) {
    3. if (update[i]->level[i].forward == x) {
    4. update[i]->level[i].span += x->level[i].span - 1;
    5. update[i]->level[i].forward = x->level[i].forward;
    6. } else {
    7. update[i]->level[i].span -= 1;
    8. }
    9. }
    10. if (x->level[0].forward) {
    11. x->level[0].forward->backward = x->backward;
    12. } else {
    13. zsl->tail = x->backward;
    14. }
    15. while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
    16. zsl->level--;
    17. zsl->length--;

    }


总结

跳跃表是有序集合的实现方式之一,由zskiplist,zskiplistNode两个构成,zskiplist可以看作是一种管理跳表的数据结构,而zskiplistNode为跳跃表结点的数据结构。其更多的API源代码实现,可以参考t_zset.c源文件

发表评论

表情:
评论列表 (有 0 条评论,204人围观)

还没有评论,来说两句吧...

相关阅读

    相关 redis跳跃详解

    前言 跳跃表是一种有序的数据结构,他通过在每个节点中维护多个指向其它节点的指针,从而达到快速访问节点的目的。跳跃表的查找操作平均时间复杂度为o(logN)。在大部分情况下

    相关 redis学习之跳跃

    跳跃表 跳跃表对于我来说是一个比较陌生的数据结构,因此花了一上午的时间先看了一蛤MIT的公开课。[网易云课堂——MIT跳跃表][MIT] 什么是跳跃表,有一个很简单的例