摘要:函數(shù)考慮了一些與客戶端交互的內(nèi)容,學(xué)的時候沒必要細(xì)看,它主要分為以下兩步調(diào)用找到排名的節(jié)點從這個節(jié)點開始遍歷個節(jié)點下面是的代碼從高層向底層累加直到累加的值等于范圍查找功能給定一個的范圍,從中找出滿足該范圍的所有節(jié)點。
跳躍表是Redis zset的底層實現(xiàn)之一,zset在member較多時會采用跳躍表作為底層實現(xiàn),它在添加、刪除、查找節(jié)點上都擁有與紅黑樹相當(dāng)?shù)男阅?,它其實說白了就是一種特殊的鏈表,鏈表的每個節(jié)點存了不同的“層”信息,用這種分層存節(jié)點的方式在查找節(jié)點時能跳過些節(jié)點,從而使添加、刪除、查找操作都擁有了O(logn)的平均時間復(fù)雜度。
下面簡單介紹一下跳躍表:
跳躍表最低層(第一層)是一個擁有跳躍表所有節(jié)點的普通鏈表,每次在往跳躍表插入鏈表節(jié)點時一定會插入到這個最低層,至于是否插入到上層 就由拋硬幣決定(這么說不是很準(zhǔn)確,redis里這個概率是1/4而非1/2,為了表述方便先這么說),什么意思呢?假設(shè)已經(jīng)有一個跳躍表,其高度只有一層:
往表中插入節(jié)點“7”時,假設(shè)插入7時拋硬幣的結(jié)果是正,則在第二層中插入“7”節(jié)點,繼續(xù)拋一次看看還能不能上到第三層,為反則停止插入,上層不再插入“7”節(jié)點了:
同理插入“4”節(jié)點假設(shè)連續(xù)拋兩次都拋了正面,第三次拋了反面,則“4”節(jié)點會插入到2、3層:
這就是跳躍表最基本的樣子。
查找一個節(jié)點時,我們只需從高層到低層,一個個鏈表查找,每次找到該層鏈表中小于等于目標(biāo)節(jié)點的最大節(jié)點,直到找到為止。由于高層的鏈表迭代時會“跳過”低層的部分節(jié)點,所以跳躍表會比正常的鏈表查找少查部分節(jié)點,這也是skiplist名字的由來。
假如我們需要查找節(jié)點“5”:
先遍歷最高層,發(fā)現(xiàn)第三層頭結(jié)點的下一個節(jié)點是“4”,4<5,所以游標(biāo)定位到“4”節(jié)點,但是“4”節(jié)點的下一個節(jié)點是空,得繼續(xù)往低層走;第二層也從“4”節(jié)點開始,“4”節(jié)點在第二層的下一個節(jié)點是“7”,7>5,公交車做過頭了,回來依舊定位在“4”節(jié)點;繼續(xù)往低層走,第一層“4”節(jié)點的下一個節(jié)點是“5”,這就找到了。
事實上插入前也需要進行跳躍表查找操作,上文演示的插入流程只是直接用了鏈表而省略了這一步。
跳躍表有了個大概的了解,接下來我們細(xì)說redis里的skiplist
redis的skiplist有兩個主要的數(shù)據(jù)結(jié)構(gòu),
zskiplistNode:skiplist的節(jié)點
zskiplist:用來記錄一個skiplist的長度、高度等信息
zskiplistNode的定義typedef struct zskiplistNode { robj *obj;//zset的member double score;//zset的score struct zskiplistNode *backward;//指向上一個節(jié)點,用于zrevrange命令 struct zskiplistLevel { struct zskiplistNode *forward;//指向下一個節(jié)點 unsigned int span;//到達(dá)后一個節(jié)點的跨度(兩個相鄰節(jié)點span為1) } level[];//該節(jié)點在各層的信息,柔性數(shù)組成員 } zskiplistNode;
backward變量是特意為zrevrange*系列命令準(zhǔn)備的,目的是為了使跳躍表實現(xiàn)反向遍歷,普通跳躍表的實現(xiàn)里是非必要的。
level變量記錄了此節(jié)點各層的信息:
level[x]->forward表示該節(jié)點在第x層的下一個節(jié)點。跳躍表節(jié)點都會出現(xiàn)在最低層的鏈表里,所以都會有l(wèi)evel[0],通過level[0]->forward實現(xiàn)跳躍表正向遍歷,zrange*系列命令就是以此實現(xiàn)的。
level[x]->span表示該節(jié)點到第x層的下一個節(jié)點跳躍了多少節(jié)點。span主要是為了zrank、zrevrank服務(wù),普通跳躍表的實現(xiàn)里是非必要的。
zskiplist的定義typedef struct zskiplist { struct zskiplistNode *header, *tail;//跳躍表頭尾節(jié)點 unsigned long length;//節(jié)點個數(shù) int level;//除頭結(jié)點外最大的層數(shù) } zskiplist;
zskiplist的頭結(jié)點不是一個有效的節(jié)點,它有ZSKIPLIST_MAXLEVEL層(32層),每層的forward指向該層跳躍表的第一個節(jié)點,若沒有則為null。
btw:跳躍表節(jié)點的層數(shù)限制在了32,若想超過32層得連續(xù)32次拋硬幣都得到正面,這得有足夠多的節(jié)點,redis限定了拋硬幣正面的概率為1/4,所以到達(dá)32層的概率為(1/2)^64,一般一臺64位的計算機能擁有的最大內(nèi)存也無法存儲這么多zskiplistNode,所以對于基本使用 32層的上限已經(jīng)足夠高了,再高也沒必要 浪費頭節(jié)點的內(nèi)存。
最終zskiplist內(nèi)存布局如下:
也可以去掉不必要的部分(backward、obj),作出如下抽象后的圖:
下文我們都通過簡化圖來分析跳躍表的基本操作流程。
zskiplist相關(guān)接口 創(chuàng)建一個zskiplist 時間復(fù)雜度O(1)創(chuàng)建一個zskiplist就是創(chuàng)建一個高度為ZSKIPLIST_MAXLEVEL(32)的頭結(jié)點,score為0,member為null。
代碼如下:
zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//畫圖,32level的頭結(jié)點 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { //頭結(jié)點每個level的下一個節(jié)點都初始化為null,跨度為0 zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; } //為指定高度的節(jié)點分配空間并賦值,insert操作也要用到 zskiplistNode *zslCreateNode(int level, double score, robj *obj) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));//柔性數(shù)組成員 zn->score = score; zn->obj = obj; return zn; }
執(zhí)行完zslCreate后會得到如下布局:
zskiplist插入一個節(jié)點 時間復(fù)雜度O(logn)插入一個節(jié)點時需要做以下工作
要找到新節(jié)點的插入位置,只需像上文介紹的那樣,從高層向低層找即可。在找的過程中用update[]數(shù)組記錄每一層插入位置的上一個節(jié)點,用rank[]數(shù)組記錄每一層插入位置的上一個節(jié)點在跳躍表中的排名。根據(jù)update[]插入新節(jié)點,插入完成后再根據(jù)rank[]更新跨度信息即可。
ps:redis允許節(jié)點有重復(fù)的score,當(dāng)score相同時根據(jù)member(代碼里的obj指向的字符串)的字典序來排名。
/* Returns a random level for the new skiplist node we are going to create. * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL * (both inclusive), with a powerlaw-alike distribution where higher * levels are less likely to be returned. */ int zslRandomLevel(void) {//返回一個隨機的層數(shù),不是level的索引是層數(shù) int level = 1; while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//有1/4的概率加入到上一層中 level += 1; return (levelheader;//x用于迭代zskiplistNode for (i = zsl->level-1; i >= 0; i--) {//從最高層向最底層查詢,找到合適的插入位置 /* store rank that is crossed to reach the insert position */ rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];//記錄每一層插入節(jié)點的上一個節(jié)點的排名 while (x->level[i].forward &&//當(dāng)前層的下一個節(jié)點存在 (x->level[i].forward->score < score ||//下一個節(jié)點的分?jǐn)?shù)小于需要插入的分?jǐn)?shù) (x->level[i].forward->score == score &&//score相同的情況下,根據(jù)member字符串的大小來比較(二進制安全的memcmp) compareStringObjects(x->level[i].forward->obj,obj) < 0))) { rank[i] += x->level[i].span;//每層的跨度 x = x->level[i].forward;//下一個節(jié)點 } update[i] = x;//當(dāng)前層的最后一個小于score的節(jié)點 } /* we assume the key is not already inside, since we allow duplicated * scores, and the re-insertion of score and redis object should never * happen since the caller of zslInsert() should test in the hash table * if the element is already inside or not. */ level = zslRandomLevel(); if (level > zsl->level) {//大于之前跳躍表的高度所以沒有記錄update[i],因為插入的節(jié)點有這么高所以要修改這些頭結(jié)點的信息 for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length;//高出部分的頭結(jié)點在還沒插入當(dāng)前節(jié)點時跨度應(yīng)該是整張表,插入之后會重新更新這個值 } zsl->level = level; } x = zslCreateNode(level,score,obj); for (i = 0; i < level; i++) { x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ //rank[0]是x在第0層的上一個節(jié)點的實際排名,rank[i]是x在第i層的上一個節(jié)點的實際排名,它們倆的差值為x在第i層的上一個節(jié)點與x之間的距離 x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x;//尾節(jié)點 zsl->length++; return x; }
假如往圖2插入一個score為7的節(jié)點,則會按照下圖方式所示進行:
找到插入位置(藍(lán)色的線表示查找路徑):
假設(shè)層數(shù)為3,將節(jié)點7插入進skiplist并修改附近節(jié)點的相關(guān)屬性:
zskiplist里的刪除操作void zslFreeNode(zskiplistNode *node) { decrRefCount(node->obj);//member的引用計數(shù)-1,防止內(nèi)存泄漏 zfree(node); }
void zslFree(zskiplist *zsl) { //任何一個節(jié)點一定有l(wèi)evel[0],所以迭代level[0]來刪除所有節(jié)點 zskiplistNode *node = zsl->header->level[0].forward, *next; zfree(zsl->header); while(node) { next = node->level[0].forward; zslFreeNode(node); node = next; } zfree(zsl); }
主要分為以下3個步驟:
根據(jù)member(obj)和score找到節(jié)點的位置(代碼里變量x即為該節(jié)點,update記錄每層x的上一個節(jié)點)
調(diào)動zslDeleteNode把x節(jié)點從skiplist邏輯上刪除
釋放x節(jié)點內(nèi)存。
/* Delete an element with matching score/object from the skiplist. */ //從skiplist邏輯上刪除一個節(jié)點并釋放該節(jié)點的內(nèi)存 int zslDelete(zskiplist *zsl, double score, robj *obj) { zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && compareStringObjects(x->level[i].forward->obj,obj) < 0))) x = x->level[i].forward; update[i] = x; } /* We may have multiple elements with the same score, what we need * is to find the element with both the right score and object. */ x = x->level[0].forward;//要刪除的節(jié)點 if (x && score == x->score && equalStringObjects(x->obj,obj)) { zslDeleteNode(zsl, x, update); zslFreeNode(x);//obj的引用計數(shù)-1并釋放節(jié)點內(nèi)存 return 1; } return 0; /* not found */ } /* Internal function used by zslDelete, zslDeleteRangeByScore and zslDeleteRangeByRank and zslDeleteRangeByLex*/ //從skiplist邏輯上刪除一個節(jié)點(不釋放內(nèi)存,僅改變節(jié)點位置關(guān)系) //x為要刪除的節(jié)點 //update為每一層x的上一個節(jié)點(為了更新x上一個節(jié)點的forward和span屬性) void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) { int i; for (i = 0; i < zsl->level; i++) { if (update[i]->level[i].forward == x) {//當(dāng)前層有x節(jié)點 update[i]->level[i].span += x->level[i].span - 1;//...=x->level[i].span; update[i]->level[i].forward = x->level[i].forward;//跨過x節(jié)點 } else {//當(dāng)前層沒有x節(jié)點 update[i]->level[i].span -= 1; } } if (x->level[0].forward) {//是否是tail節(jié)點 x->level[0].forward->backward = x->backward; } else { zsl->tail = x->backward; } while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)//刪除了最高層數(shù)的節(jié)點 zsl->level--; zsl->length--; }
zslDeleteRangeByScore //刪除 滿足score范圍 的節(jié)點
zslDeleteRangeByRank //刪除 滿足排名范圍 的節(jié)點
zslDeleteRangeByLex //在所有節(jié)點的score相同的skiplist中,刪除 滿足member字典序范圍 的節(jié)點
代碼懶得貼了
范圍查找操作 時間復(fù)雜度O(log(n)+m), m是范圍內(nèi)元素的個數(shù)根據(jù)查找范圍的類型 zskiplist查找可以分為三類:
rank范圍查找
score范圍查找
member范圍查找
功能:給定一個zero-based排名范圍(start,end),從zskiplist中找出滿足該范圍的所有節(jié)點。
zrangeGenericCommand函數(shù)考慮了一些與客戶端交互的內(nèi)容,學(xué)zskiplist的時候沒必要細(xì)看,它主要分為以下兩步:
1.調(diào)用zslGetElementByRank找到排名start+1的節(jié)點·······O(logn)
2.從這個節(jié)點開始遍歷(end-start+1)個節(jié)點·······O(m)
下面是zslGetElementByRank的代碼:
/* Finds an element by its rank. The rank argument needs to be 1-based. */ //O(logn) zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { //從高層向底層累加span直到累加的值等于rank while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { traversed += x->level[i].span; x = x->level[i].forward; } if (traversed == rank) { return x; } } return NULL; }
功能:給定一個score的范圍,從zskiplist中找出滿足該score范圍的所有節(jié)點。
為了方便的表示score范圍的開閉區(qū)間,redis在server.h里聲明了一個表示zset分?jǐn)?shù)區(qū)間的類型zrangespec,關(guān)于score范圍查找的相關(guān)函數(shù)都會用到它:
/* Struct to hold a inclusive/exclusive range spec by score comparison. */ typedef struct { double min, max; //是否是開閉區(qū)間,1為開,0位閉 int minex, maxex; /* are min or max exclusive? */ } zrangespec;
判斷一個score與zrangespec區(qū)間內(nèi)最小值、最大值的關(guān)系:
//給定value是否>(>=)此范圍的下界 //gte(greater than or equal to) static int zslValueGteMin(double value, zrangespec *spec) { return spec->minex ? (value > spec->min) : (value >= spec->min); } //給定value是否<(<=)此范圍的上界 //lte(less than or equal to) int zslValueLteMax(double value, zrangespec *spec) { return spec->maxex ? (value < spec->max) : (value <= spec->max); }
根據(jù)上述兩個函數(shù),就可以用O(1)時間復(fù)雜度判斷一個zskiplist的score是否與zrangespec分?jǐn)?shù)區(qū)間有交集:
/* Returns if there is a part of the zset is in range. */ //用O(1)的時間復(fù)雜度判斷zset(zsl)的分?jǐn)?shù)范圍是否與給定分?jǐn)?shù)范圍(range)有交集。 // //range(6,9] zsl{1,2,3,4,5} 或zsl{10,12,13} 都是不在范圍內(nèi) // int zslIsInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; /* Test for ranges that will always be empty. */ if (range->min > range->max || (range->min == range->max && (range->minex || range->maxex))) return 0; x = zsl->tail; if (x == NULL || !zslValueGteMin(x->score,range))//尾節(jié)點小于范圍下界 return 0; x = zsl->header->level[0].forward; if (x == NULL || !zslValueLteMax(x->score,range))//頭節(jié)點大于范圍上界 return 0; return 1;//在 }
genericZrangebyscoreCommand函數(shù)也考慮了很多與客戶端交互的內(nèi)容,就學(xué)習(xí)底層跳躍表實現(xiàn)時沒必要細(xì)看,我們只需要知道底層是如何做到的即可,主要執(zhí)行如下步驟:
1.調(diào)用zslParseRange把客戶端傳過來的范圍min、max轉(zhuǎn)換成zrangespec區(qū)間類型 保存在range變量里。
2.調(diào)用zslFirstInRange找到zskiplist中滿足range條件的最小節(jié)點。(假設(shè)是正序的范圍查找)·······O(logn)
3.從這個節(jié)點開始遍歷,直到調(diào)用zslValueLteMax()找到最后一個小于range條件上界的節(jié)點?!ぁぁぁぁぁぁ(m)
放一下核心的代碼zslFirstInRange:
/* Find the first node that is contained in the specified range. * Returns NULL when no element is contained in the range. */ //滿足range條件最小的那個 O(logn) zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) { zskiplistNode *x; int i; /* If everything is out of range, return early. */ if (!zslIsInRange(zsl,range)) return NULL;//保證下面的邏輯一定能找到范圍內(nèi)的節(jié)點 x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* Go forward while *OUT* of range. */ while (x->level[i].forward && !zslValueGteMin(x->level[i].forward->score,range)) x = x->level[i].forward; } /* This is an inner range, so the next node cannot be NULL. */ x = x->level[0].forward; serverAssert(x != NULL); /* Check if score <= max. */ if (!zslValueLteMax(x->score,range)) return NULL; return x; }
功能:在一個所有節(jié)點的score都相同的zskiplist中,找到滿足member字符串字典序范圍的所有節(jié)點。
底層用memcmp比較兩個字符串的大小。實現(xiàn)的流程與genericZrangebyscoreCommand很像,有興趣再看。
zslgetrank 根據(jù)member和score獲得節(jié)點在該skiplist中的rank
zslParseRange 把客戶端傳過來的范圍min、max轉(zhuǎn)換成zrangespec區(qū)間類型 返回給range參數(shù)。
...
其實作者Antirez已經(jīng)給出了答復(fù):
https://news.ycombinator.com/item?id=1171423
劃重點:They are not very memory intensive. It"s up to you basically.
既然取決于自己,skiplist實現(xiàn)簡單就選它了。至于可能的好處和壞處大概整理了一下有這些:
缺點:
比紅黑樹占用更多的內(nèi)存,每個節(jié)點的大小取決于該節(jié)點的層數(shù)
空間局部性較差導(dǎo)致緩存命中率低,感覺上會比紅黑樹更慢
優(yōu)點:
實現(xiàn)比紅黑樹簡單
比紅黑樹更容易擴展,作者之后實現(xiàn)zrank指令時沒怎么改動代碼。
紅黑樹插入刪除時為了平衡高度需要旋轉(zhuǎn)附近節(jié)點,高并發(fā)時需要鎖。skiplist不需要考慮。
一般用zset的操作都是執(zhí)行zrange之類的操作,取出一片連續(xù)的節(jié)點。這些操作的緩存命中率不會比紅黑樹低。
參考資料Skip Lists: A Probabilistic Alternative to Balanced Trees
Skip Lists: Done Right
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/28308.html
摘要:用不用作為分界是因為是的值,它用于判斷是否到達(dá)尾部。這種類型的節(jié)點的大小介于與之間,但是為了表示整數(shù),取出低四位之后會將其作為實際的值。當(dāng)小于字節(jié)時,節(jié)點存為上圖的第二種類型,高位為,后續(xù)位表示的長度。 // 文中引用的代碼來源于Redis3.2 前言 Redis是基于內(nèi)存的nosql,有些場景下為了節(jié)省內(nèi)存redis會用時間換空間。ziplist就是很典型的例子。 介紹 ziplis...
閱讀 2726·2021-10-08 10:04
閱讀 2765·2021-09-06 15:02
閱讀 884·2019-08-30 13:50
閱讀 1587·2019-08-30 13:21
閱讀 2615·2019-08-30 11:15
閱讀 2156·2019-08-29 17:19
閱讀 1620·2019-08-26 13:55
閱讀 1286·2019-08-26 10:15