从Redis源码上来聊聊KV模型

3445 研发动态 | 2026-01-28 12:18:31

之前就说了要来西索Redis,现在来辣!本文的部分内容参考自《小林Coding》,部分地方根据源代码进行剖析。Redis源码地址:https://github.com/redis/redis.git阅读本文之前建议先阅读我的上一篇文章:神奇,Redis存储原理竟然是这样! – Karos (wzl.fyi)

Hash

建议先阅读:神奇,Redis存储原理竟然是这样! – Karos (wzl.fyi),或者本页面的第一章

观其面

哈希结构大家也不陌生了,kv嘛,redis不就是kv模型嘛,那么Redis里面hash又是啥?

这里给大家说一下,就是套娃,用伪代码表示就是这样

HashMap>

当然,实际的类型是泛型。

和String相比具体的区别就是这样

如果你用mysql数据库来表示的话

hash其实是field,value

然后每个hash对应一个表

如果放在Redis的话就是一个key里面放了很多个field,value

可以这样理解:

key field value

其实神简单,说多了还容易绕

应用场景

购物车实现

key:uid

field:商品id

value: 数量

添加商品:HSET cart:{用户id} {商品id} 1

添加数量:HINCRBY cart:{用户id} {商品id} 1

商品总数:HLEN cart:{用户id}

删除商品:HDEL cart:{用户id} {商品id}

获取购物车所有商品:HGETALL cart:{用户id}

获取商品信息还得进数据库回表一次,拿到完整的商品信息

分布式锁实现

Redis分布式锁深入分析 – Karos (wzl1.top)

之前我讲过,这里再简单说一下把

key为锁名,field是线程ID(加上服务的UUID),然后value是进入锁的次数

看一个加锁的Lua脚本吧,为什么用Lua脚本?我们后面细说。

-- 如果Lock存在

if redis.call("exists",KEYS[1]) ~= 0 then

-- 如果不是自己的锁

if redis.call("exists",KEYS[1],ARGS[1]) == 0 then

-- 不是自己的锁,返回剩余时间

return redis.call("pttl",KEYS[1]);

end

-- 如果是自己的锁就记录次数

redis.call("hincrby",KEYS[1],ARGS[1],1);

-- 延期

redis.call("pexpire",KEYS[1],ARGS[2]);

else

-- 如果Lock不存在,那么就直接加上就可以了,hhh

redis.call("hset",KEYS[1],ARGS[1],1);

-- 设置默认延期

redis.call("pexpire",KEYS[1],ARGS[2]);

end

return nil;

究其身

当数据量少的时候使用压缩链表(早期,Redis7.0以后完成的对ZipList的全部替代,都用LIstPack来实现了)

如果哈希类型元素个数小于 512 个(默认值,可由 hash-max-ziplist-entries和hash-max-listpack-entries 配置),所有值小于 64 字节(默认值,可由 hash-max-ziplist-value 配置)的话,Redis 会使用压缩列表/ListPack作为 Hash 类型的底层数据结构;

如果哈希类型元素不满足上面条件,Redis 会使用哈希表作为 Hash 类型的 底层数据结构。

哈希结构客户端哈希设计(基于Redis 7.0解释)

在之前我们讲过dict,dict是用于存储key和value的,但是这里我们客户端hash是将dictEntry的集合放入value,显然不能够用原来服务器的value来存储一个新的dict,你不知道客户端输入的是什么属性

呃呃呃,这上面几句话说的可能有点绕,看代码吧。

其实Redis的开发者确实考虑到了这一点,所以做了个区分,两个虽然都叫做dict,但是所属dir不一样

// /deps/hiredis/dict.h 客户端源码

typedef struct dict {

dictEntry **table;

dictType *type;

// 哈希表大小,桶的数量

unsigned long size;

// 哈希表大小掩码,用于计算索引值

unsigned long sizemask;

// 已有哈希节点数量

unsigned long used;

void *privdata;

} dict;

typedef struct dictEntry {

void *key;

void *val;

struct dictEntry *next;

} dictEntry;

typedef struct dictType {

unsigned int (*hashFunction)(const void *key);

void *(*keyDup)(void *privdata, const void *key);

void *(*valDup)(void *privdata, const void *obj);

int (*keyCompare)(void *privdata, const void *key1, const void *key2);

void (*keyDestructor)(void *privdata, void *key); // 析构

void (*valDestructor)(void *privdata, void *obj); // 析构

} dictType;

先说一下新增元素,RedisHash采用的是头插法

/* Add an element to the target hash table */

static int dictAdd(dict *ht, void *key, void *val) {

int index;

dictEntry *entry;

/* Get the index of the new element, or -1 if

* the element already exists. */

if ((index = _dictKeyIndex(ht, key)) == -1)

return DICT_ERR;

/* Allocates the memory and stores key */

entry = hi_malloc(sizeof(*entry));

if (entry == NULL)

return DICT_ERR;

// 执行头茬

entry->next = ht->table[index];

ht->table[index] = entry;

/* Set the hash entry fields. */

dictSetHashKey(ht, entry, key);

dictSetHashVal(ht, entry, val);

ht->used++;

return DICT_OK;

}

对于Hash结构来说,有个扩容操作,同时会伴随rehash,我们来看看

/* Expand or create the hashtable */

static int dictExpand(dict *ht, unsigned long size) {

dict n; /* the new hashtable */

unsigned long realsize = _dictNextPower(size), i; // 通过该方法获取真实内存,扩大为比size大的第一个2^n

/* the size is invalid if it is smaller than the number of

* elements already inside the hashtable */

if (ht->used > size) // size不合法,如果哈希总节点数大于了size

return DICT_ERR;

// 对新的hash进行初始化

_dictInit(&n, ht->type, ht->privdata);

n.size = realsize;

n.sizemask = realsize-1;

n.table = hi_calloc(realsize,sizeof(dictEntry*));

if (n.table == NULL)

return DICT_ERR;

/* Copy all the elements from the old to the new table: 把旧数据复制到新的哈希表中

* note that if the old hash table is empty ht->size is zero,

* so dictExpand just creates an hash table. */

n.used = ht->used;

for (i = 0; i < ht->size && ht->used > 0; i++) {

dictEntry *he, *nextHe;

if (ht->table[i] == NULL) continue;

/* For each hash entry on this slot... */

he = ht->table[i];

while(he) {

unsigned int h;

nextHe = he->next; // 先获取下一次循环需要的entry

/* Get the new element index */ // 通过计算哈希值获取新的下标(索引计算),其实相当于对长度取余

h = dictHashKey(ht, he->key) & n.sizemask;

// 头插

he->next = n.table[h];

n.table[h] = he;

ht->used--; // ht被使用的节点减少

/* Pass to the next element */

he = nextHe; // 移步到下一个

}

}

assert(ht->used == 0);

hi_free(ht->table);

/* Remap the new hashtable in the old */

*ht = n;

return DICT_OK;

}

为什么扩容是2^n

其实这里哈希表扩容和Java中HashMap扩容原理一样,我们来回忆一下

HashMap是如何扩容的

HashMap扩容依靠的是resize()方法,根据负载因子来计算临界值,达到临界值进行扩容。

而resize()方法也是通过一个newTable进行操作的

Rehash如何确定新元素的位置

​ 通过将hashcode()%length,当leng为$2^n$时,刚好等于hashcode()&(length-1)

但是对于Redis对象存储来说,具体的rehash的方法是拉链法,和JavaSE HashMap类似,同样使用,这样做只是为了加快计算(直接对内存庶几乎进行操作)

Redis服务端存储原理-Hash解析

ReHash

在之前,其实讲到了,Redis的存储原理,其中使用的dict中,直接便是dictEntry

dictEntry **ht_table[2]; // 第二个表用来进行rehash

/**

* dictEntry **ht_table[2]; <==>

* typedef struct dictht{

* dictEntry **table;

* }dictht;

* ditcht ht[2];

**/

之前我们说到第二个ht_table用来做rehash,是这样的,原理其实和上面也类似。

不过为了内存切换方便,每次rehash的时候,只需要将ht_table[0]和ht_table[1]进行交换即可,扩容完毕后直接使用,不用再换回去(类比JVM中幸存区From和幸存区To的交换)。

那么为什么服务端存储kv的哈希靠复制交换,而客户端确是直接新建一个dict N呢?

其实原因可想而知。

服务器的ht数量是固定的,而hash的数据是由客户端操作,数量不定,如果采取复制的话浪费空间,要扩容的时候new一个就行,这一个其实不太重要,重要的在后面

最重要的其实时,服务端一定会保证长连接状态,当数据量很大的时候,我们不可能花较长时间来进行rehash,所以我们采用两个哈希表进行渐进式rehash

渐进式ReHash

为了避免 rehash 在数据迁移过程中,因拷贝数据的耗时,影响 Redis 性能的情况,所以 Redis 采用了渐进式 rehash,也就是将数据的迁移的工作不再是一次性迁移完成,而是分多次迁移。

渐进式 rehash 步骤如下:

给「哈希表 2」 分配空间;

在 rehash 进行期间,每次哈希表元素进行新增、删除、查找或者更新操作时,Redis 除了会执行对应的操作之外,还会顺序将「哈希表 1 」中索引位置上的所有 key-value 迁移到「哈希表 2」 上;

随着处理客户端发起的哈希表操作请求数量越多,最终在某个时间点会把「哈希表 1 」的所有 key-value 迁移到「哈希表 2」,从而完成 rehash 操作。

这样就巧妙地把一次性大量数据迁移工作的开销,分摊到了多次处理请求的过程中,避免了一次性 rehash 的耗时操作。

在进行渐进式 rehash 的过程中,会有两个哈希表,所以在渐进式 rehash 进行期间,哈希表元素的删除、查找、更新等操作都会在这两个哈希表进行。

比如,查找一个 key 的值的话,先会在「哈希表 1」 里面进行查找,如果没找到,就会继续到哈希表 2 里面进行找到。

另外,在渐进式 rehash 进行期间,新增一个 key-value 时,会被保存到「哈希表 2 」里面,而「哈希表 1」 则不再进行任何添加操作,这样保证了「哈希表 1 」的 key-value 数量只会减少,随着 rehash 操作的完成,最终「哈希表 1 」就会变成空表。

ReHash触发条件

介绍了 rehash 那么多,还没说什么时情况下会触发 rehash 操作呢?

rehash 的触发条件跟负载因子(load factor)有关系。

负载因子可以通过下面这个公式计算:

触发 rehash 操作的条件,主要有两个:

当负载因子大于等于 1 ,并且 Redis 没有在执行 bgsave 命令或者 bgrewiteaof 命令,也就是没有执行 RDB 快照或没有进行 AOF 重写的时候,就会进行 rehash 操作。

当负载因子大于等于 5 时,此时说明哈希冲突非常严重了,不管有没有有在执行 RDB 快照或 AOF 重写,都会强制进行 rehash 操作。

服务端哈希扩容

在讲扩容源码之前,我们先来回顾之前讲的内容

// dictEntry的实现很简单,就是个链表

struct dictEntry {

void *key;

union {

void *val; // 其他

uint64_t u64; // 整数

int64_t s64; // 整数

double d; // 双精度数值

} v;

struct dictEntry *next; /* Next entry in the same hash bucket. 指向下一个hash值相同的entry*/

void *metadata[]; /* An arbitrary number of bytes (starting at a

* pointer-aligned address) of size as returned

* by dictType's dictEntryMetadataBytes(). */

};

struct dict {

dictType *type;

dictEntry **ht_table[2]; // 第二个表用来进行rehash

/**

* dictEntry **ht_table[2]; <==>

* typedef struct dictht{

* dictEntry **table;

* }dictht;

* ditcht ht[2];

**/

unsigned long ht_used[2];

// 用于保证rehash安全,如果值为-1,那么不能执行rehash

long rehashidx;

// redis6之后新加入,如果值>0则暂停rehash,保证rehash安全

/* Keep small vars at end for optimal (minimal) struct padding */

int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */

// 哈希的内存指数

signed char ht_size_exp[2];

// 由dictType的dictEntryBytes定义的大小的任意字节数(从指针对齐的地址开始)。

void *metadata[];

};

说到这里,我们来看看扩容的源码,先来个宏定义

#define dictIsRehashing(d) ((d)->rehashidx != -1)

再来一个获取新内存大小的源码

static signed char _dictNextExp(unsigned long size)

{

unsigned char e = DICT_HT_INITIAL_EXP;

if (size >= LONG_MAX) return (8*sizeof(long)-1);

// 这里采用的是用移位来计算,保证效率

while(1) {

if (((unsigned long)1<= size)

return e;

e++;

}

}

一个判断是否运行扩容的"私有"函数

/* ------------------------- private functions ------------------------------ */

/* Because we may need to allocate huge memory chunk at once when dict

* expands, we will check this allocation is allowed or not if the dict

* type has expandAllowed member function. */

static int dictTypeExpandAllowed(dict *d) {

// 不允许返回1

if (d->type->expandAllowed == NULL) return 1;

// 这里的计算我们后面讲

return d->type->expandAllowed(

DICTHT_SIZE(_dictNextExp(d->ht_used[0] + 1)) * sizeof(dictEntry*),

(double)d->ht_used[0] / DICTHT_SIZE(d->ht_size_exp[0]));

}

下面是扩容源码

/* Expand the hash table if needed */

static int _dictExpandIfNeeded(dict *d)

{

/* Incremental rehashing already in progress. Return. */

// 如果正在rehash,那么直接返回Ok

if (dictIsRehashing(d)) return DICT_OK;

/* If the hash table is empty expand it to the initial size. */

// 如果哈希表是空的,那么就创建

if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

/* If we reached the 1:1 ratio, and we are allowed to resize the hash

* table (global setting) or we should avoid it but the ratio between

* elements/buckets is over the "safe" threshold, we resize doubling

* the number of buckets. */

// 如果不被允许扩展就返回ok

if (!dictTypeExpandAllowed(d))

return DICT_OK;

if ((dict_can_resize == DICT_RESIZE_ENABLE &&

d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0])) ||

(dict_can_resize != DICT_RESIZE_FORBID &&

d->ht_used[0] / DICTHT_SIZE(d->ht_size_exp[0]) > dict_force_resize_ratio))

{

return dictExpand(d, d->ht_used[0] + 1); // +1是为了避免和当前分配相同的内存

}

// 分配成功

return DICT_OK;

}

/* Expand or create the hash table,

* when malloc_failed is non-NULL, it'll avoid panic if malloc fails (in which case it'll be set to 1).

* Returns DICT_OK if expand was performed, and DICT_ERR if skipped. */

int _dictExpand(dict *d, unsigned long size, int* malloc_failed)

{

if (malloc_failed) *malloc_failed = 0;

/* the size is invalid if it is smaller than the number of

* elements already inside the hash table */

if (dictIsRehashing(d) || d->ht_used[0] > size) // 判断是否能够哈希,并且传入的size合法

return DICT_ERR;

/* the new hash table */

dictEntry **new_ht_table; // 创建一个新的哈希表

unsigned long new_ht_used;

signed char new_ht_size_exp = _dictNextExp(size); // 计算log(新哈希表的长度)

/* Detect overflows */

size_t newsize = 1ul<

if (newsize < size || newsize * sizeof(dictEntry*) < newsize) // 如果比原来的还小

return DICT_ERR;

/* Rehashing to the same table size is not useful. */

if (new_ht_size_exp == d->ht_size_exp[0]) return DICT_ERR; // 如果和原来相同,没用

/* Allocate the new hash table and initialize all pointers to NULL */ // 分配内存,并且memset all为NULL

if (malloc_failed) {

new_ht_table = ztrycalloc(newsize*sizeof(dictEntry*));

*malloc_failed = new_ht_table == NULL; // 判断是否分配成功

if (*malloc_failed)

return DICT_ERR;

} else

new_ht_table = zcalloc(newsize*sizeof(dictEntry*));

new_ht_used = 0; // 新的表还没有使用过

/* Is this the first initialization? If so it's not really a rehashing

* we just set the first hash table so that it can accept keys. */

if (d->ht_table[0] == NULL) {

// 如果是第一次次创建,使用第一张表为哈希表

d->ht_size_exp[0] = new_ht_size_exp;

d->ht_used[0] = new_ht_used;

d->ht_table[0] = new_ht_table;

return DICT_OK;

}

/* Prepare a second hash table for incremental rehashing */

// 使用第二张表进行rehash

d->ht_size_exp[1] = new_ht_size_exp;

d->ht_used[1] = new_ht_used;

d->ht_table[1] = new_ht_table;

// 表示当前表正在进行哈希中,后续不可以进行rehash,具体的判断在前面的宏定义内

d->rehashidx = 0;

return DICT_OK;

}