Redis学习笔记
本文分析的redis版本为6.0。
安装配置
windows安装
从github上下载windows版本。
之后解压就好了。
数据类型
字符串
Redis用动态字符串(SDS,Simple dynamic string)的类型来保存字符串。
定义在sds.h
中的sdshdr
中。源码在sds.h
和sds.c
中。
比较关键的是,当要求分配后的空间为$n$,那么实际分配的空间为$\min(2n,n+2^{20})$,多出来的部分称为预留空间。源码:
sds sdsMakeRoomFor(sds s, size_t addlen) {
...
newlen = (len+addlen);
//小于1M就预留相同空间
if (newlen < SDS_MAX_PREALLOC)
newlen *= 2;
else
//>=1M,则只预留1M
newlen += SDS_MAX_PREALLOC;
...
}
链表
定义在adlist.h
和adlist.c
文件中。
字典
字典的实现为哈希表,通过链表解决冲突。其实现在dict.h
和dict.c
文件中。
类似于java中的实现,字典的bucket数目始终为2的幂,这样就可以用且运算来确定哈希值对应的槽了。
//从4开始增加,直到找到2^c,满足2^{c-1}<size 且 2^c>=size
/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size)
{
unsigned long i = DICT_HT_INITIAL_SIZE;
if (size >= LONG_MAX) return LONG_MAX + 1LU;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}
记$\alpha$为哈希表中元素数目与槽数的比例。扩张比较复杂,一般情况下只需要$\alpha\geq 1$就会发生,但是如果正在执行BGSAVE或BGREWRITEAOF等持久化命令,则只有在$\alpha\geq 6$的时候才会发生。这是因为redis在做持久化时会开启一个子进程,通过共享内存来做落盘操作,操作系统对于进程间共享的内存会采用写时复制的技术,因此这期间redis会通过提高扩张的阈值来避免扩张带来的写入操作,从而节约内存。同理当$\alpha<0.1$就会减少bucket,释放内存,但是如果在执行BGSAVE或BGREWRITEAOF等持久化命令,就不会执行缩小操作。
//是否支持缩放,如果元素数/bucket数>dict_force_resize_ratio,则会强制发生哈希,即使不支持缩放
/* Using dictEnableResize() / dictDisableResize() we make possible to
* enable/disable resizing of the hash table as needed. This is very important
* for Redis, as we use copy-on-write and don't want to move too much memory
* around when there is a child performing saving operations.
*
* Note that even when dict_can_resize is set to 0, not all resizes are
* prevented: a hash table is still allowed to grow if the ratio between
* the number of elements and the buckets > dict_force_resize_ratio. */
static int dict_can_resize = 1;
static unsigned int dict_force_resize_ratio = 5;
...
//缩小哈希表
/* Resize the table to the minimal size that contains all the elements,
* but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
unsigned long minimal;
//只有在允许缩放以及没有处于哈希中才能进行缩小操作
if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
return dictExpand(d, minimal);
}
...
//执行一步rehash操作
/* This function performs just a step of rehashing, and only if there are
* no safe iterators bound to our hash table. When we have iterators in the
* middle of a rehashing we can't mess with the two hash tables otherwise
* some element can be missed or duplicated.
*
* This function is called by common lookup or update operations in the
* dictionary so that the hash table automatically migrates from H1 to H2
* while it is actively used. */
static void _dictRehashStep(dict *d) {
//这里需要保证没有安全迭代器存在
if (d->iterators == 0) dictRehash(d,1);
}
...
//按需扩张
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;
//这里做初始化操作
/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
//由于是整数除法,实际上这里要求达到6倍才行。。
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
//翻一倍大小
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
跳表
Redis中用跳表来实现有序集合,以及在集群节点中用作内部数据结构。其源码出现在server.h
和t_zset.c
文件中。
Redis的跳表的特点是需要同时维护一个哈希表和跳表,其中哈希表存储对象到某个分数的映射,同时负责排重工作,而跳表则支持插入删除排名等操作,其元素按照score作为第一关键字,元素作为第二关键字进行排序。
typedef struct zset {
//需要同时使用哈希表和skiplist,前者用于查询判重等,后者用于排名,排序等
//dict的关键字为名称,值为score
dict *dict;
zskiplist *zsl;
} zset;
跳表的元素只能是字符串,为了节省内存,因此哈希表和跳表用的是相同的引用,而释放元素由跳表负责(因此每次删除需要先从哈希表中删除,再删除跳表,否则会存在坏键)。
同时使用哈希表和跳表的主要原因是为了提高性能,哈希表能快速找到某个元素的分数。
跳表的最大高度为32,每个元素的高度由投硬币决定。
//上升概率为0.25
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
//投硬币
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
/* Turn options into simple to check vars. */
int incr = (in_flags & ZADD_IN_INCR) != 0;
int nx = (in_flags & ZADD_IN_NX) != 0;
int xx = (in_flags & ZADD_IN_XX) != 0;
int gt = (in_flags & ZADD_IN_GT) != 0;
int lt = (in_flags & ZADD_IN_LT) != 0;
*out_flags = 0; /* We'll return our response flags. */
double curscore;
//分数不合理
if (isnan(score)) {
*out_flags = ZADD_OUT_NAN;
return 0;
}
//如果存储格式为ziplist
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*out_flags |= ZADD_OUT_NAN;
return 0;
}
}
/* GT/LT? Only update if score is greater/less than current. */
if ((lt && score >= curscore) || (gt && score <= curscore)) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
if (newscore) *newscore = score;
/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*out_flags |= ZADD_OUT_UPDATED;
}
return 1;
} else if (!xx) {
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
//如果链表变得太长,就转化为ziplist
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*out_flags |= ZADD_OUT_ADDED;
return 1;
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
de = dictFind(zs->dict,ele);
if (de != NULL) {
//如果存在重名的对象
if (nx) {
//如果指定了nx选项
*out_flags |= ZADD_OUT_NOP;
return 1;
}
//对应的分数
curscore = *(double*)dictGetVal(de);
if (incr) {
//如果指定了分数要合并
score += curscore;
if (isnan(score)) {
*out_flags |= ZADD_OUT_NAN;
return 0;
}
}
if ((lt && score >= curscore) || (gt && score <= curscore)) {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
if (newscore) *newscore = score;
//如果分数发生了变化,更新分数
if (score != curscore) {
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
dictGetVal(de) = &znode->score;
*out_flags |= ZADD_OUT_UPDATED;
}
return 1;
} else if (!xx) {
//没有指定对象必须已经存在,执行插入操作
ele = sdsdup(ele);
znode = zslInsert(zs->zsl,score,ele);
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*out_flags |= ZADD_OUT_ADDED;
if (newscore) *newscore = score;
return 1;
} else {
*out_flags |= ZADD_OUT_NOP;
return 1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
return 0; /* Never reached. */
}
具体的插入逻辑:
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double s;
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
serverAssert(sptr != NULL);
s = zzlGetScore(sptr);
if (s > score) {
/* First element with score larger than score for element to be
* inserted. This means we should take its spot in the list to
* maintain ordering. */
zl = zzlInsertAt(zl,eptr,ele,score);
break;
} else if (s == score) {
if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
zl = zzlInsertAt(zl,eptr,ele,score);
break;
}
}
eptr = ziplistNext(zl,sptr);
}
/* Push on tail of list when it was not yet inserted. */
if (eptr == NULL)
zl = zzlInsertAt(zl,NULL,ele,score);
return zl;
}
跟踪插入代码:
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
unsigned char *eptr = ziplistIndex(zl,0), *sptr;
double s;
while (eptr != NULL) {
sptr = ziplistNext(zl,eptr);
serverAssert(sptr != NULL);
s = zzlGetScore(sptr);
//分数是比较的第一关键字
//而名称是比较的第二关键字
if (s > score) {
zl = zzlInsertAt(zl,eptr,ele,score);
break;
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
zl = zzlInsertAt(zl,eptr,ele,score);
break;
}
}
/* Move to next element. */
eptr = ziplistNext(zl,sptr);
}
/* Push on tail of list when it was not yet inserted. */
if (eptr == NULL)
zl = zzlInsertAt(zl,NULL,ele,score);
return zl;
}
整数集合
如果一个集合只包含整数,并且数量不多,Redis就会用整数集合来存储。其源码出现在intset.h
和intset.c
文件中。
typedef struct intset {
//存储的是几位长度的整数
uint32_t encoding;
uint32_t length;
int8_t contents[];
} intset;
整数集合中的元素以从小到大的方式排序存放在contents
这个柔性数组中,不允许重复元素的出现。
如果放入需要更多字节才能表示的整数,需要对整数集合进行升级,要求重新分配空间。升级后的整数集合并不会因为删除元素而降级。引入升级机制的好处是适用最小的类型来存储整数,这样可以节约内存。
/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
//如果是因为过小,就加到前面,否则后面
int prepend = value < 0 ? 1 : 0;
/* First set new encoding and resize */
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);
/* Upgrade back-to-front so we don't overwrite values.
* Note that the "prepend" variable is used to make sure we have an empty
* space at either the beginning or the end of the intset. */
//从后往前处理,避免覆盖问题
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));
/* Set the value at the beginning or the end. */
//插前面还是后面,这是一个问题
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
当然是用数组来是不可能有有效支持插入删除操作的,这里它们的时间复杂度是$O(n)$。
/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
//先默认成功
if (success) *success = 1;
/* Upgrade encoding if necessary. If we need to upgrade, we know that
* this value should be either appended (if > 0) or prepended (if < 0),
* because it lies outside the range of existing values. */
if (valenc > intrev32ifbe(is->encoding)) {
//升级的同时解决插入问题
/* This always succeeds, so we don't need to curry *success. */
return intsetUpgradeAndAdd(is,value);
} else {
/* Abort if the value is already present in the set.
* This call will populate "pos" with the right position to insert
* the value when it cannot be found. */
//如果存在就返回失败
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}
//???,果真暴力
is = intsetResize(is,intrev32ifbe(is->length)+1);
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
/* Delete integer from intset */
intset *intsetRemove(intset *is, int64_t value, int *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 0;
if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
uint32_t len = intrev32ifbe(is->length);
/* We know we can delete */
if (success) *success = 1;
//删除也是暴力
/* Overwrite value with tail and update length */
if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
//每次都需要重新分配内存??
is = intsetResize(is,len-1);
is->length = intrev32ifbe(len-1);
}
return is;
}
由于是有序存储,因此查找可以通过二分:
/* Search for the position of "value". Return 1 when the value was found and
* sets "pos" to the position of the value within the intset. Return 0 when
* the value is not present in the intset and sets "pos" to the position
* where "value" can be inserted. */
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;
//剪枝
/* The value can never be found when the set is empty */
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}
//感人二分
while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}
if (value == cur) {
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min;
return 0;
}
}
感觉没什么软用啊。。
压缩列表
压缩列表(ziplist)是列表键和哈希键的底层实现之一。当一个列表键只包含少量列表键,且每个列表项要么是小整数,要么是长度较短的字符串,那么Redis就会使用压缩列表来做列表键的底层实现。
压缩列表是以双端链表的方式实现的,因此可以从头部和尾部删除或插入。但是由于每次修改操作都需要将整个压缩链表重新分配,因此时间复杂度总是$O(n)$的。
//每次都需要重新分配内存
/* Resize the ziplist. */
unsigned char *ziplistResize(unsigned char *zl, unsigned int len) {
zl = zrealloc(zl,len);
ZIPLIST_BYTES(zl) = intrev32ifbe(len);
zl[len-1] = ZIP_END;
return zl;
}
ziplist的优点是每个结点都可以有自己独立的压缩模式,因此即使列表中有超大整数,也不会影响较小的数占用非常少的空间。且虽然是双端队列,但是由于是存储了前驱和自身的大小来实现查找前驱后继的功能,因此占用的空间非常小。且由于占用的是连续的空间,因此对缓存友好,且不容易出现内存碎片。
ziplist的中保存的元素的数目是用16位整数存储的,因此当达到最大时,就不会再增长了,之后获取大小必须通过遍历列表实现。
/* Return length of ziplist. */
unsigned int ziplistLen(unsigned char *zl) {
unsigned int len = 0;
if (intrev16ifbe(ZIPLIST_LENGTH(zl)) < UINT16_MAX) {
len = intrev16ifbe(ZIPLIST_LENGTH(zl));
} else {
//长度达到$2^{16}$,需要执行扫描才能获得真正的长度
unsigned char *p = zl+ZIPLIST_HEADER_SIZE;
while (*p != ZIP_END) {
p += zipRawEntryLength(p);
len++;
}
/* Re-store length if small enough */
if (len < UINT16_MAX) ZIPLIST_LENGTH(zl) = intrev16ifbe(len);
}
return len;
}
/* Increment the number of items field in the ziplist header. Note that this
* macro should never overflow the unsigned 16 bit integer, since entries are
* always pushed one at a time. When UINT16_MAX is reached we want the count
* to stay there to signal that a full scan is needed to get the number of
* items inside the ziplist. */
#define ZIPLIST_INCR_LENGTH(zl,incr) { \
if (ZIPLIST_LENGTH(zl) < UINT16_MAX) \
ZIPLIST_LENGTH(zl) = intrev16ifbe(intrev16ifbe(ZIPLIST_LENGTH(zl))+incr); \
}
quicklist
quicklist是redis中list类型的实现方式。
学习了ziplist后,可以发现ziplist的优点是占用空间小,但是缺点是所有操作的时间复杂度都是$O(n)$,因此ziplist一旦达到百万级别,队列操作所花的时间就非常多了。而实现队列较好的选择就是linklist,插入弹出时间复杂度均为$O(1)$,但是对应的内存使用量高。
quicklist结合了linklist和ziplist的优点,我们可以将其理解为对列表的分块,每一块都是一个ziplist,不同的块通过linklist的前驱后继指针的方式串联起来。这样我们就可以较快的实现队列操作,且由于底层的块是由ziplist存储的,因此内存使用量也少。而且由于作为队列时,除了两端的块,中间的块不常被访问,因此可以用压缩算法进行压缩,释放更加多的空间。
可以通过list-max-ziplist-size
来配置块大小,为正数的时候用于配置每一块最多能包含的元素数目(此时每个ziplist中的元素大小都不能超过8K),负数$-x$表示每一块内存使用量少于$2^{x+1}$KB,$1\leq x\leq 5$。
REDIS_STATIC int
_quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
const int fill) {
//fill>=0表示限制项目数
if (fill >= 0)
return 0;
//<0表示限定内存占用
size_t offset = (-fill) - 1;
if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
if (sz <= optimization_level[offset]) {
//没有达到上限
return 1;
} else {
return 0;
}
} else {
//使用了未定义的范围
return 0;
}
}
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;
int ziplist_overhead;
/* size of previous offset */
if (sz < 254)
ziplist_overhead = 1;
else
ziplist_overhead = 5;
/* size of forward offset */
if (sz < 64)
ziplist_overhead += 1;
else if (likely(sz < 16384))
ziplist_overhead += 2;
else
ziplist_overhead += 5;
/* new_sz overestimates if 'sz' encodes to an integer type */
unsigned int new_sz = node->sz + sz + ziplist_overhead;
//判断新的块大小是否满足占用空间约束
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1;
//如果不满足占用空间要求,那么每个元素的最大不能超过8096
else if (!sizeMeetsSafetyLimit(new_sz))
return 0;
//如果fill是非负,那么只要求count不超过fill
else if ((int)node->count < fill)
return 1;
else
return 0;
}
对于压缩,可以通过list-compress-depth
来配置,若值为$x$,则表示除了最前边的$x$个块和最后边的$x$个块外,其余块全部进行压缩。
//尝试进行压缩
/* Force 'quicklist' to meet compression guidelines set by compress depth.
* The only way to guarantee interior nodes get compressed is to iterate
* to our "interior" compress depth then compress the next node we find.
* If compress depth is larger than the entire list, we return immediately. */
REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
quicklistNode *node) {
/* If length is less than our compress depth (from both sides),
* we can't compress anything. */
//quicklist->compress为0时表示不压缩
if (!quicklistAllowsCompression(quicklist) ||
//剪枝
quicklist->len < (unsigned int)(quicklist->compress * 2))
return;
/* Iterate until we reach compress depth for both sides of the list.a
* Note: because we do length checks at the *top* of this function,
* we can skip explicit null checks below. Everything exists. */
quicklistNode *forward = quicklist->head;
quicklistNode *reverse = quicklist->tail;
int depth = 0;
//指数变量,表示node是否在两端(压缩部分外)
int in_depth = 0;
while (depth++ < quicklist->compress) {
quicklistDecompressNode(forward);
quicklistDecompressNode(reverse);
if (forward == node || reverse == node)
in_depth = 1;
if (forward == reverse)
return;
forward = forward->next;
reverse = reverse->prev;
}
//需要压缩node
if (!in_depth)
quicklistCompressNode(node);
if (depth > 2) {
//顺带压缩了?
/* At this point, forward and reverse are one node beyond depth */
quicklistCompressNode(forward);
quicklistCompressNode(reverse);
}
}
对象
Redis没有用上面提到的数据结构实现键值对数据库,而是基于这些数据结构创建了一个对象系统。这个系统中包含字符串对象、列表对象、哈希对象、集合对象、有序集合对象共五种类型的对象。
Redis的对象使用引用计数的方式实现了垃圾回收,当某个对象不再被使用的时候就会被自动释放。
Redis的对象还带有访问时间,这样服务器就可以将一些过久没有使用的对象进行淘汰。
Redis使用对象来表示键值对,当我们创建一个新的对象的时候,我们会创建一个SDS用来存储对象名,同时创建一个值对象。
Redis的每个对象都由redisObject所表示,下面是server.h
文件中的定义:
#define LRU_BITS 24
typedef struct redisObject {
//值类型
unsigned type:4;
//使用的什么数据结构
unsigned encoding:4;
//LRU时间,低8位存储频率,高16位存储访问时间
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
//引用计数
int refcount;
//指向实际的值
void *ptr;
} robj;
其中type字段表示的是什么类型的对象,可能值可以是字符串类型、列表类型、哈希类型、集合类型、有序集合。而encoding则存储了值对应的数据结构,比如即使是列表类型,其也可能是linklist或quicklist。
字符串类型
encoding中字符串类型可以分为int,raw,embstr。如果字符串中实际上保存的是一个不超过64位的整数,则会以int类型存储,否则会以raw类型(底层用的sds)存储。但是特殊的如果字符串长度不超过32,那么会用embstr类型存储这个字符串emstr的特点就是将redisObject和sds作为一块连续的区间分配管理,于是这样就减少了内存分配和回收的次数,同时也更好的利用了缓存。在redis中使用字符串类型存储浮点数,而浮点数的字符串形式长度一般不超过32,因此一般会用embstr表示。
列表类型
encoding中列表类型可以分为linklist,ziplist,quicklist,其中前两种在Redis3.2版本之前使用,后来quicklist替代了前两者。下面是lpush和rpush的核心实现
/* The function pushes an element to the specified list object 'subject',
* at head or tail position as specified by 'where'.
*
* There is no need for the caller to increment the refcount of 'value' as
* the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
//对value进行解码,并增加引用计数
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);
quicklistPush(subject->ptr, value->ptr, len, pos);
//减少引用计数,释放解码后的对象(quicklist会拷贝值)
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}
//where表示头部还是尾部加
void pushGenericCommand(client *c, int where) {
int j, pushed = 0;
//找到对象
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
//遍历待加对象
for (j = 2; j < c->argc; j++) {
if (!lobj) {
//如果是新对象,就创建一个quicklist对象
lobj = createQuicklistObject();
//设置fill和depth参数
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
//将新对象注册到数据库中
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where);
pushed++;
}
//回复长度
addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
if (pushed) {
//推送事件
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
//修改版本
server.dirty += pushed;
}
哈希类型
redis中哈希类型的编码可能为ziplist和hashtable。ziplist中将键值对作为表中的两个项紧密排放,之后每次查询都暴力查表。新加入的元素会插入到ziplist的尾部。而hashtable编码的哈希类型底层使用dict存储键值对。
//哈希类型默认使用ziplist编码
robj *createHashObject(void) {
unsigned char *zl = ziplistNew();
robj *o = createObject(OBJ_HASH, zl);
o->encoding = OBJ_ENCODING_ZIPLIST;
return o;
}
ziplist会因为存储的元素占用过大空间和ziplist长度过大原因升级为哈希表。
//判断是否需要将ziplist转哈希表
/* Check the length of a number of objects to see if we need to convert a
* ziplist to a real hash. Note that we only check string encoded objects
* as their string length can be queried in constant time. */
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
int i;
if (o->encoding != OBJ_ENCODING_ZIPLIST) return;
for (i = start; i <= end; i++) {
if (sdsEncodedObject(argv[i]) &&
sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
{
//如果键值中有一个超出server.hash_max_ziplist_value就转哈希表
hashTypeConvert(o, OBJ_ENCODING_HT);
break;
}
}
}
//哈希表set操作
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr, *vptr;
...
/* Check if the ziplist needs to be converted to a hash table */
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
//如果ziplist项数超过server.hash_max_ziplist_entries也要转哈希表
hashTypeConvert(o, OBJ_ENCODING_HT);
}
...
}
集合对象
集合对象的编码可以是intset或hashtable。
/* Factory method to return a set that *can* hold "value". When the object has
* an integer-encodable value, an intset will be returned. Otherwise a regular
* hash table. */
//创建一个新的集合
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
//如果插入的新值可以编码为整数,就创建intset
return createIntsetObject();
//否则还是建一个集合吧
return createSetObject();
}
//集合类型用哈希表来实现
robj *createSetObject(void) {
dict *d = dictCreate(&setDictType,NULL);
robj *o = createObject(OBJ_SET,d);
o->encoding = OBJ_ENCODING_HT;
return o;
}
intset在元素类型不能以64位整数表示,以及长度过大的时候都会自动转哈希表。
//新增操作
if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* Convert to regular set when the intset contains
* too many entries. */
if (intsetLen(subject->ptr) > server.set_max_intset_entries)
//如果整数集合大小超过server.set_max_intset_entries,转哈希表
setTypeConvert(subject, OBJ_ENCODING_HT);
return 1;
}
} else {
//如果新元素不能表示为64位整数,转哈希表
/* Failed to get integer from object, convert to regular set. */
setTypeConvert(subject,OBJ_ENCODING_HT);
/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
return 1;
}
}
当使用哈希表的时候,键为集合中的元素,值为NULL。
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
有序集合类型
有序列表的编码可以是ziplist或skiplist。
/* Lookup the key and create the sorted set if does not exist. */
zobj = lookupKeyWrite(c->db,key);
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
//如果第一个新元素就要求升级,直接上zset
zobj = createZsetObject();
} else {
//否则创建一个ziplist凑合用
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
}
ziplist中的元素按从小到大排序。
/* Update the sorted set according to its encoding. */
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
//找到插入位置
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*flags |= ZADD_NOP;
return 1;
}
/* Prepare the score for the increment if needed. */
if (incr) {
score += curscore;
if (isnan(score)) {
*flags |= ZADD_NAN;
return 0;
}
if (newscore) *newscore = score;
}
/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
return 1;
} else if (!xx) {
/* Optimize: check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
...
}
}
可以发现ziplist当过长或元素过大的时候,就会升级为skiplist。
...
//过大或过长?
if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value)
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
if (newscore) *newscore = score;
*flags |= ZADD_ADDED;
return 1;
二进制类型
Redis提供了一些二进制操作命令SETBIT
,GETBIT
等允许我们直接处理二进制类型。
Redis中使用字符串类型来表示二进制。下面是取字符串的代码:
//获取二进制位数组
robj *lookupStringForBitCommand(client *c, size_t maxbit) {
size_t byte = maxbit >> 3;
robj *o = lookupKeyWrite(c->db,c->argv[1]);
if (o == NULL) {
//不存在,就创建一个二进制数组
o = createObject(OBJ_STRING,sdsnewlen(NULL, byte+1));
dbAdd(c->db,c->argv[1],o);
} else {
//如果不是字符串类型就返回NULL
if (checkType(c,o,OBJ_STRING)) return NULL;
//转成raw类型的字符串
o = dbUnshareStringValue(c->db,c->argv[1],o);
o->ptr = sdsgrowzero(o->ptr,byte+1);
}
return o;
}
下面是SETBIT
的代码:
/* SETBIT key offset bitvalue */
void setbitCommand(client *c) {
robj *o;
char *err = "bit is not an integer or out of range";
size_t bitoffset;
ssize_t byte, bit;
int byteval, bitval;
long on;
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
return;
if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != C_OK)
return;
/* Bits can only be set or cleared... */
if (on & ~1) {
addReplyError(c,err);
return;
}
if ((o = lookupStringForBitCommand(c,bitoffset)) == NULL) return;
/* Get current values */
byte = bitoffset >> 3;
byteval = ((uint8_t*)o->ptr)[byte];
bit = 7 - (bitoffset & 0x7);
bitval = byteval & (1 << bit);
/* Update byte with new bit value and return original value */
byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit);
((uint8_t*)o->ptr)[byte] = byteval;
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero);
}
下面是GETBIT
的代码:
/* GETBIT key offset */
void getbitCommand(client *c) {
robj *o;
char llbuf[32];
size_t bitoffset;
size_t byte, bit;
size_t bitval = 0;
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
return;
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;
byte = bitoffset >> 3;
bit = 7 - (bitoffset & 0x7);
if (sdsEncodedObject(o)) {
if (byte < sdslen(o->ptr))
bitval = ((uint8_t*)o->ptr)[byte] & (1 << bit);
} else {
if (byte < (size_t)ll2string(llbuf,sizeof(llbuf),(long)o->ptr))
bitval = llbuf[byte] & (1 << bit);
}
addReply(c, bitval ? shared.cone : shared.czero);
}
内存回收
C语言不自带内存回收,redis通过引用计数的方式自己实现了内存回收机制。
typedef struct redisObject {
...
//引用计数
int refcount;
...
} robj;
当创建一个新对象的时候,其计数为1。在对象被引用的时候,其计数会加1,当对象不再被引用时,其计数会减1。而当对象计数为0的时候,对象就会被释放。
空转时长
Redis中采用lru字段记录对象最后一次被客户端访问的时间。
typedef struct redisObject {
//LRU时间,低8位存储频率,高16位存储访问时间
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
} robj;
当我们设置了maxmemory选项,且服务器的淘汰算法为voltile-lruh或者allkeys-lru。那么当服务器占用了超过maxmemory的内存,那么就会优先释放那些空转时长较大的对象。
数据库
数据库对象
服务器对象对应的是redisServer
对象。
Redis服务器中所有数据库都保存在redisServer
结构的db数组中。下面是server.h
中的源码:
struct redisServer {
...
redisDb *db;
int dbnum; /* Total number of configured DBs */
...
};
默认情况下客户端使用的数据库是0号数据库,客户端可以通过SELECT命令来切换数据库。
redis的数据库对象的源码如下,它的dict字段来保存对象名称到对象的映射,这些映射称为键空间。
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
//键空间
dict *dict; /* The keyspace for this DB */
//所有键的过期时间
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
客户端对象
客户端对象对应的是client
结构。
struct client {
//使用哪个数据库
redisDb *db; /* Pointer to currently SELECTed DB. */
}
过期
expires成员中保存了所有的键的过期时间,过期时间是以毫秒为精度的UNIX时间戳。
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
//键空间
dict *dict; /* The keyspace for this DB */
//所有键的过期时间
dict *expires; /* Timeout of keys with a timeout set */
} redisDb;
可以通过expire命令设置过期时间。
/* Set an expire to the specified key. If the expire is set in the context
* of an user calling a command 'c' is the client, otherwise 'c' is set
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->dict,key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL);
//设置过期时间(用同一个key对象)
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
/* This is the generic command implementation for EXPIRE, PEXPIRE, EXPIREAT
* and PEXPIREAT. Because the commad second argument may be relative or absolute
* the "basetime" argument is used to signal what the base time is (either 0
* for *AT variants of the command, or the current time for relative expires).
*
* unit is either UNIT_SECONDS or UNIT_MILLISECONDS, and is only used for
* the argv[2] parameter. The basetime is always specified in milliseconds. */
void expireGenericCommand(client *c, long long basetime, int unit) {
robj *key = c->argv[1], *param = c->argv[2];
long long when; /* unix time in milliseconds when the key will expire. */
if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
return;
if (unit == UNIT_SECONDS) when *= 1000;
when += basetime;
/* No key, return zero. */
if (lookupKeyWrite(c->db,key) == NULL) {
//找不到key
addReply(c,shared.czero);
return;
}
/* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
* should never be executed as a DEL when load the AOF or in the context
* of a slave instance.
*
* Instead we take the other branch of the IF statement setting an expire
* (possibly in the past) and wait for an explicit DEL from the master. */
if (when <= mstime() && !server.loading && !server.masterhost) {
//在过去过期,就是删除啦
robj *aux;
int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :
dbSyncDelete(c->db,key);
serverAssertWithInfo(c,key,deleted);
server.dirty++;
/* Replicate/AOF this as an explicit DEL or UNLINK. */
aux = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
rewriteClientCommandVector(c,2,aux,key);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
addReply(c, shared.cone);
return;
} else {
//设一下过期时间
setExpire(c,c->db,key,when);
addReply(c,shared.cone);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
server.dirty++;
return;
}
}
也可以用persist命令去除某个对象的过期时间。
删除
键的删除分为定时删除和惰性删除两种。
可以使用lazyfree_lazy_server_del
开启异步删除。如果对象比较大且是立即被释放(引用计数为1),则会走异步,否则释放走同步。
/* This is a wrapper whose behavior depends on the Redis lazy free
* configuration. Deletes the key synchronously or asynchronously. */
//删除对象
int dbDelete(redisDb *db, robj *key) {
return server.lazyfree_lazy_server_del ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}
//异步删除
//异步删除是指对value的异步释放,但是key还是会同步释放的
int dbAsyncDelete(redisDb *db, robj *key) {
//从过期时间中删除key
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
//存在于数据库
robj *val = dictGetVal(de);
//大概删除费时,如果是字符串,则返回1,如果是集合,则返回集合的大小
size_t free_effort = lazyfreeGetFreeEffort(val);
/* If releasing the object is too much work, do it in the background
* by adding the object to the lazy free list.
* Note that if the object is shared, to reclaim it now it is not
* possible. This rarely happens, however sometimes the implementation
* of parts of the Redis core may call incrRefCount() to protect
* objects, and then call dbDelete(). In this case we'll fall
* through and reach the dictFreeUnlinkedEntry() call, that will be
* equivalent to just calling decrRefCount(). */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
//首先真的会删除,其次删除花的时间达到LAZYFREE_THRESHOLD才执行异步,否则就是浪费时间
atomicIncr(lazyfree_objects, 1);
//插入异步任务
bioCreateBackgroundJob(BIO_LAZY_FREE, val, NULL, NULL);
//值异步释放,因此这里把值设成NULL防止被后面的同步操作锁释放
dictSetVal(db->dict, de, NULL);
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
}
}
惰性删除是指每次客户端访问某个对象前都需要检查键是否过期。
/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because slave instances do not expire keys, they wait
* for DELs from the master for consistency matters. However even
* slaves will try to have a coherent return value for the function,
* so that read commands executed in the slave side will be able to
* behave like if the key is expired even if still present (because the
* master has yet to propagate the DEL).
*
* In masters as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* The return value of the function is 0 if the key is still valid,
* otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key) {
//未过期
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a slave, instead of
* evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
//slave过期了不执行删除,但是只返回正确信息
if (server.masterhost != NULL) return 1;
/* Delete the key */
//真的删除
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
//同步还是异步删除
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
}
/* Lookup a key for read operations, or return NULL if the key is not found
* in the specified DB.
*
* As a side effect of calling this function:
* 1. A key gets expired if it reached it's TTL.
* 2. The key last access time is updated.
* 3. The global keys hits/misses stats are updated (reported in INFO).
* 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
*
* This API should not be used when we write to the key after obtaining
* the object linked to the key, but only for read only operations.
*
* Flags change the behavior of this command:
*
* LOOKUP_NONE (or zero): no special flags are passed.
* LOOKUP_NOTOUCH: don't alter the last access time of the key.
*
* Note: this function also returns NULL if the key is logically expired
* but still existing, in case this is a slave, since this API is called only
* for read operations. Even if the key expiry is master-driven, we can
* correctly report a key is expired on slaves even if the master is lagging
* expiring our key via DELs in the replication link. */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
if (expireIfNeeded(db,key) == 1) {
...
}
...
}
/* Lookup a key for write operations, and as a side effect, if needed, expires
* the key if its TTL is reached.
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
expireIfNeeded(db,key);
return lookupKey(db,key,flags);
}
定时删除是指服务器周期性地遍历各个数据库,从每个数据库的expires字典中随机检查一部分键的过期时间并删除其中的过期键。
void activeExpireCycle(int type) {
/* Adjust the running parameters according to the configured expire
* effort. The default effort is 1, and the maximum configurable effort
* is 10. */
unsigned long
effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
2*effort,
config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
effort;
/* This function has some global state in order to continue the work
* incrementally across calls. */
//上一个扫描过的DB下标
static unsigned int current_db = 0; /* Last DB tested. */
//timelimit_exit表示上次清理退出的原因:
//1表示上次是超时推出的,还有未清理的内容。
//0表示上次全部都清理完成了,没有未清理的内容
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
long long start = ustime(), timelimit, elapsed;
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
if (clientsArePaused()) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
* for time limit, unless the percentage of estimated stale keys is
* too high. Also never repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit &&
server.stat_expired_stale_perc < config_cycle_acceptable_stale)
return;
if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
return;
last_fast_cycle = start;
}
/* We usually should test CRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time. */
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
* time per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = config_cycle_fast_duration; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still
* existing inside the database. */
long total_sampled = 0;
long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
/* Expired and checked in a single loop. */
unsigned long expired, sampled;
//选择下一个db
redisDb *db = server.db+(current_db % server.dbnum);
/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;
/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the Redis configured "expire effort". */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
//不存在允许过期的键
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();
//如果expire表很稀松(数据量/slot < 0.01)就跳出
/* When there are less than 1% filled slots, sampling the key
* space is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
sampled = 0;
ttl_sum = 0;
ttl_samples = 0;
//采样数上限为min(dictSize(db->expires), config_keys_per_loop)
if (num > config_keys_per_loop)
num = config_keys_per_loop;
/* Here we access the low level representation of the hash table
* for speed concerns: this makes this code coupled with dict.c,
* but it hardly changed in ten years.
*
* Note that certain places of the hash table may be empty,
* so we want also a stop condition about the number of
* buckets that we scanned. However scanning for free buckets
* is very fast: we are in the cache line scanning a sequential
* array of NULL pointers, so we can scan a lot more buckets
* than keys in the same time. */
long max_buckets = num*20;
long checked_buckets = 0;
while (sampled < num && checked_buckets < max_buckets) {
//还没采够样
for (int table = 0; table < 2; table++) {
if (table == 1 && !dictIsRehashing(db->expires)) break;
unsigned long idx = db->expires_cursor;
idx &= db->expires->ht[table].sizemask;
dictEntry *de = db->expires->ht[table].table[idx];
long long ttl;
/* Scan the current bucket of the current table. */
checked_buckets++;
//遍历链表
while(de) {
/* Get the next entry now since this entry may get
* deleted. */
dictEntry *e = de;
de = de->next;
ttl = dictGetSignedIntegerVal(e)-now;
//找到可能过期的键
if (activeExpireCycleTryExpire(db,e,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet
* not expired. */
//统计所有过期时间的总和来计算平均数
ttl_sum += ttl;
ttl_samples++;
}
//又采了个样
sampled++;
}
}
db->expires_cursor++;
}
total_expired += expired;
total_sampled += sampled;
/* Update the average TTL stats for this database. */
//更新平均TTL数据
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
//如果超时了,就赶紧退出吧
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
/* We don't repeat the cycle for the current database if there are
* an acceptable amount of stale keys (logically expired but yet
* not reclaimed). */
} while (sampled == 0 ||
(expired*100/sampled) > config_cycle_acceptable_stale);
}
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
/* Update our estimate of keys existing but yet to be expired.
* Running average with this sample accounting for 5%. */
double current_perc;
if (total_sampled) {
current_perc = (double)total_expired/total_sampled;
} else
current_perc = 0;
server.stat_expired_stale_perc = (current_perc*0.05)+
(server.stat_expired_stale_perc*0.95);
}
订阅
redis支持订阅功能。redis的订阅功能由PUBLISH
,SUBSCRIBE
,PSUBSCRIBE
等命令组成,PSUBSCRIBE
表示订阅所有匹配带通配符渠道名的渠道。
一个客户端可以通过PUBLISH
命令订阅多个频道。每当有客户端向某个频道发送消息时,其它客户端就能收到这条消息。
redisServer类型中维护了下面相关字段。
struct redisServer {
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
}
具体的发布的代码如下:
void publishCommand(client *c) {
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
/* Publish a message */
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
listNode *ln;
listIter li;
/* Send to clients listening for that channel */
de = dictFind(server.pubsub_channels,channel);
if (de) {
//如果有这个渠道
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
//向c发布消息
addReplyPubsubMessage(c,channel,message);
receivers++;
}
}
/* Send to clients listening to matching channels */
//浏览所有订阅模式模式
di = dictGetIterator(server.pubsub_patterns_dict);
if (di) {
channel = getDecodedObject(channel);
while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
//判断是否匹配
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) continue;
listRewind(clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
//向c发布消息
addReplyPubsubPatMessage(c,pattern,channel,message);
receivers++;
}
}
decrRefCount(channel);
dictReleaseIterator(di);
}
return receivers;
}
下面是订阅的代码
void subscribeCommand(client *c) {
int j;
//一次性可以订阅多个渠道
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= CLIENT_PUBSUB;
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
//如果之前没有渠道,则创建一个空的列表
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReplyPubsubSubscribed(c,channel);
return retval;
}
下面是取消订阅的代码:
void unsubscribeCommand(client *c) {
if (c->argc == 1) {
//如果不传具体的channel,则取消所有
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
//逐一取消
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
//如果不订阅了,就删除flags中的bit位
if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
/* Remove the channel from the client -> channels hash table */
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel -> clients list hash table */
de = dictFind(server.pubsub_channels,channel);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
serverAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will be possible to abuse
* Redis PUBSUB creating millions of channels. */
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
if (notify) addReplyPubsubUnsubscribed(c,channel);
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
事务
你可以通过MULTI
命令开启一个事务,通过EXEC
执行当前开启的事务。
Redis中事务的实现是为每个事务维护一个队列,只有在提交的时候才真正执行队列中的命令。
为了实现CAS的性质,我们可以为WATCH
一些对象,如果这些对象在我们事务提交前被修改,那么事务就会提交失败,这部分的实现机制是为每个数据都维护一个watched_keys
变量,并在客户端中也维护一个watched_keys
列表。所有被观测的对象都会
typedef struct redisDb {
//实现事务提交时的CAS
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
} redisDb;
typedef struct client {
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
} client;
执行WATCH命令的代码如下:
void watchCommand(client *c) {
int j;
if (c->flags & CLIENT_MULTI) {
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
}
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
}
/* Watch for the specified key */
void watchForKey(client *c, robj *key) {
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
/* Check if we are already watching for this key */
//要判断当前客户端是否已经在WATCH了,不重复WATCH
listRewind(c->watched_keys,&li);
while((ln = listNext(&li))) {
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
//已经WATCH就什么都不用做,直接退出
return; /* Key already watched */
}
//在db中加入一个WATCH对象,表示当前客户端,在观测这个key
/* This key is not already watched in this DB. Let's add it */
clients = dictFetchValue(c->db->watched_keys,key);
if (!clients) {
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
}
listAddNodeTail(clients,c);
/* Add the new key to the list of keys watched by this client */
//在客户端列表尾加一项
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->db = c->db;
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
}
在修改了某个对象key
后,会将所有关注key
的事务全部失效,删除等操作也会通知。
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
//修改了key
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
//设置修改脏标记
c->flags |= CLIENT_DIRTY_CAS;
}
}
/* Note that the 'c' argument may be NULL if the key was modified out of
* a context of a client. */
void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key);
trackingInvalidateKey(c,key);
}
/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
*
* 1) The ref count of the value object is incremented.
* 2) clients WATCHing for the destination key notified.
* 3) The expire time of the key is reset (the key is made persistent),
* unless 'keepttl' is true.
*
* All the new keys in the database should be created via this interface.
* The client 'c' argument may be set to NULL if the operation is performed
* in a context where there is no clear client performing the operation. */
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
incrRefCount(val);
//如果删除
if (!keepttl) removeExpire(db,key);
if (signal) signalModifiedKey(c,db,key);
}
而在执行事务之前会检查观测的KEY是否修改过,如果修改过就直接失败,否则执行。这里需要注意的是,一旦执行事务,是无法回退的,如果中间命令出现错误,则会被忽略继续执行下去。redis也不支持rollback等命令,作者的解释是增加回退的支持会使得redis变得复杂,且这类错在在开发环境一般就能被发现。
由于Redis是单线程处理命令的,因此原子性是能保证的。
对于一致性,在处理事务中堆积的命令的时宕机。如果使用的RDB持久化,则显然没有任何影响,因为中间的修改命令不会被导出。如果服务器使用了AOF持久化,这时候可能只有部分命令写入,这样会导致恢复的数据库处于事务执行部分的状态,redis通过在AOF文件中插入MULTI和EXEC命令解决,这样在恢复的时候MULTI只有通过EXEC关闭后,中间的命令才会被恢复,否则会被丢弃。
//提交事务
void execCommand(client *c) {
int j;
robj **orig_argv;
int orig_argc;
struct redisCommand *orig_cmd;
int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
int was_master = server.masterhost == NULL;
//非事务状态
if (!(c->flags & CLIENT_MULTI)) {
addReplyError(c,"EXEC without MULTI");
return;
}
/* Check if we need to abort the EXEC because:
* 1) Some WATCHed key was touched.
* 2) There was a previous error while queueing commands.
* A failed EXEC in the first case returns a multi bulk nil object
* (technically it is not an error but a special behavior), while
* in the second an EXECABORT error is returned. */
//如果Watch的key被修改了
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
shared.nullarray[c->resp]);
discardTransaction(c);
goto handle_monitor;
}
/* If there are write commands inside the transaction, and this is a read
* only slave, we want to send an error. This happens when the transaction
* was initiated when the instance was a master or a writable replica and
* then the configuration changed (for example instance was turned into
* a replica). */
//在只读客户端执行事务,且事务中带写命令
if (!server.loading && server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
{
addReplyError(c,
"Transaction contains write commands but instance "
"is now a read-only replica. EXEC aborted.");
discardTransaction(c);
goto handle_monitor;
}
/* Exec all the queued commands */
//执行所有队列中的命令
...
/* Make sure the EXEC command will be propagated as well if MULTI
* was already propagated. */
//写入EXEC命令
if (must_propagate) {
int is_master = server.masterhost == NULL;
server.dirty++;
/* If inside the MULTI/EXEC block this instance was suddenly
* switched from master to slave (using the SLAVEOF command), the
* initial MULTI was propagated into the replication backlog, but the
* rest was not. We need to make sure to at least terminate the
* backlog with the final EXEC. */
if (server.repl_backlog && was_master && !is_master) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
feedReplicationBacklog(execcmd,strlen(execcmd));
}
}
...
}
持久化
由于redis是内存数据库,因此重启后所有对象都会丢失。redis提供了一些持久化的方案。
RDB
执行SAVE和BGSAVE命令的时候,会将Redis中的对象导出为一个dump.rdb
文件。这里我看到一些书上说是服务器只会导出未过期的关键字,redis源码的注释上也这么写,但是代码里好像没有做过滤,这里存疑。
/* Save a key-value pair, with expire time, type, key, value.
* On error -1 is returned.
* On success if the key was actually saved 1 is returned, otherwise 0
* is returned (the key was already expired). */
//但是看代码好像过期对象也会写入到rdb中。
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
/* Save the expire time */
if (expiretime != -1) {
//保存过期时间
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
/* Save the LRU info. */
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}
/* Save the LFU info. */
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
/* We can encode this in exactly two bytes: the opcode and an 8
* bit counter, since the frequency is logarithmic with a 0-255 range.
* Note that we do not store the halving time because to reset it
* a single time when loading does not affect the frequency much. */
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
/* Delay return if required (for testing) */
if (server.rdb_key_save_delay)
//挂起一段时间,貌似用于测试
usleep(server.rdb_key_save_delay);
return 1;
}
之后重启服务器后会自动识别rdb文件并从rdb文件中恢复(没有加载命令)。
SAVE是同步操作,BGSAVE是异步操作。执行SAVE会导致服务器在导出的过程中阻塞,不会接受任何客户端的请求。而BGSAVE实际上会fork当前进程创建一个子进程,之后由子进程通过共享内存的方式导出所有对象。
比较特殊的是,如果运行的服务器是主服务器,则加载RDB文件时不会恢复过期的键,而从服务器则会将所有键全部恢复。
注意,由于AOF文件一般比RDB文件更接近最后的版本,因此如果服务器开启了AOF功能,那么启动时会优先利用AOF文件恢复。只有在AOF功能关闭的情况下,才会选择使用RDB文件恢复。
在执行BGSAVE命令的时候,如果客户端发来SAVE或BGSAVE命令都会被拒绝。
/* BGSAVE [SCHEDULE] */
//异步dump
void bgsaveCommand(client *c) {
...
if (server.rdb_child_pid != -1) {
//已经有任务在跑了
addReplyError(c,"Background save already in progress");
}
...
}
void saveCommand(client *c) {
if (server.rdb_child_pid != -1) {
addReplyError(c,"Background save already in progress");
return;
}
...
}
BGSAVE也可以定时执行。当距离上一次修改时间够大,且修改量足够大,就会触发BGSAVE任务(当然前提是没有其它子进程存活)。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
//如果没有子进程在跑,就看一下需不需要启动一个子进程
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
//于是乎启动了一个
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
}
...
}
可以通过config set save
来设置saveparam。
struct redisServer {
struct saveparam *saveparams; /* Save points array for RDB */
};
struct saveparam {
time_t seconds;
int changes;
};
saveparam在配置文件中的默认值。
################################ SNAPSHOTTING ################################
#
# Save the DB on disk:
#
# save <seconds> <changes>
#
# Will save the DB if both the given number of seconds and the given
# number of write operations against the DB occurred.
#
# In the example below the behaviour will be to save:
# after 900 sec (15 min) if at least 1 key changed
# after 300 sec (5 min) if at least 10 keys changed
# after 60 sec if at least 10000 keys changed
#
# Note: you can disable saving completely by commenting out all "save" lines.
#
# It is also possible to remove all the previously configured save
# points by adding a save directive with a single empty string argument
# like in the following example:
#
# save ""
save 900 1
save 300 10
save 60 10000
AOF
RDB文件是通过dump整个服务器来实现持久化,AOF是通过将写命令落日志来实现持久化。
当AOF功能打开后,服务器在执行完一个写命令后,会将写命令追加到服务的aof_buf
缓冲区中。
struct redisServer {
sds aof_buf; /* AOF buffer, written before entering the event loop */
};
AOF的刷盘策略有三种,可以在配置文件中配置。默认值是everysec表示每秒同步一次,还有always和no,always表示每次写入请求都会落盘,而no则表示让操作系统自由决定什么时候落盘。no的性能最快,everysec会将落盘操作作为异步任务执行,因此速度其次,最慢的是always。
# The fsync() call tells the Operating System to actually write data on disk
# instead of waiting for more data in the output buffer. Some OS will really flush
# data on disk, some other OS will just try to do it ASAP.
#
# Redis supports three different modes:
#
# no: don't fsync, just let the OS flush the data when it wants. Faster.
# always: fsync after every write to the append only log. Slow, Safest.
# everysec: fsync only one time every second. Compromise.
#
# The default is "everysec", as that's usually the right compromise between
# speed and data safety. It's up to you to understand if you can relax this to
# "no" that will let the operating system flush the output buffer when
# it wants, for better performances (but if you can live with the idea of
# some data loss consider the default persistence mode that's snapshotting),
# or on the contrary, use "always" that's very slow but a bit safer than
# everysec.
#
# More details please check the following article:
# http://antirez.com/post/redis-persistence-demystified.html
#
# If unsure, use "everysec".
# appendfsync always
appendfsync everysec
# appendfsync no
刷盘的代码。
void flushAppendOnlyFile(int force) {
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
//每秒刷盘,且非强制,且有刷盘任务存在了
if (server.aof_flush_postponed_start == 0) {
//之前没有挂起任务,就记录一下当前时间
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
//已经在同步了,可以把任务延迟
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
...
try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
//每次都需要刷盘
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
//每秒刷的话,如果现在没有后台任务,就加个后台任务
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
在每次事件循环开始前,都会执行刷盘操作。但是是否刷盘,还是取决于策略。
//阻塞前事件
void beforeSleep(struct aeEventLoop *eventLoop) {
...
/* Write the AOF buffer on disk */
//清一下AOF的buffer
flushAppendOnlyFile(0);
...
}
而在定时任务中,也会检查一下是否有挂起的刷盘任务以及超时重试。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/* AOF postponed flush: Try at every cron cycle if the slow fsync
* completed. */
//又挂起的延迟刷盘任务,就执行
if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
/* AOF write errors: in this case we have a buffer to flush as well and
* clear the AOF error in case of success to make the DB writable again,
* however to try every second is enough in case of 'hz' is set to
* an higher frequency. */
run_with_period(1000) {
if (server.aof_last_write_status == C_ERR)
//每秒重试
flushAppendOnlyFile(0);
}
...
}
我们可以发送aof_rewrite请求,在执行BGSAVE的时候这个请求会被挂起,直到BGSAVE结束后才会执行。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
if (!hasActiveChildProcess() &&
server.aof_rewrite_scheduled)
{
rewriteAppendOnlyFileBackground();
}
...
}
在重启服务器的时候,如果开启了AOF模式,则会自动从AOF文件中加载数据。其具体操作是创建一个伪客户端,之后将记录在AOF文件中的写操作按顺序重演一遍。
/* Function called at startup to load RDB or AOF file in memory. */
//从磁盘恢复数据库
void loadDataFromDisk(void) {
long long start = ustime();
if (server.aof_state == AOF_ON) {
//如果开启了AOF,就从AOF文件中加载
if (loadAppendOnlyFile(server.aof_filename) == C_OK)
serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
}
...
}
/* Replay the append log file. On success C_OK is returned. On non fatal
* error (the append only file is zero-length) C_ERR is returned. On
* fatal error an error message is logged and the program exists. */
//从AOF文件中恢复
int loadAppendOnlyFile(char *filename) {
//这里创建一个伪客户端
struct client *fakeClient;
FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int old_aof_state = server.aof_state;
long loops = 0;
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */
if (fp == NULL) {
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
exit(1);
}
...
}
可以预测到随着服务器运行时间的增加,AOF文件会越来越大,这会导致每次服务器重启加载AOF文件所要花费的时间也会越来越久。Redis提供了AOF重写的功能,其会根据最新的服务器状态以命令的方式序列化到AOF文件中,其不会包含那些历史上已经被移除的对象,因此可以有效减少AOF文件的大小。
AOF重写过程,类似于rdb的异步导出,采用了子进程的方式。但是还有一个问题,但是还有一个问题,就是AOF重写时,服务器会继续处理客户端的写请求,这样会导致重写完成后新的AOF文件缺少这部分写请求的内容。redis通过创建一个AOF重写缓冲区解决了这个问题,每执行完一个写命令后,它会同时将命令发送给AOF缓冲区和AOF重写缓冲区。
struct redisServer {
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
};
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON)
//只有开启了AOF才需要追加到AOF缓冲区中
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
if (server.aof_child_pid != -1)
//如果有aof子进程在跑,恭喜加入缓冲区
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
}
这里比较特殊的是,主进程会不断将aof重写缓冲区的内容推送给子进程,来尽可能减少子进程导出的aof重写文件和真实的服务器状态的差距。
/* Event handler used to send data to the child process doing the AOF
* rewrite. We send pieces of our AOF differences buffer so that the final
* write when the child finishes the rewrite will be small. */
//发送消息给AOF子进程,让它写入差量
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln;
aofrwblock *block;
ssize_t nwritten;
UNUSED(el);
UNUSED(fd);
UNUSED(privdata);
UNUSED(mask);
while(1) {
ln = listFirst(server.aof_rewrite_buf_blocks);
block = ln ? ln->value : NULL;
if (server.aof_stop_sending_diff || !block) {
//最后卸载事件
aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
AE_WRITABLE);
return;
}
if (block->used > 0) {
//向管道推送数据
nwritten = write(server.aof_pipe_write_data_to_child,
block->buf,block->used);
if (nwritten <= 0) return;
memmove(block->buf,block->buf+nwritten,block->used-nwritten);
block->used -= nwritten;
block->free += nwritten;
}
if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
}
}
子进程会在dump任务完成后,退出前接受并处理父进程推来的差量。
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
...
/* Read again a few times to get more data from the parent.
* We can't read forever (the server may receive data from clients
* faster than it is able to send data to the child), so we try to read
* some more data in a loop as soon as there is a good chance more data
* will come. If it looks like we are wasting time, we abort (this
* happens after 20 ms without new data). */
int nodata = 0;
mstime_t start = mstime();
//花费一秒钟处理主进程推来的消息,且连续20毫秒内都没有收到数据就退出
while(mstime()-start < 1000 && nodata < 20) {
//等待主进程
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent();
}
/* Ask the master to stop sending diffs. */
//要求父进程停止发送差量(用另外一个管道)
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr;
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK)
goto werr;
/* We read the ACK from the server using a 10 seconds timeout. Normally
* it should reply ASAP, but just in case we lose its reply, we are sure
* the child will eventually get terminated. */
//等待5s,读父进程响应
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
/* Read the final diff if any. */
//清理剩下的父进程发送的差量数据
aofReadDiffFromParent();
...
}
但是仅靠上面代码是不足以输出所有的rewrite期间的写事件的,最后需要父进程定时去完成最后的收尾。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
checkChildrenDone();
}
}
//看一下子进程是否结束了
void checkChildrenDone(void) {
...
else if (pid == server.aof_child_pid) {
backgroundRewriteDoneHandler(exitcode,bysignal);
if (!bysignal && exitcode == 0) receiveChildInfo();
}
...
}
/* A background append only file rewriting (BGREWRITEAOF) terminated its work.
* Handle this. */
//aof rewrite的收尾工作
void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
//把rewrite期间剩下的差量处理掉(父进程负责)
if (aofRewriteBufferWrite(newfd) == -1) {
serverLog(LL_WARNING,
"Error trying to flush the parent diff to the rewritten AOF: %s", strerror(errno));
close(newfd);
goto cleanup;
}
...
//rename将重写文件原子替换为当前的aof文件
if (rename(tmpfile,server.aof_filename) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
server.aof_filename,
strerror(errno));
close(newfd);
if (oldfd != -1) close(oldfd);
goto cleanup;
}
...
}
RDB-AOF混合模式
RDB模式导出的文件小,导入快,但是RDB文件一般实时性差。AOF模式导出的文件大,导入慢,但是AOF文件的实时性强。Redis4.0推出了RDB-AOF混合持久模式,兼具了二者的优点。
混合模式下,导出的AOF文件中的开头部分是RDB文件内容,而RDB代表的是当时的数据库快照,之后新的写入数据会以AOF的格式进行保存。
默认情况下,混合模式是不启动。需要设置服务器的配置项aof-use-rdb-preamble
。
int rewriteAppendOnlyFile(char *filename) {
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
}
}
集群
复制(源码未分析)
在redis中,我们可以利用slaveof命令,将某台redis服务变成另外一台redis服务的从服务器,两台服务器中保存的数据将保持同步。
如何从主服务器拷贝到从服务器,这个过程非常类似于aof的重写。在执行了salveof命令后,从服务器会向主服务器发送sync命令。主服务器需要通过bgsave命令将目前的数据异步导出成rdb文件,之后将导出的rdb文件发送给从服务器。同时主服务器必须开一个新的缓冲区,将导出开始到目前所发生的写操作都放在缓冲区中,以后需要发送给从服务器。
之后如果主服务器接受到写命令,写命令可能会导致主从数据不一致,主服务器需要负责将造成不一致的写命令传播给从服务器。
如果从服务器和主服务器连接断线重连后,从服务器需要与主服务器进行重新同步。同步的方式有两种sync(完整同步)以及psync(部分同步)。
其实现方式就是让主从服务维护一个复制偏移量,主服务器每次向从服务器传播n个字节的数据的时候,就会将自己的复制偏移量加上n。而从服务器从主服务器收到n个字节的数据的时候,就将自己的复制偏移量加上n。而通过对比主从的复制偏移量就可以判断主从服务器目前是否同步。
那么主服务器知道了从服务器的目前复制偏移量,怎么能找到接下来要发送的指令呢?主服务器会维护一个叫做复制积压缓冲区的固定大小队列,其用于维护最近一段时间的发送给从服务器的数据。很显然由于队列大小有限,比如从服务器断线了两个小时,而固定队列仅保存了最近一个小时应该发送给从服务器的数据,那么也是无法恢复的。此时就会发送sync命令,否则就会利用psync快速恢复从服务器。
Redis给复制积压缓冲区设计的默认大小为1M,可以通过修改repl-backlog-size
设置自定义大小。一般是设置为(断线重连时间X每秒传播数据量X2)。
但是这里还有一个问题,主服务器是如何将复制积压缓冲区与某个重启的客户端重新关联起来?实际上每个服务器在启动的时候都会分配到一个长度为40的随机的16进制字符串。而ID也会一同序列化到本地的RDB和AOF中。
从服务器不会删除过期的对象。只有在显式收到主服务器的del命令才会执行真正的删除。(但是请求从服务器时过期对象是无法取到的,所以和过期了是相同的结果)之所以这么做是为了维持主从服务器数据的一致性。
int expireIfNeeded(redisDb *db, robj *key) {
//未过期
if (!keyIsExpired(db,key)) return 0;
/* If we are running in the context of a slave, instead of
* evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
//slave过期了不执行删除,但是只返回正确信息
if (server.masterhost != NULL) return 1;
/* Delete the key */
//真的删除
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
//同步还是异步删除
int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
if (retval) signalModifiedKey(NULL,db,key);
return retval;
}
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
robj *val;
if (expireIfNeeded(db,key) == 1) {
/* Key expired. If we are in the context of a master, expireIfNeeded()
* returns 0 only when the key does not exist at all, so it's safe
* to return NULL ASAP. */
if (server.masterhost == NULL) {
//自己为主服务器
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL;
}
/* However if we are in the context of a slave, expireIfNeeded() will
* not really try to expire the key, it only returns information
* about the "logical" status of the key: key expiring is up to the
* master in order to have a consistent view of master's data set.
*
* However, if the command caller is not the master, and as additional
* safety measure, the command invoked is a read-only command, we can
* safely return NULL here, and provide a more consistent behavior
* to clients accessign expired values in a read-only fashion, that
* will say the key as non existing.
*
* Notably this covers GETs when slaves are used to scale reads. */
if (server.current_client &&
server.current_client != server.master &&
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
//从服务器返回空
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
return NULL;
}
}
val = lookupKey(db,key,flags);
if (val == NULL) {
server.stat_keyspace_misses++;
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
}
else
server.stat_keyspace_hits++;
return val;
}
Sentinel(源码未分析)
Sentinel是Redis的高可用性(HA)的解决方案。由若干个Sentinel实例组成一个系统,负责监视任意多个主服务器以及这些服务器下的从服务器。
如果某个监视中的主服务器下线时长超过设定的上限,Sentinel就会负责从它的从服务器中挑选一个作为新的主服务器,而其余从服务器就成为新的主服务器的从服务器。而原本下线的主服务器在重启后就会成为新主服务器的从服务器。
可以通过redis-server --sentinel
命令将redis以sentinel模式启动。实际上sentinel是一个特殊的redis服务器,但是有如下区别:
- sentinel启动流程和redis服务器不同,它不会从本地文件中恢复数据。
- sentinel支持的命令与redis服务器不同
下面是sentinel支持的命令
struct redisCommand sentinelcmds[] = {
{"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
{"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
{"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
{"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
{"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
{"role",sentinelRoleCommand,1,"ok-loading",0,NULL,0,0,0,0,0},
{"client",clientCommand,-2,"read-only no-script",0,NULL,0,0,0,0,0},
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
{"auth",authCommand,2,"no-auth no-script ok-loading ok-stale fast",0,NULL,0,0,0,0,0},
{"hello",helloCommand,-2,"no-auth no-script fast",0,NULL,0,0,0,0,0}
};
sentinel的服务器的状态:
/* Main state. */
struct sentinelState {
char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
uint64_t current_epoch; /* Current epoch. */
//所有被监视的主服务器名字=>实例的映射
dict *masters; /* Dictionary of master sentinelRedisInstances.
Key is the instance name, value is the
sentinelRedisInstance structure pointer. */
int tilt; /* Are we in TILT mode? */
int running_scripts; /* Number of scripts in execution right now. */
mstime_t tilt_start_time; /* When TITL started. */
mstime_t previous_time; /* Last time we ran the time handler. */
list *scripts_queue; /* Queue of user scripts to execute. */
char *announce_ip; /* IP addr that is gossiped to other sentinels if
not NULL. */
int announce_port; /* Port that is gossiped to other sentinels if
non zero. */
unsigned long simfailure_flags; /* Failures simulation. */
int deny_scripts_reconfig; /* Allow SENTINEL SET ... to change script
paths at runtime? */
} sentinel;
每个redis服务器都对应一个sentinelRedisInstance
对象。
typedef struct sentinelRedisInstance {
int flags; /* See SRI_... defines */
char *name; /* Master name from the point of view of this sentinel. */
char *runid; /* Run ID of this instance, or unique ID if is a Sentinel.*/
uint64_t config_epoch; /* Configuration epoch. */
sentinelAddr *addr; /* Master host. */
instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
mstime_t last_pub_time; /* Last time we sent hello via Pub/Sub. */
mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
we received a hello from this Sentinel
via Pub/Sub. */
mstime_t last_master_down_reply_time; /* Time of last reply to
SENTINEL is-master-down command. */
mstime_t s_down_since_time; /* Subjectively down since time. */
mstime_t o_down_since_time; /* Objectively down since time. */
mstime_t down_after_period; /* Consider it down after that period. */
mstime_t info_refresh; /* Time at which we received INFO output from it. */
dict *renamed_commands; /* Commands renamed in this instance:
Sentinel will use the alternative commands
mapped on this table to send things like
SLAVEOF, CONFING, INFO, ... */
/* Role and the first time we observed it.
* This is useful in order to delay replacing what the instance reports
* with our own configuration. We need to always wait some time in order
* to give a chance to the leader to report the new configuration before
* we do silly things. */
int role_reported;
mstime_t role_reported_time;
mstime_t slave_conf_change_time; /* Last time slave master addr changed. */
/* Master specific. */
dict *sentinels; /* Other sentinels monitoring the same master. */
dict *slaves; /* Slaves for this master instance. */
unsigned int quorum;/* Number of sentinels that need to agree on failure. */
int parallel_syncs; /* How many slaves to reconfigure at same time. */
char *auth_pass; /* Password to use for AUTH against master & replica. */
char *auth_user; /* Username for ACLs AUTH against master & replica. */
/* Slave specific. */
mstime_t master_link_down_time; /* Slave replication link down time. */
int slave_priority; /* Slave priority according to its INFO output. */
mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
struct sentinelRedisInstance *master; /* Master instance if it's slave. */
char *slave_master_host; /* Master host as reported by INFO */
int slave_master_port; /* Master port as reported by INFO */
int slave_master_link_status; /* Master link status as reported by INFO */
unsigned long long slave_repl_offset; /* Slave replication offset. */
/* Failover */
char *leader; /* If this is a master instance, this is the runid of
the Sentinel that should perform the failover. If
this is a Sentinel, this is the runid of the Sentinel
that this Sentinel voted as leader. */
uint64_t leader_epoch; /* Epoch of the 'leader' field. */
uint64_t failover_epoch; /* Epoch of the currently started failover. */
int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
mstime_t failover_state_change_time;
mstime_t failover_start_time; /* Last failover attempt start time. */
mstime_t failover_timeout; /* Max time to refresh failover state. */
mstime_t failover_delay_logged; /* For what failover_start_time value we
logged the failover delay. */
struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */
/* Scripts executed to notify admin or reconfigure clients: when they
* are set to NULL no script is executed. */
char *notification_script;
char *client_reconfig_script;
sds info; /* cached INFO output */
} sentinelRedisInstance;
Sentinel会创建与所有被监视的主服务器之间的连接,并伪装成redis服务器的一个客户端。实际上sentinel会与每个主服务器建立两个网络连接,一个是命令连接,用于向主服务器发布命令,另外一个是订阅连接,用于订阅__sentinel__:hello
频道。
Sentinel默认以10s一次的频率,通过命令连接向主服务器发送INFO命令,获取主服务器的状态,其中就包括主服务器下的所有从属服务器。
当Sentinel发现主服务器出现新的从属服务器,Sentinel同样会以客户端的身份与它们创建网络连接。
在默认情况下,Sentinel会以两秒一次的频率,通过订阅连接向所有被监视的主服务器和从服务器发布消息,消息包含自己的信息。对于监视同一台redis服务器的Sentinel,就可以收到彼此发送的消息,从而更新其它Sentinel的信息。
如果sentinel发现新的sentinel(监视的主服务器有交集),那么前者会向后者以客户端的身份创建一个命令连接。
Sentinel会以每秒一次(默认)的频率向其余服务器(主从或sentinel)发送PING命令,并根据回复来判断实例是否在线。如果一个服务器连续down-after-milliseconds
毫秒内,连续向sentinel返回无效回复,那么就会被认定为下线了。
这种下线是主观的,如果服务器是主服务器,那么需要进一步验证个猜想,sentinel会向其它sentinel伙伴询问是否它们的看法,如果接收到足够多的已下线判断(超过quorum
),sentinel就会将主观下线的服务器视作客观下线,并对主服务器进行故障转移操作。
在进行故障转移之前,需要进行sentinel选主。对于所有监视下线主服务器的sentinel中,它们要通过选主算法选出一个leader,并由leader决定哪个从服务器应该晋升。
sentinel的选主算法采用的是raft算法。在每一轮中,每个发现主服务器客观下线的sentinel会要求其余sentinel选自己作为leader(获得它的选票)。按照先到先得的规则,其余sentinel会将选票提供给当前选主轮中第一个到达的请求。如果一台sentinel得到了半数以上的选票,就正式成为leader。如果每个一台sentinel得到半数以上的票,那么需要在一定时间后重新执行选主。
故障转移的部分,leader需要先选择一个状态良好且数据完整(即选择复制偏移量最大的从服务器)的从服务器,然后发送slave no one命令给它,将这个服务器转换为主服务器。在发送了命令,sentinel会每秒一次地发送INFO命令给这个服务器,如果它的状态变成了master,那么从服务器就成功地升级成了主服务器。
当升级成功了后,leader需要通过slaveof命令让其余从服务器去复制新的主服务器。同时等原来的主服务器上线后,leader也会负责将它转换为新的主服务器的从服务器。
集群部署(源码未分析)
Redis集群是redis提供的分布式解决方案。集群通过分片实现数据共享。
每个redis主服务器都对应一个集群节点。节点之间通过边连通成网络。我们可以通过CLUSTER MEET
命令从当前节点当指定节点建立一条边。发送命令后,两个节点会进行握手,握手成功后前者就加入了后者所在的集群。初始的时候之后后者知道前者的存在,但是后者会通过gossip算法将前者的信息同步给同个集群的其它节点。
要让redis主服务器变成节点,需要配置cluster-enable
选项。
集群节点的代码。
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
int port; /* Latest known clients port of this node */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link with this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
集群通过分片的方式保存数据库的键值对。集群的整个数据库被分为$2^{14}$个slot,数据库每个键都属于这些slot中的一个。如果所有slot都有节点在处理,那么集群处于上线状态,否则处于下线状态。
redis中采用slots成员来实现一个bitset,其中记录哪些槽是当前节点负责的。同时节点会将自己的slots信息通过gossip协议传播给其它的节点,因此最终每个节点都会得知集群中所有节点负责哪些slots。
typedef struct clusterNode {
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
}
redis也建立了槽到负责节点的索引。
typedef struct clusterState {
clusterNode *slots[CLUSTER_SLOTS];
} clusterState;
一个键key
属于哪个slot,是通过crc16(key) & ((1 << 14) - 1)
得到的,即键的crc16编码形式的后14位。
如果客户端将操作某个对象的请求发送给负责这个对象的节点,那么节点会直接处理请求。如果发送给了错误的节点,那么节点会返回一个moved错误(错误中包含正确的节点地址),并指引客户端将请求转发给正确的节点。注意客户端接收到moved错误后,这个客户端所有对这个对象之后都会直接发送给正确的节点。
集群还支持在线重新分片。即将任意数量已经指派给源节点的槽指派给另一个节点。重新分片是由redis-trib负责执行的。redis-trib会向源节点和目标节点发送命令来完成工作。其工作原理是redis-trib先向源节点和目标节点发送准备指令,得到回复后,redis-trib会不断从源节点取若干个迁移槽中的键,对于每个返回的键,redis-trib会向源节点发送一个迁移命令,要求它将这个键迁移到目标节点中。完成了槽中所有的对象的迁移后,redis-trib还需要通知集群中的某些节点,槽迁移了,之后由这些节点传播给整个集群。如果要迁移复数和slot,就相当于把上面过程重新执行若干次。
当然由于迁移不是原子性的,因此对于迁移槽,可能部分键迁移了,部分键还留在源节点中。如果这时候客户端向源主机请求一个已经被迁移的键,源主机发现键不存在,有可能已经迁移到目标主机了,对应源主机会向客户端返回一个ASK错误,让客户端转发请求给目标主机。当然如果键还未被迁移,源主机会直接负责处理命令。ASK错误是临时的,因此客户端不会进行重定向,之后对相同的对象的访问依旧会被发送给源服务器。
集群也支持主从,节点分成主节点和从节点,每个从节点都会复制某个主节点,在主节点下线的时候,会选择一个它的从节点来升级为主节点。一个从节点在复制某个主节点这一事件会通知给所有集群中的节点,最终大家都会知道。
集群中的每个节点都会定期向其它所有节点发送PING消息,接受到PING消息的主机会返回一个PONG消息。如果超时未收到某个主机返回的PONG消息,那么发送方会认为接收方可能下线了。如果集群中的半数以上都认为某个节点可能下线,那么就将这个节点标记为已经下线,并会向集群发送一个广播,通知所有节点这个事件。
当一个从节点发现自己复制的主节点进入了下线的状态,从节点就会进行故障转移。具体步骤如下:
- 某个从节点执行SAVEOF no one命令,称为新的主节点。
- 新的主节点会将原来主节点指派的slot全部重新指派给自己。
- 新的主节点向集群中所有节点发送一个PONG消息,通知所有节点自己已经成为了主节点。
选举算法大概如下,每个下线的主节点的从节点都成为可投票对象,如果哪个从节点在一轮投票过程中收集到了半数以上的投票,那么就成为新的主节点,否则就等待一段时间后进行下一轮投票。
事件
Redis是事件驱动的,服务器需要处理两类事件。
- 文件事件:redis通过套接字与客户端(或其它redis服务器)进行连接,而文件事件就是服务器对套接字操作的抽象。服务器通过监听并处理这些事件完成网络通信。
- 时间事件:redis服务器定时执行一些操作,比如
serverCron
。
文件事件
Redis基于Reactor模式开发了自己的网络事件处理器。这个处理器称为文件事件处理器。文件事件处理器使用IO多路复用来实现同时监听多个套接字,并根据套接字执行的任务来为套接字关联不同的事件处理器。而当被监听的套接字准备好读写关闭等操作时,与操作相关的文件事件就会发生,这时文件事件处理器会调用与套接字关联事件处理器来处理这些事件。
文件处理器以单线程运行,但是通过多路复用可以同时监听多个套接字,从而实现高性能的网络通信模型。
Redis的多路复用是通过对常用的select、epoll、evport和kqueue这些IO多路复用函数库实现的,每个实现方式都对应一个c文件,比如ae_evport.c
,ae_epoll.c
,ae_kqueue.c
,ae_select.c
,它们中定义了同名的方法(但是实现不同)。Redis会通过宏选择性能最好的方式。
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
文件事件发生的时机:
- AE_READABLE事件:当套接字变的可读(客户端对套接字执行write或close操作)
- AE_WRITABLE事件:如果套接字变的可写(客户端对套接字执行read操作)。
下面展示的是ae_select.c
中的多路复用等待事件的代码。
//多路复用系列
//发现事件,记录在eventLoop->fired中(队列形式)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, j, numevents = 0;
memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
//阻塞等待,超时时间在tvp中
retval = select(eventLoop->maxfd+1,
&state->_rfds,&state->_wfds,NULL,tvp);
if (retval > 0) {
for (j = 0; j <= eventLoop->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &eventLoop->events[j];
if (fe->mask == AE_NONE) continue;
if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
mask |= AE_WRITABLE;
eventLoop->fired[numevents].fd = j;
eventLoop->fired[numevents].mask = mask;
numevents++;
}
}
return numevents;
}
下面展示的是事件循环中如何处理文件事件的。
//处理事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
...
//执行阻塞前事件
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
//等待客户端事件
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
//如果有AE_BARRIER指令,就不将读命令排到写命令之前
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
//如果没有AE_BARRIER,先读
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
//再写
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
//否则最后读
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
...
}
对于客户端连接,目前读处理器在不使用tls的情况下,是readQueryFromClient
,而写处理器使用的是sendReplyToClient
。后者用于将响应内容返回给客户端。
时间事件
服务器将所有时间事件都放在一个无序列表中,每次都遍历列表找当前时间或之前发生的事件,并处理。
/* Time event structure */
//时间事件对象
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
/* State of an event based program */
typedef struct aeEventLoop {
// 时间事件链表
aeTimeEvent *timeEventHead;
} aeEventLoop;
每次执行事件循环的时候都会遍历时间序列中的元素,找出最近发生的事件,以决定等待多路复用最多允许的时间。
//搜索最近发生的事件
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;
while(te) {
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}
//处理事件
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
...
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
//距离下个事件等待时间,下面阻塞等待事件花费的时间不能超过这个
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
//没有事件哦哦
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
//允许等待
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
if (eventLoop->flags & AE_DONT_WAIT) {
//不允许等待
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
//执行阻塞前事件
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
...
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
}
下面是具体处理时间事件的代码。
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
...
while(te) {
...
//不处理这次处理定时任务循环加入的定时任务
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
te->refcount++;
//执行定时任务
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
...
}
...
}
多线程IO
Redis6.0引入了多线程IO,因此实际上真正从客户端和服务器读数据的不一定是处理事件循环的线程。其具体代码在networking.c
中。
服务器启动的最后一步会创建多线程IO。
/* Some steps in server initialization need to be done last (after modules
* are loaded).
* Specifically, creation of threads due to a race bug in ld.so, in which
* Thread Local Storage initialization collides with dlopen call.
* see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
//服务器初始化的最后一步,生或死
void InitServerLast() {
bioInit();
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
/* Initialize the data structures needed for threaded I/O. */
//初始化IO线程
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
//如果只需要一个IO线程,当前线程就够了
if (server.io_threads_num == 1) return;
//线程数也不能太多
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
//0号线程不需要创建(就是咱了)
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
//IOThreadMain是它们的主函数
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
0号线程是主线程(负责处理事件循环的那个),其余线程都是工作线程,其循环如下:
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
while(1) {
/* Wait for start */
//等待开始(不能一直拿锁和主线程冲突)
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
//如果线程挂起,就给主线程一个机会暂停当前线程
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
//为客户端执行读写操作
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
只要挂起的读写操作的客户端比较少,那么就会停用多线程,而是主线程负责所有的读写。
/* This function checks if there are not enough pending clients to justify
* taking the I/O threads active: in that case I/O threads are stopped if
* currently active. We track the pending writes as a measure of clients
* we need to handle in parallel, however the I/O threading is disabled
* globally for reads as well if we have too little pending clients.
*
* The function returns 0 if the I/O threading should be used becuase there
* are enough active threads, otherwise 1 is returned and the I/O threads
* could be possibly stopped (if already active) as a side effect. */
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
//挂起的写操作不超过线程的一倍,就停止一些线程(一个人分配不到两个工作)
if (pending < (server.io_threads_num*2)) {
if (io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
其具体的停止方法,就是让主线程拿走所有工作线程的各自的锁,而工作线程会在闲置的情况下不断加锁解锁尝试被阻塞。
void stopThreadedIO(void) {
/* We may have still clients with pending reads when this function
* is called: handle them before stopping the threads. */
//停止IO线程前,需要先处理挂起的读请求
handleClientsWithPendingReadsUsingThreads();
if (tio_debug) { printf("E"); fflush(stdout); }
if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n",
(int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write));
serverAssert(io_threads_active == 1);
//0位主线程因此不阻塞
for (int j = 1; j < server.io_threads_num; j++)
//停止线程的方式就是我们拿到它们的专属锁,在它们取锁的时候,就会阻塞
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
}
void *IOThreadMain(void *myid) {
...
while(1) {
/* Wait for start */
//等待开始(不能一直拿锁和主线程冲突)
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
//如果线程挂起,就给主线程一个机会暂停当前线程
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
...
}
...
}
在beforeSleep
方法之前会执行触发挂起的读写请求。
void beforeSleep(struct aeEventLoop *eventLoop) {
...
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
...
/* Handle writes with pending output buffers. */
//处理挂起的写操作
handleClientsWithPendingWritesUsingThreads();
...
}
网络连接
Redis用类似面向对象的方式实现了网络连接部分的内容。
typedef struct ConnectionType {
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
} ConnectionType;
struct connection {
ConnectionType *type;
ConnectionState state;
short int flags;
short int refs;
int last_errno;
void *private_data;
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler;
int fd;
};
Socket类型绑定的策略如下:
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
.set_write_handler = connSocketSetWriteHandler,
.set_read_handler = connSocketSetReadHandler,
.get_last_error = connSocketGetLastError,
.blocking_connect = connSocketBlockingConnect,
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine
};
在接受到连接后会装配客户端对象。
//接受新的客户端连接请求
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
/* Create connection and client */
if ((c = createClient(conn)) == NULL) {
char conninfo[100];
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
}
我们可以通过配置文件设置是否使用心跳以及超时时间。
# Unix socket.
#
# Specify the path for the Unix socket that will be used to listen for
# incoming connections. There is no default, so Redis will not listen
# on a unix socket when not specified.
#
# unixsocket /tmp/redis.sock
# unixsocketperm 700
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 0
# TCP keepalive.
#
# If non-zero, use SO_KEEPALIVE to send TCP ACKs to clients in absence
# of communication. This is useful for two reasons:
#
# 1) Detect dead peers.
# 2) Take the connection alive from the point of view of network
# equipment in the middle.
#
# On Linux, the specified value (in seconds) is the period used to send ACKs.
# Note that to close the connection the double of the time is needed.
# On other kernels the period depends on the kernel configuration.
#
# A reasonable value for this option is 60 seconds.
tcp-keepalive 0
Redis中的心跳使用的是TCP的option选项。
/* Set TCP keep alive option to detect dead peers. The interval option
* is only used for Linux as we are using Linux-specific APIs to set
* the probe send time, interval, and count. */
int anetKeepAlive(char *err, int fd, int interval)
{
int val = 1;
//设置过期
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1)
{
anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));
return ANET_ERR;
}
#ifdef __linux__
/* Default settings are more or less garbage, with the keepalive time
* set to 7200 by default on Linux. Modify settings to make the feature
* actually useful. */
/* Send first probe after interval. */
val = interval;
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) {
anetSetError(err, "setsockopt TCP_KEEPIDLE: %s\n", strerror(errno));
return ANET_ERR;
}
/* Send next probes after the specified interval. Note that we set the
* delay as interval / 3, as we send three probes before detecting
* an error (see the next setsockopt call). */
val = interval/3;
if (val == 0) val = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) {
anetSetError(err, "setsockopt TCP_KEEPINTVL: %s\n", strerror(errno));
return ANET_ERR;
}
/* Consider the socket in error state after three we send three ACK
* probes without getting a reply. */
val = 3;
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) {
anetSetError(err, "setsockopt TCP_KEEPCNT: %s\n", strerror(errno));
return ANET_ERR;
}
#else
((void) interval); /* Avoid unused var warning for non Linux systems. */
#endif
return ANET_OK;
}
geohash
在二维平面中geohash是将所有位置信息用四方树来管理,同时用哈希值来表示某个点在四方树中的位置(四方树的任意一层,有四个孩子,因此可以编码为两个二进制位),这个哈希值就是这个点的geohash。两个距离较近的点,公共前缀就会相对较大(但是这不是绝对的)。要查找以某个点为圆心半径为$r$的的圆内所有的点,需要找到一个最大的深度$k$,这一层中每个四方树结点的横坐标和纵坐标的范围都大于$k$。我们在这一层中,找到圆心对应的点以及周围8个点,之后暴力查找即可(这实际上是一个有效的剪枝)。对应的geohash值就是要找前缀为9个不同值的所有点。
实际上geohash可以很容易扩展到三维空间中去,这时候用8方树来管理,每一层的信息正好对应3个二进制位。
redis中也支持geohash。其包含geoadd
、geodist
、geohash
、georadius
等命令。
下面是通过点的经纬度计算geohash的源码:
#define GEO_STEP_MAX 26 /* 26*2 = 52 bits. */
int geohashEncode(const GeoHashRange *long_range, const GeoHashRange *lat_range,
double longitude, double latitude, uint8_t step,
GeoHashBits *hash) {
/* Check basic arguments sanity. */
if (hash == NULL || step > 32 || step == 0 ||
RANGEPISZERO(lat_range) || RANGEPISZERO(long_range)) return 0;
/* Return an error when trying to index outside the supported
* constraints. */
if (longitude > GEO_LONG_MAX || longitude < GEO_LONG_MIN ||
latitude > GEO_LAT_MAX || latitude < GEO_LAT_MIN) return 0;
hash->bits = 0;
hash->step = step;
if (latitude < lat_range->min || latitude > lat_range->max ||
longitude < long_range->min || longitude > long_range->max) {
return 0;
}
//将经度和纬度都规约到[0,1)之间
double lat_offset =
(latitude - lat_range->min) / (lat_range->max - lat_range->min);
double long_offset =
(longitude - long_range->min) / (long_range->max - long_range->min);
//这里step默认为26,因此只有高26位会被考虑,约7位精度
/* convert to fixed point based on the step size */
lat_offset *= (1ULL << step);
long_offset *= (1ULL << step);
//将它们交错形成哈希值
hash->bits = interleave64(lat_offset, long_offset);
return 1;
}
/* Interleave lower bits of x and y, so the bits of x
* are in the even positions and bits from y in the odd;
* x and y must initially be less than 2**32 (65536).
* From: https://graphics.stanford.edu/~seander/bithacks.html#InterleaveBMN
*/
static inline uint64_t interleave64(uint32_t xlo, uint32_t ylo) {
static const uint64_t B[] = {0x5555555555555555ULL, 0x3333333333333333ULL,
0x0F0F0F0F0F0F0F0FULL, 0x00FF00FF00FF00FFULL,
0x0000FFFF0000FFFFULL};
static const unsigned int S[] = {1, 2, 4, 8, 16};
uint64_t x = xlo;
uint64_t y = ylo;
x = (x | (x << S[4])) & B[4];
y = (y | (y << S[4])) & B[4];
x = (x | (x << S[3])) & B[3];
y = (y | (y << S[3])) & B[3];
x = (x | (x << S[2])) & B[2];
y = (y | (y << S[2])) & B[2];
x = (x | (x << S[1])) & B[1];
y = (y | (y << S[1])) & B[1];
x = (x | (x << S[0])) & B[0];
y = (y | (y << S[0])) & B[0];
return x | (y << 1);
}
int geohashEncodeType(double longitude, double latitude, uint8_t step, GeoHashBits *hash) {
//在r中存储经度和纬度的范围
GeoHashRange r[2] = 0;
geohashGetCoordRange(&r[0], &r[1]);
return geohashEncode(&r[0], &r[1], longitude, latitude, step, hash);
}
int geohashEncodeWGS84(double longitude, double latitude, uint8_t step,
GeoHashBits *hash) {
return geohashEncodeType(longitude, latitude, step, hash);
}
//最后的计算过程
int geohashEncode(const GeoHashRange *long_range, const GeoHashRange *lat_range,
double longitude, double latitude, uint8_t step,
GeoHashBits *hash) {
/* Check basic arguments sanity. */
if (hash == NULL || step > 32 || step == 0 ||
RANGEPISZERO(lat_range) || RANGEPISZERO(long_range)) return 0;
/* Return an error when trying to index outside the supported
* constraints. */
//顶点越界,就报错
if (longitude > GEO_LONG_MAX || longitude < GEO_LONG_MIN ||
latitude > GEO_LAT_MAX || latitude < GEO_LAT_MIN) return 0;
hash->bits = 0;
hash->step = step;
//顶点越界,就报错
if (latitude < lat_range->min || latitude > lat_range->max ||
longitude < long_range->min || longitude > long_range->max) {
return 0;
}
//纬度偏移[0,1]
double lat_offset =
(latitude - lat_range->min) / (lat_range->max - lat_range->min);
//经度偏移[0,1]
double long_offset =
(longitude - long_range->min) / (long_range->max - long_range->min);
//乘上一个系数,保留整数的部分
/* convert to fixed point based on the step size */
lat_offset *= (1ULL << step);
long_offset *= (1ULL << step);
hash->bits = interleave64(lat_offset, long_offset);
return 1;
}
在redis中通过zset来保存geohash到地点名称的映射。
/* GEOADD key long lat name [long2 lat2 name2 ... longN latN nameN] */
void geoaddCommand(client *c) {
/* Check arguments number for sanity. */
//参数需要能整除3
if ((c->argc - 2) % 3 != 0) {
/* Need an odd number of arguments if we got this far... */
addReplyError(c, "syntax error. Try GEOADD key [x1] [y1] [name1] "
"[x2] [y2] [name2] ... ");
return;
}
//有多少个元素
int elements = (c->argc - 2) / 3;
int argc = 2+elements*2; /* ZADD key score ele ... */
robj **argv = zcalloc(argc*sizeof(robj*));
//丢到zset中去,这样就能很快找到geohash的前缀为x的所有点了
argv[0] = createRawStringObject("zadd",4);
argv[1] = c->argv[1]; /* key */
incrRefCount(argv[1]);
/* Create the argument vector to call ZADD in order to add all
* the score,value pairs to the requested zset, where score is actually
* an encoded version of lat,long. */
int i;
//按点逐个处理
for (i = 0; i < elements; i++) {
double xy[2];
//读取经纬度
if (extractLongLatOrReply(c, (c->argv+2)+(i*3),xy) == C_ERR) {
for (i = 0; i < argc; i++)
if (argv[i]) decrRefCount(argv[i]);
zfree(argv);
return;
}
/* Turn the coordinates into the score of the element. */
GeoHashBits hash;
//分别使用26位来表示精度和纬度
geohashEncodeWGS84(xy[0], xy[1], GEO_STEP_MAX, &hash);
//搞出哈希值
GeoHashFix52Bits bits = geohashAlign52Bits(hash);
//转字符串
robj *score = createObject(OBJ_STRING, sdsfromlonglong(bits));
//val存的是地点名称
robj *val = c->argv[2 + i * 3 + 2];
argv[2+i*2] = score;
argv[3+i*2] = val;
incrRefCount(val);
}
//最后调用zadd命令,把geohash=>地点存入zset中。
/* Finally call ZADD that will do the work for us. */
replaceClientCommandVector(c,argc,argv);
zaddCommand(c);
}
可以发现在geoadd中不会把地点的经纬度存进去,那么到时候需要点的经纬度的时候如何恢复呢,下面是恢复的代码:
//将bits进行解码
int decodeGeohash(double bits, double *xy) {
GeoHashBits hash = { .bits = (uint64_t)bits, .step = GEO_STEP_MAX };
return geohashDecodeToLongLatWGS84(hash, xy);
}
int geohashDecodeToLongLatWGS84(const GeoHashBits hash, double *xy) {
return geohashDecodeToLongLatType(hash, xy);
}
int geohashDecodeToLongLatType(const GeoHashBits hash, double *xy) {
GeoHashArea area = 0;
if (!xy || !geohashDecodeType(hash, &area))
return 0;
return geohashDecodeAreaToLongLat(&area, xy);
}
//从geohash中搞出geohashrange
int geohashDecodeType(const GeoHashBits hash, GeoHashArea *area) {
GeoHashRange r[2] = 0;
geohashGetCoordRange(&r[0], &r[1]);
return geohashDecode(r[0], r[1], hash, area);
}
//真正的解码代码,搞出范围
int geohashDecode(const GeoHashRange long_range, const GeoHashRange lat_range,
const GeoHashBits hash, GeoHashArea *area) {
if (HASHISZERO(hash) || NULL == area || RANGEISZERO(lat_range) ||
RANGEISZERO(long_range)) {
return 0;
}
area->hash = hash;
uint8_t step = hash.step;
uint64_t hash_sep = deinterleave64(hash.bits); /* hash = [LAT][LONG] */
double lat_scale = lat_range.max - lat_range.min;
double long_scale = long_range.max - long_range.min;
//纬度的哈希值
uint32_t ilato = hash_sep; /* get lat part of deinterleaved hash */
//经度的哈希值
uint32_t ilono = hash_sep >> 32; /* shift over to get long part of hash */
/* divide by 2**step.
* Then, for 0-1 coordinate, multiply times scale and add
to the min to get the absolute coordinate. */
//恢复出四方边界
area->latitude.min =
lat_range.min + (ilato * 1.0 / (1ull << step)) * lat_scale;
area->latitude.max =
lat_range.min + ((ilato + 1) * 1.0 / (1ull << step)) * lat_scale;
area->longitude.min =
long_range.min + (ilono * 1.0 / (1ull << step)) * long_scale;
area->longitude.max =
long_range.min + ((ilono + 1) * 1.0 / (1ull << step)) * long_scale;
return 1;
}
//通过范围搞出中心
int geohashDecodeAreaToLongLat(const GeoHashArea *area, double *xy) {
if (!xy) return 0;
//取中心加裁剪
xy[0] = (area->longitude.min + area->longitude.max) / 2;
if (xy[0] > GEO_LONG_MAX) xy[0] = GEO_LONG_MAX;
if (xy[0] < GEO_LONG_MIN) xy[0] = GEO_LONG_MIN;
xy[1] = (area->latitude.min + area->latitude.max) / 2;
if (xy[1] > GEO_LAT_MAX) xy[1] = GEO_LAT_MAX;
if (xy[1] < GEO_LAT_MIN) xy[1] = GEO_LAT_MIN;
return 1;
}
接下来看看geodist方法的实现。redis计算geodist的时候用了一个haversin great circle distance formula来计算圆上两点的最短距离。
/* GEODIST key ele1 ele2 [unit]
*
* Return the distance, in meters by default, otherwise accordig to "unit",
* between points ele1 and ele2. If one or more elements are missing NULL
* is returned. */
//计算两点的距离
void geodistCommand(client *c) {
double to_meter = 1;
/* Check if there is the unit to extract, otherwise assume meters. */
if (c->argc == 5) {
//如果需要取单位
to_meter = extractUnitOrReply(c,c->argv[4]);
if (to_meter < 0) return;
} else if (c->argc > 5) {
//参数不可能超过5
addReply(c,shared.syntaxerr);
return;
}
/* Look up the requested zset */
robj *zobj = NULL;
if ((zobj = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]))
== NULL || checkType(c, zobj, OBJ_ZSET)) return;
/* Get the scores. We need both otherwise NULL is returned. */
double score1, score2, xyxy[4];
if (zsetScore(zobj, c->argv[2]->ptr, &score1) == C_ERR ||
zsetScore(zobj, c->argv[3]->ptr, &score2) == C_ERR)
{
addReplyNull(c);
return;
}
/* Decode & compute the distance. */
if (!decodeGeohash(score1,xyxy) || !decodeGeohash(score2,xyxy+2))
addReplyNull(c);
else
//搞出两个点的坐标
addReplyDoubleDistance(
c, geohashGetDistance(xyxy[0], xyxy[1], xyxy[2], xyxy[3]) / to_meter);
}
/* Calculate distance using haversin great circle distance formula. */
//用haversin great circle distance formula来算给定经纬度圆上最短距离
double geohashGetDistance(double lon1d, double lat1d, double lon2d, double lat2d) {
double lat1r, lon1r, lat2r, lon2r, u, v;
//搞出经纬度弧度
lat1r = deg_rad(lat1d);
lon1r = deg_rad(lon1d);
lat2r = deg_rad(lat2d);
lon2r = deg_rad(lon2d);
u = sin((lat2r - lat1r) / 2);
v = sin((lon2r - lon1r) / 2);
return 2.0 * EARTH_RADIUS_IN_METERS *
asin(sqrt(u * u + cos(lat1r) * cos(lat2r) * v * v));
}
最后来看一下georadius命令:
/* GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
* [COUNT count] [STORE key] [STOREDIST key]
* GEORADIUSBYMEMBER key member radius unit ... options ... */
void georadiusGeneric(client *c, int flags) {
robj *key = c->argv[1];
robj *storekey = NULL;
int storedist = 0; /* 0 for STORE, 1 for STOREDIST. */
/* Look up the requested zset */
//先找到zset
robj *zobj = NULL;
if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL ||
checkType(c, zobj, OBJ_ZSET)) {
return;
}
/* Find long/lat to use for radius search based on inquiry type */
int base_args;
double xy[2] = { 0 };
if (flags & RADIUS_COORDS) {
//如果圆心是给定的坐标
base_args = 6;
if (extractLongLatOrReply(c, c->argv + 2, xy) == C_ERR)
return;
} else if (flags & RADIUS_MEMBER) {
//如果圆心是某个zset中存着的点
base_args = 5;
robj *member = c->argv[2];
if (longLatFromMember(zobj, member, xy) == C_ERR) {
addReplyError(c, "could not decode requested zset member");
return;
}
} else {
addReplyError(c, "Unknown georadius search type");
return;
}
/* Extract radius and units from arguments */
double radius_meters = 0, conversion = 1;
if ((radius_meters = extractDistanceOrReply(c, c->argv + base_args - 2,
&conversion)) < 0) {
return;
}
/* Discover and populate all optional parameters. */
int withdist = 0, withhash = 0, withcoords = 0;
int sort = SORT_NONE;
long long count = 0;
if (c->argc > base_args) {
//搞出剩余的参数
int remaining = c->argc - base_args;
for (int i = 0; i < remaining; i++) {
char *arg = c->argv[base_args + i]->ptr;
if (!strcasecmp(arg, "withdist")) {
withdist = 1;
} else if (!strcasecmp(arg, "withhash")) {
withhash = 1;
} else if (!strcasecmp(arg, "withcoord")) {
withcoords = 1;
} else if (!strcasecmp(arg, "asc")) {
sort = SORT_ASC;
} else if (!strcasecmp(arg, "desc")) {
sort = SORT_DESC;
} else if (!strcasecmp(arg, "count") && (i+1) < remaining) {
//top 几,这个挺重要的
if (getLongLongFromObjectOrReply(c, c->argv[base_args+i+1],
&count, NULL) != C_OK) return;
if (count <= 0) {
addReplyError(c,"COUNT must be > 0");
return;
}
i++;
} else if (!strcasecmp(arg, "store") &&
(i+1) < remaining &&
!(flags & RADIUS_NOSTORE))
{
storekey = c->argv[base_args+i+1];
storedist = 0;
i++;
} else if (!strcasecmp(arg, "storedist") &&
(i+1) < remaining &&
!(flags & RADIUS_NOSTORE))
{
storekey = c->argv[base_args+i+1];
storedist = 1;
i++;
} else {
addReply(c, shared.syntaxerr);
return;
}
}
}
/* Trap options not compatible with STORE and STOREDIST. */
if (storekey && (withdist || withhash || withcoords)) {
addReplyError(c,
"STORE option in GEORADIUS is not compatible with "
"WITHDIST, WITHHASH and WITHCOORDS options");
return;
}
/* COUNT without ordering does not make much sense, force ASC
* ordering if COUNT was specified but no sorting was requested. */
//在选top的时候设置默认排序
if (count != 0 && sort == SORT_NONE) sort = SORT_ASC;
/* Get all neighbor geohash boxes for our radius search */
//取四周
GeoHashRadius georadius =
geohashGetAreasByRadiusWGS84(xy[0], xy[1], radius_meters);
//找到所有圆内的点
/* Search the zset for all matching points */
geoArray *ga = geoArrayCreate();
membersOfAllNeighbors(zobj, georadius, xy[0], xy[1], radius_meters, ga);
/* If no matching results, the user gets an empty reply. */
if (ga->used == 0 && storekey == NULL) {
addReply(c,shared.emptyarray);
geoArrayFree(ga);
return;
}
long result_length = ga->used;
//保留长度
long returned_items = (count == 0 || result_length < count) ?
result_length : count;
long option_length = 0;
/* Process [optional] requested sorting */
//排个序
if (sort == SORT_ASC) {
qsort(ga->array, result_length, sizeof(geoPoint), sort_gp_asc);
} else if (sort == SORT_DESC) {
qsort(ga->array, result_length, sizeof(geoPoint), sort_gp_desc);
}
if (storekey == NULL) {
//返回找到的点给客户端
/* No target key, return results to user. */
/* Our options are self-contained nested multibulk replies, so we
* only need to track how many of those nested replies we return. */
if (withdist)
option_length++;
if (withcoords)
option_length++;
if (withhash)
option_length++;
/* The array len we send is exactly result_length. The result is
* either all strings of just zset members *or* a nested multi-bulk
* reply containing the zset member string _and_ all the additional
* options the user enabled for this request. */
addReplyArrayLen(c, returned_items);
/* Finally send results back to the caller */
int i;
for (i = 0; i < returned_items; i++) {
geoPoint *gp = ga->array+i;
gp->dist /= conversion; /* Fix according to unit. */
/* If we have options in option_length, return each sub-result
* as a nested multi-bulk. Add 1 to account for result value
* itself. */
if (option_length)
addReplyArrayLen(c, option_length + 1);
addReplyBulkSds(c,gp->member);
gp->member = NULL;
if (withdist)
addReplyDoubleDistance(c, gp->dist);
if (withhash)
addReplyLongLong(c, gp->score);
if (withcoords) {
addReplyArrayLen(c, 2);
addReplyHumanLongDouble(c, gp->longitude);
addReplyHumanLongDouble(c, gp->latitude);
}
}
} else {
//将结果存到某个key中
/* Target key, create a sorted set with the results. */
robj *zobj;
zset *zs;
int i;
size_t maxelelen = 0;
if (returned_items) {
zobj = createZsetObject();
zs = zobj->ptr;
}
for (i = 0; i < returned_items; i++) {
zskiplistNode *znode;
geoPoint *gp = ga->array+i;
gp->dist /= conversion; /* Fix according to unit. */
//存距离还是坐标
double score = storedist ? gp->dist : gp->score;
size_t elelen = sdslen(gp->member);
if (maxelelen < elelen) maxelelen = elelen;
znode = zslInsert(zs->zsl,score,gp->member);
serverAssert(dictAdd(zs->dict,gp->member,&znode->score) == DICT_OK);
gp->member = NULL;
}
if (returned_items) {
zsetConvertToZiplistIfNeeded(zobj,maxelelen);
setKey(c,c->db,storekey,zobj);
decrRefCount(zobj);
notifyKeyspaceEvent(NOTIFY_ZSET,"georadiusstore",storekey,
c->db->id);
server.dirty += returned_items;
} else if (dbDelete(c->db,storekey)) {
signalModifiedKey(c,c->db,storekey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",storekey,c->db->id);
server.dirty++;
}
addReplyLongLong(c, returned_items);
}
geoArrayFree(ga);
}