尚德建设集团网站,wordpress 多模板,代注册公司要多少钱,一个域名可以绑定几个网站Redis3——线程模型与数据结构 本文讲述了redis的单线程模型和IO多线程工作原理#xff0c;以及几个主要数据结构的实现。 1. Redis的单线程模型
redis6.0之前#xff0c;一个redis进程只有一个io线程#xff0c;通过reactor模式可以连接大量客户端#xff1b;redis6.0为了…Redis3——线程模型与数据结构 本文讲述了redis的单线程模型和IO多线程工作原理以及几个主要数据结构的实现。 1. Redis的单线程模型
redis6.0之前一个redis进程只有一个io线程通过reactor模式可以连接大量客户端redis6.0为了高并发下网络IO的性能引入了多个IO线程。但是自始至终redis进程只有一个命令处理线程因此redis仍然可以看作一个单线程模型。
默认情况下redis进程中的线程 1.1 Redis6.0为什么引入多个IO线程
引入多个IO线程以解决高并发场景下的网络IO瓶颈具体来说网络IO中与cpu有关的开销包括
系统调用上下文切换。内存数据拷贝虽然受内存频率和带宽限制但仍需要cpu参与。CPU主导的协议栈处理。
因此即使网络带宽和内存频率固定采用多线程io仍然能够提高整体网络IO性能。
1.2 Redis命令处理为什么采用单线程模型
单线程实现简单不用考虑线程安全和锁竞争。减少了线程切换开销。天然满足事务的原子性、隔离性。得益于数据结构的设计redis基于内存的操作通常非常高效cpu通常不是性能瓶颈。redis不同键值对的数据结构不同多线程场景下加锁操作复杂加锁粒度不好控制。
Memcached是一个redis的竞品采用多线程并发的处理客户请求其支持的数据类型单一因此能够方便的对某个局部加锁。
1.3 单线程的耗时操作处理
单线程的局限在于不能有耗时操作cpu密集型任务、io密集型任务。
磁盘IO
对于磁盘io任务redis要么启用子进程来持久化要么使用其它线程进行异步刷盘。
网络IO
对于网络io任务redis启用多个io线程每个线程都采用reactor网络模型并发处理大量客户端连接。
cpu密集型任务
对于cpu密集型任务redis需要保证其操作高效
数据结构切换redis采用的数据结构会根据数据量进行切换。当数据量少的时候redis更注重空间效率当数据量大时redis更注重时间效率。分治例如间断遍历渐进式的数据迁移增量式rehash
1.4 单线程如何保证高效
基于内存的数据库数据键值对的组织方式hashtable时间复杂度O(1)渐进式的rehash高效的数据结构与数据结构切换高效的reactor模型redis6.0之前
2. Redis的IO多线程工作原理
2.1 理论
对于每一个客户端发送来的命令redis需要进行read、decode、compute、encode、send这五个步骤。 在redis6.0之前的单线程模式下主线程通过一个reactor模型串行地完成这5个操作。
加入IO多线程后主线程会在每次循环时将IO一部分读写任务分配给IO线程执行。主线程仍然运行一个reactor事件循环在客户端可读或者可写时将其分配给IO线程。
也就是说现在read、decode以及encode、send都是并行的了。compute操作仍然时串行。
这样设计地合理性在于一般读写和解析协议操作不会出现竞态条件至于一个客户端连接相关。而执行命令则需要访问全局数据结构串行执行可以减少竞态条件。
2.2 业务流程 在accept建立新连接时调用acceptCommonHandler创建客户端上下文 在createClient中设置读事件的回调函数readQueryFromClient 读事件发生时在readQueryFromClient中读取数据并用一个sds保存到客户端上下中的querybuf中 ...
qblen sdslen(c-querybuf);
if (c-querybuf_peak qblen) c-querybuf_peak qblen;
c-querybuf sdsMakeRoomFor(c-querybuf, readlen);
nread connRead(c-conn, c-querybufqblen, readlen);
...
processInputBuffer(c);在processInputBuffer中处理读取到的数据 分割数据包如果是单行命令执行processInlineBuffer否则执行processMultibulkBuffer if (c-reqtype PROTO_REQ_INLINE) {if (processInlineBuffer(c) ! C_OK) break;...
} else if (c-reqtype PROTO_REQ_MULTIBULK) {if (processMultibulkBuffer(c) ! C_OK) break;
} else {serverPanic(Unknown request type);
}如果在IO线程中则标记其为有命令需要执行的状态否则直接执行 /* If we are in the context of an I/O thread, we cant really execute the command here. All we can do is to flag the client as one that needs to process the command. */
if (c-flags CLIENT_PENDING_READ) {c-flags | CLIENT_PENDING_COMMAND;break;
}
/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) C_ERR) {return;
}在主线程中事件循环中每次循环会调用handleClientsWithPendingReadsUsingThreads函数它会将仍有数据待读取的客户端分配给IO线程去并行的读取和解析然后等待这些IO线程执行完毕之后再调用processCommand处理所有有待处理的命令的客户端。 处理完毕后调用addReply将输出写入到output buffer 当写事件发生时会调用sendReplyToClient将数据封装成协议然后发送给客户端。 /* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {client *c connGetPrivateData(conn);writeToClient(c,1);
}另外主线程也会调用handleClientsWithPendingWritesUsingThreads将可写的客户端分配给IO线程完成。
2.3 IO线程池
IO线程的工作非常简单主线程会将待处理的客户端分配个各个io线程当io线程检测到有客户端分配来时就根据主线程设置的操作类型取执行。
while(1) {/* Wait for start */for (int j 0; j 1000000; j) {if (getIOPendingCount(id) ! 0) break;}/* Give the main thread a chance to stop this thread. */if (getIOPendingCount(id) 0) {pthread_mutex_lock(io_threads_mutex[id]);pthread_mutex_unlock(io_threads_mutex[id]);continue;}serverAssert(getIOPendingCount(id) ! 0);/* Process: note that the main thread will never touch our list before we drop the pending count to 0. */listIter li;listNode *ln;listRewind(io_threads_list[id],li);while((ln listNext(li))) {client *c listNodeValue(ln);if (io_threads_op IO_THREADS_OP_WRITE) {writeToClient(c,0);} else if (io_threads_op IO_THREADS_OP_READ) {readQueryFromClient(c-conn);} else {serverPanic(io_threads_op value is unknown);}}listEmpty(io_threads_list[id]);setIOPendingCount(id, 0);}3. Redis数据结构 2.1 字典
redis的字典采用hash表实现。
2.1.1 扩容缩容机制 哈希冲突采用拉链法 负载因子1 扩容机制加倍扩容如果扩容时正在fork在rdb或aof复写则阻止扩容但是若此时负载因子5则利用写时复制原理马上扩容。 // redis v6.2.16
if ((dict_can_resize DICT_RESIZE_ENABLE d-ht[0].used d-ht[0].size) ||(dict_can_resize ! DICT_RESIZE_FORBID d-ht[0].used / d-ht[0].size dict_force_resize_ratio)){return dictExpand(d, d-ht[0].used 1);}缩容机制如果负载因子0.1则恰好大于等于used的2的n次幂为新的容量。例如此时负载因子小于0.1元素总数为17则新的容量为2的5次方 32 17。 // redis v6.2.16
int htNeedsResize(dict *dict) {long long size, used;size dictSlots(dict);used dictSize(dict);return (size DICT_HT_INITIAL_SIZE (used*100/size HASHTABLE_MIN_FILL));
}/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL* we resize the hash table to save memory */
void tryResizeHashTables(int dbid) {if (htNeedsResize(server.db[dbid].dict))dictResize(server.db[dbid].dict);if (htNeedsResize(server.db[dbid].expires))dictResize(server.db[dbid].expires);
}2.1.2 渐进式rehash
渐进式rehash每个dict都有两个hash表当没有触发扩容时第二个hash表为NULL当进行扩容时则分多次渐进地迁移旧的hash表的每个槽位。有两种迁移策略这两种策略可以并行。
将rehash的操作分摊在之后增删改查操作中。例如当进行查询操作时就迁移一个槽位。当所有的元素都被迁移完毕就将旧的删除用新的hash替换旧的hash表。对于主hash表使用定时器每隔一段时间(100ms)就花费1ms时间执行迁移操作。
增删改查时rehash一个
每次增删改查rehash一个槽位
static void _dictRehashStep(dict *d) {// 判断是否选择这种每次rehash一步的迁移策略if (d-pauserehash 0) dictRehash(d,1);
}// 增
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{long index;dictEntry *entry;dictht *ht;if (dictIsRehashing(d)) _dictRehashStep(d);...
}
// 删
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {uint64_t h, idx;dictEntry *he, *prevHe;int table;if (d-ht[0].used 0 d-ht[1].used 0) return NULL;if (dictIsRehashing(d)) _dictRehashStep(d);...
}// 查
dictEntry *dictFind(dict *d, const void *key)
{dictEntry *he;uint64_t h, idx, table;if (dictSize(d) 0) return NULL; /* dict is empty */if (dictIsRehashing(d)) _dictRehashStep(d);...
}dictEntry *dictGetRandomKey(dict *d)
{dictEntry *he, *orighe;unsigned long h;int listlen, listele;if (dictSize(d) 0) return NULL;if (dictIsRehashing(d)) _dictRehashStep(d);...
}unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {unsigned long j; /* internal hash table id, 0 or 1. */unsigned long tables; /* 1 or 2 tables? */unsigned long stored 0, maxsizemask;unsigned long maxsteps;if (dictSize(d) count) count dictSize(d);maxsteps count*10;/* Try to do a rehashing work proportional to count. */for (j 0; j count; j) {if (dictIsRehashing(d))_dictRehashStep(d);elsebreak;}...
}空闲时迁移1ms
// rehash‘ms’毫秒
int dictRehashMilliseconds(dict *d, int ms) {if (d-pauserehash 0) return 0;long long start timeInMilliseconds();int rehashes 0;while(dictRehash(d,100)) {rehashes 100;if (timeInMilliseconds()-start ms) break;}return rehashes;
}
// 渐进式rehash每次rehash 1ms
int incrementallyRehash(int dbid) {/* Keys dictionary */if (dictIsRehashing(server.db[dbid].dict)) {dictRehashMilliseconds(server.db[dbid].dict,1);return 1; /* already used our millisecond for this loop... */}/* Expires */if (dictIsRehashing(server.db[dbid].expires)) {dictRehashMilliseconds(server.db[dbid].expires,1);return 1; /* already used our millisecond for this loop... */}return 0;
}redis何时执行增量式rehash
在redis的定时任务中会判断是否开启了自动rehash如果开启就执行。
void databasesCron(void) {...// 没有fork子进程来持久化的是否进行rehashif (!hasActiveChildProcess()) {...// 如果开启了rehashif (server.activerehashing) {for (j 0; j dbs_per_call; j) {int work_done incrementallyRehash(rehash_db);...}...}
}在配置文件中可以选择是否开启
activerehashing yes2.1.3 哈希表的遍历
遍历数据库的原则为①不重复出现数据②不遗漏任何数据。熟悉Redis命令的读者应该知道遍历Redis整个数据库主要有两种方式全遍历 例如keys命令、间断遍历 hscan命令这两种方式将在下面进行详细讲解。
2.1.3.1 全遍历
全遍历一次命令执行就遍历完整个数据库。
全遍历使用普通迭代器或者间断迭代器
typedef struct dictIterator {dict *d; // 当前遍历的字典long index; // 当前bucket索引int table, safe; // hash表id和是否是安全迭代器dictEntry *entry, *nextEntry; // 当前元素和下一个元素/* unsafe iterator fingerprint for misuse detection. */unsigned long long fingerprint; // 由hash表中所有元素生成的指纹如果遍历过程中元素发生变化就能够根据此值检查出来
} dictIterator;普通迭代器与安全迭代器的区别在于普通迭代器迭代期间允许rehash但是如果数据元素发生变动导致fingerprint发生变化就会发出异常。而安全迭代器在迭代期间不允许进行渐进式rehash。
一次迭代的操作如下
dictEntry *dictNext(dictIterator *iter)
{while (1) {if (iter-entry NULL) {dictht *ht iter-d-ht[iter-table];if (iter-index -1 iter-table 0) {// 初始时如果时安全迭代器就禁止rehash操作否则就计算指纹if (iter-safe)dictPauseRehashing(iter-d);elseiter-fingerprint dictFingerprint(iter-d);}iter-index;if (iter-index (long) ht-size) {if (dictIsRehashing(iter-d) iter-table 0) {iter-table;iter-index 0;ht iter-d-ht[1];} else {break;}}iter-entry ht-table[iter-index];} else {iter-entry iter-nextEntry;}if (iter-entry) {/* We need to save the next here, the iterator user* may delete the entry we are returning. */iter-nextEntry iter-entry-next;return iter-entry;}}return NULL;
}2.1.3.2 间断遍历
间断遍历每次命令执行只取部分数据分多次遍历。
间断遍历期间允许进行渐进式rehash它通过采用reverse binary iteration来保证不重复数据也不遗漏数据。
由于扩容及缩容正好为整数倍增长或减少的原理根据这个特征很容易就能推导出同一个节点的数据扩容/缩容后在新的Hash表中的分布位置例如一个hash表大小为4索引为0的元素扩容后新的索引可能为0或者4。一个hash表大小为8索引为6的元素缩容后新的索引为2。
根据这个规律可以设计一种迭代顺序规则即游标值v变化的规则
v | ~mask;
v rev(v); // 二进制逆转
v;
v rev(v); // 二进制逆转两次遍历间隔期间发生扩容举例如果size为4则mask为二进制011遍历顺序为00, 10, 01, 11对应十进制索引值0, 2, 1, 3。假如遍历到1时发生了扩容则mask变为0111则遍历顺序变为00, 10, 001, 101, 011, 111对应十进制索引值为0, 2, 1, 5, 3, 7少了4和6。正好扩容前的02中的元素可能被放入46。这样就防止了重复遍历。
同理如果两个遍历地间隔期间发生缩容这种算法也可以避免重复护着遗漏元素。
两次遍历间隔期间发送缩容举例如果size为8则mask为二进制0111遍历顺序为000100010110001101011111对应十进制04261537。如果在遍历1时发生了缩容则mask为011遍历顺序就变为0001000101100111对应十进制为042613少了5和7因为缩容前5和7的元素在缩容后会被放入1和3。这样就防止了重复和遗漏。
这种高位累加的游标变更算法巧妙地利用扩容缩容前后元素索引值地变化规律设计出游标变化规则从而避免了重复遍历或漏遍历。
在redis中可以用scan和hscan进行间断遍历
127.0.0.1:6379 scan 0 match * count 1
1) 2
2) 1) yy2) person
127.0.0.1:6379 scan 2 match * count 1
1) 1
2) 1) dsaa
127.0.0.1:6379 scan 1 match * count 1
1) 3
2) 1) name
127.0.0.1:6379 scan 3 match * count 1
1) 0
2) (empty array)
2.2 跳表
请参考我之前的博客高级数据结构——跳表skiplist。
2.3 压缩列表
压缩列表ziplist本质上就是一个字节数组一段连续的内存是Redis为了节约内存而设计的一种线性数据结构可以包含多个元素每个元素可以是一个字节数组或一个整数。
redis6.0之后逐渐用listpack取代ziplist。redis7.0之后完全移除了ziplist的支持。
Redis的有序集合、散列和列表都直接或者间接使用了压缩列表。当有序集合或散列表的元素个数比较少且元素都是短字符串时Redis便使用压缩列表作为其底层数据存储结构。列表使用快速链表quicklist数据结构存储而快速链表就是双向链表与压缩列表的组合。
2.4 quicklist
quicklist由双向链表adlist和ziplist结合而成。Redis中对quciklist的注释为A doubly linked list of ziplists。顾名思义quicklist是一个双向链表链表中的每个节点是一个ziplist结构。
结构体
typedef struct quicklist {quicklistNode *head;quicklistNode *tail;unsigned long count; /* total count of all entries in all ziplists */unsigned long len; /* number of quicklistNodes */int fill : QL_FILL_BITS; /* fill factor for individual nodes */unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0off */unsigned int bookmark_count: QL_BM_BITS;quicklistBookmark bookmarks[];
} quicklist;typedef struct quicklistNode {struct quicklistNode *prev;struct quicklistNode *next;unsigned char *zl;unsigned int sz; /* ziplist size in bytes */unsigned int count : 16; /* count of items in ziplist */unsigned int encoding : 2; /* RAW1 or LZF2 */unsigned int container : 2; /* NONE1 or ZIPLIST2 */unsigned int recompress : 1; /* was this node previous compressed? */unsigned int attempted_compress : 1; /* node cant compress; too small */unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;2.5 动态字符串
2.5.1 对象结构
Redis没有直接使用C语言传统的字符串表示以空字符结尾的字符数组以下简称C字符串而是自己构建了一种名为简单动态字符串simple dynamic stringSDS的抽象类型并将SDS用作Redis的默认字符串表示。
下面是redis6.2中sds对象结构
// 可以存储小于32字节的字符串
struct __attribute__ ((__packed__)) sdshdr5 {unsigned char flags; /* 3 lsb of type, and 5 msb of string length */char buf[];
};
// 可以存储小于256字节的字符串
struct __attribute__ ((__packed__)) sdshdr8 {uint8_t len; /* used */uint8_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {uint16_t len; /* used */uint16_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {uint32_t len; /* used */uint32_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {uint64_t len; /* used */uint64_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[];
};为了节省空间不同大小的字符串对应的头部大小应该不同对于一个小字符串没有必要使用64位整数来保存字符串长度。
sds中的len表示字符串长度alloc表示总的空间大小不包含头部和末尾空白字符。与C风格的字符串相比这样设计的好处是
内存安全不依赖末尾空白字符这样可以防止内存越界等问题。兼容二进制数据
末尾仍然以’\0’结尾可以兼容C风格字符串。
flags字段用来标识类型由于一共有5种类型至少需要3位来存储于是对于最短的字符串来说剩余5位可以用来存储字符串的长度可以存储最长31字节的字符串。
**__attribute__ ((_packed_))**需要注意一般情况下结构体的整体大小必须是其成员最大对齐要求的倍数。最大对齐要求通常是结构体中对齐要求最高的成员的对齐数而用packed修饰后结构体则变为按1字节对齐。
使用该编译器扩展属性后sdshdr32减少了3字节的填充。
struct sdshdr32_no_packed {uint32_t len; /* used */uint32_t alloc; /* excluding the header and null terminator */unsigned char flags; /* 3 lsb of type, 5 unused bits */char buf[];
};int main()
{// sizeof sdshdr32:9printf(sizeof sdshdr32:%lld\n, sizeof (struct sdshdr32));// sizeof sdshdr32_no_packed:12printf(sizeof sdshdr32_no_packed:%lld\n, sizeof (struct sdshdr32_no_packed));return 0;
}2.5.2 柔性数组
buf是一个柔性数组成员flexible array member也叫伸缩性数组成员只能被放在结构体的末尾。包含柔性数组成员的结构体通过malloc函数为柔性数组动态分配内存。
柔性数组的限制
只能是结构体的最后一个成员不能是结构体的唯一一个成员运行时要手动为包含柔性数组的结构体分配足够的连续内存
使用柔性数组的好处 和sds结构体成员共用一个连续内存减少动态内存分配和间接访问效率更高。 通过首地址偏移方便得到结构体的首地址进而获取其它成员变量。 首地址偏移示例 /* Free an sds string. No operation is performed if s is NULL. */
void sdsfree(sds s) {if (s NULL) return;s_free((char*)s-sdsHdrSize(s[-1]));
}2.5.3 相关面试题 为什么sds小于等于44字节时采用embstr编码大于44字节时采用raw编码 embstr编码将redisObject和sds存储在一个连续内存中而raw编码将两者分开存储。redisObject的大小为16字节44字节对应sdshdr8占用3字节还有1字节的空白字符所以44字节的sds实际占用64字节。 首先cache line大小为64字节如果整个sds结构体小于等于64字节就能够利用缓存加快访问速度此时采用压缩编码效率最高。 其次大多数内存分配器都是按照2的整数次幂的大小分配内存选择64字节可以减少内存碎片。 综上当字符串小于等于44字节时采用嵌入编码能够利用缓存行和减少内存碎片。 为什么其它类型中数据结构切换都有一个元素长度不超过64字节这个分界线 首先当字符串总长度小于等于64字节时采用的是embstr编码将sds嵌入到redisObject中整个元素占用一段连续的内存空间。这可以节省内存空间减少内存分配开销。 此外cache line大小为64字节元素长度不超过64字节可以更高的利用缓存行加速访问。 数据结构切换之前为了节约内存通常采用压缩列表而元素长度不超过64字节的字符串采用embstr也是为了节约内存。
学习参考
学习更多相关知识请参考零声 github。