csdn推荐
事件循环怎么实现的
首先我将连接包装为一个 Connect 类,它包含了 socket fd,读写缓冲区,连接状态(这个连接是发送数据还是接收数据)等成员属性
我会在全局维护一个从 socket fd 到它对应的 Connect 对象指针的 map,叫做 fd2conn
在 main 函数中首先 socket、bind、listen 创建一个 listen fd,然后创建一个 vector poll_args,接着进入事件循环(一个死循环)
在事件循环中首先将 poll_args 清空,然后压入 listen fd 对应的 pollfd;然后遍历 fd2conn,根据每个 Connect 对象的状态(发送状态还是接收状态)决定 pollfd 中要监听的事件(POLLIN | POLLOUT),把 fd2conn 中的每一项都压入 poll_args 数组,然后调用 poll,传入 poll_args 监听这些文件描述符
等到 poll 返回后,再遍历 poll_args,根据其 fd 在 fd2conn 中找到对应的 Connect 对象,执行 IO 操作。如果 IO 操作结束后 Connect 的状态变为 END,则关闭这个连接并从 map 中删除
最后再判断一下 0 号 poll_args 是否有就绪事件(listen fd),如果有,则说明有新连接到来,再去调用 accept 函数创建新的连接。accept 的结果会在 fd2conn 这个 map 中放入对应的 socket fd 和 Conn 对象
连接定时器是如何实现的
连接定时器是通过双向循环链表串起来的,我在 Connect 结构中还会维护一个 us 级的时间戳 idle_start,它的意义是“上一次操作这个连接的时刻”,也就是开始空闲的时刻,在 Connect 的构造函数里初始化为当前时刻,构造完成后把它挂到链表末尾
idle_start 在每一次处理 IO 时都会被刷新为当前时刻(说明它当前活跃),然后从链表中取下来,再挂到末尾,这样就保证了双向链表自动是递增有序的,这个序就是每个连接的空闲开始时刻,越位于前面的,空闲开始的越早,越可能超时
在每一次 poll 之前,我都去从链表首部获取最小的那个时间戳,这意味着这个连接空闲开始的最早,拥有最小的超时时刻,把这个时间戳加上一个 timeout 值,例如 5ms,然后求这个值减去当前时刻,就是需要最小阻塞等待的时长,然后 poll 的第三个超时参数传入这个值即可
在每一次事件循环的末尾,我都会去从前往后处理这个双向链表管理的定时器:如果空闲开始时间戳 + timeout < 当前时刻,说明超时了,就把这个定时器取下,然后关闭这个连接。直到遇到一个不超时的连接,此时退出循环即可,因为链表是自增有序的
支持哪些命令
set k1 v1
get k1
del k1
keys
# zadd zset_name score name
zadd zs1 1.0 abc
zadd zs1 1.5 def
zrem zs1 def
zscore zs1 def
zquery zs1 1.5 def $offset $limit
pexpire k1 5000
pttl k1
解释客户端命令发送到服务端的过程
客户端从命令行参数获取到 “set k1 v1” 这个命令,然后调用 socket、connect 建立连接;服务端在本轮事件循环末尾 accept 这个连接,将它放入全局的 fd2conn 的 map 中,设置 idle_start 为当前时刻,把它插入 idle_list 链表的末尾
客户端建立连接后,紧接着调用 write 发送数据,也就是 “set k1 v1” 这个字符串;服务端这时已经进入到下一轮事件循环,在这轮循环中,poll 监听到 fd 可读,返回,然后轮询 poll_args,轮询到该客户端 fd 的可读,进而处理该客户端发来的数据
解释 set 命令的执行过程
客户端发来 “set k1 v1”
服务端首先从 fd 读取数据到 read buffer 中,然后将这3个单词解析到一个 vector,然后根据第一个单词是 set,进入到 do_set 函数
在 do_set 中首先去全局的 hashmap 中查找对应的 key 是否存在,查找的过程是在2个 hashtable 都查询(因为渐进式哈希用到 2 个哈希表),在 hashtable 中查询的过程是先根据哈希值 % 哈希桶长度定位到哈希桶(实际上哈希桶长度永远是 2 ^ n,故只需要 hashcode & (n - 1) 即可),然后遍历哈希桶后面的链表,如果存在 hashcode 一样且判等函数返回 true 的节点,则返回该节点
如果在全局 hashmap 中查到了 k1 已经存在,则将它对应的 value 改为 v1;否则新建一个 Entry,把它的挂到全局 hashmap
HashMap 怎么实现的
// hashtable node, should be embedded into the payload
struct HNode {
HNode *next = NULL;
uint64_t hcode = 0;
};
// a simple fixed-sized hashtable
struct HTab {
HNode **tab = NULL;
size_t mask = 0;
size_t size = 0;
};
// the real hashtable interface.
// it uses 2 hashtables for progressive resizing.
struct HMap {
HTab ht1; // newer
HTab ht2; // older
size_t resizing_pos = 0;
};
HashMap 也采用侵入式容器,我采用拉链法解决哈希冲突,每个链表节点(HNode)只存储一个 hashcode 和指向下一个节点的指针,并不存储真正的 key、value 数据
然后定义一个 HTable 结构,它有3个成员:HNode* 的数组 tab;数组长度 - 1 的 mask,因为限制了数组长度只能为 2 ^ n,所以定位哈希桶时就避免了 mod 运算;还有标志着当前哈希表总节点数量的 size
最后定义真正的 HMap 结构,它采用渐进式哈希的策略,拥有 2 个 HTable,还有一个标志着 rehash 进度位置的 pos 域
解释 HashMap 的插入过程
insert() 的参数是 HNode*,一个 HashMap 会管理2个哈希桶,ht1 和 ht2,插入操作都是在 ht1 中进行
插入 ht1 的过程是先用 hnode 的 hashcode & mask 求得哈希桶的位置,然后执行单链表头插,最后 size++
在 ht1 中插入之后判断 ht2 是否为 null,若为 null,再判断 ht1 的负载因子是否过高,如果过高,则将 ht1 转移给 ht2,ht1.resize,然后设置 HashMap 的 resizing_pos = 0,这个过程可以看作是开启 rehash 过程
解释 rehash 的过程
渐进式 rehash 就是不一次性 rehash 全部节点,而是只在每一步操作 HashTable 的过程中 rehash 固定数目的节点
rehash 开始的标志是在向 ht1 中插入一个节点后,判断 ht2 为空且负载因子 > 8,此时将 ht1 转移给 ht2,ht1.resize,然后设置 HashMap 的 resizing_pos = 0,resize_pos 表示在 ht2 中 rehash 的进度
然后在每一次操作哈希表的函数中,包括查询,新增,删除,都会执行一次 rehash 函数,它会循环执行 128 次:从 ht2 的哈希桶的 resizing_pos 位置摘下头节点,将其 detach 下来,再插入到 ht1 的 HashTable。直到循环次数满或 ht2 被摘空
一旦 ht2 被摘空,就意味着 rehash 终止(因为他是作为 rehash 的循环条件之一)
rehash 的过程中,对哈希表的插入照常在 ht1 中执行,且只有在 ht2 被摘空后,才会再次计算负载因子,开始下一次 rehash
解释 HashMap 的删除过程
pop 传入的是一个 HNode* key 和一个判等函数 eq。首先按需执行 rehash,结束后先在 ht1 中 lookup:先根据 key 中的 hcode & mask 定位到哈希桶,然后在它后面的单链表中遍历查找,遇到 hcode 一致且 eq 函数返回 true 的节点,将其返回
如果在 ht1 中查找结果不为空,则将其 detach 返回(从链表中摘下);否则在 ht2 中执行同样的步骤
如果 ht1 和 ht2 查找都为空,说明待删除的节点不存在,返回 null
哈希冲突你怎么解决的
我会在每一个涉及到在 HashTab 中查找操作的函数都传入一个判等函数 eq,它的参数是2个 HNode*,用于真正判断这两个节点是否相等,例如当使用 HashMap 存储 KV 时,它会根据 HNode* 调用 container_of,找到对应的 Entry 结构(真正存储 string KV)的,然后判断 string key 是否相等
HashMap 如何是如何管理 KV 的
struct Entry {
HNode node;
std::string key;
std::string val;
uint32_t type = 0;
ZSet* zset = NULL;
size_t heap_idx = -1;
Entry(const std::string& k) {
key = k;
node.hcode = str_hash(k);
}
};
我定义了一个 Entry 结构,它包含 string 类型的 KV,同时包含了一个 HNode 类型(就是哈希表中单链表的节点类型)
HNode 中只有一个 hachode 和指向下一个 HNode 的指针,hashcode 在 Entry 构造时被初始化
这样,查找某个 string key 时,只需要将 key 构造为 Entry(栈内存),然后把这个 Entry 的 hnode 交给 HMap 的 lookup 方法查找即可。lookup 返回的是一个 HNode*,就是真正存储了这个 key 的 hashcode 的 HNode,然后使用 container_of 即可获得包裹了这个 HNode 的 Entry 指针
set k v 时,也是先将 key 构造为 Entry,然后把这个 Entry 的 hnode 交给 HMap 的 lookup 方法查找。如果返回 NULL,就说明这对 KV 不存在,需要 new Entry,然后将它的 hnode 成员插入全局的 HashMap 中;否则就使用 container_of 定位到包含这个 hnode 的 Entry,然后修改它的 value
换句话说,KV 数据只是被简单地存储在堆内存中,将它的 HNode 域挂到 HashMap 上进行管理和查找
哈希函数如何实现的
哈希函数的作用是将 string 转化为 int64,采用的策略是先初始化一个较大的素数h,然后不断地将字符串中的字节数据累加到 h 上,并乘以一个魔法值,最终得到一个 int64 的魔法值
解释 get 命令的执行过程
客户端发来“get k”命令,服务器将其解析为 vector,判断其第一个单词是 get,调用 do_get 函数进行处理
在 do_get 中,首先用发来的 k 构造一个 Entry(初始化 Entry 中 HNode 的 hcode),然后将这个 Entry 的 HNode 丢给 HashMap 的 lookup 方法进行查找(和判等函数),如果未找到,序列化一个 nil 给客户端
如果找到节点,调用 container_of,定位到包裹这个 HNode 的 Entry 对象指针,获取其 string val 字段,序列化给客户端
解释 del 命令的执行过程(异步)
客户端发来“del k”命令,服务器将其解析为 vector,判断其第一个单词是 del,调用 do_del 函数进行处理
在 do_del 中,首先用发来的 k 构造一个 Entry(初始化 Entry 中 HNode 的 hcode),然后将这个 Entry 的 HNode 丢给 HashMap 的 pop 方法,在 HashMap 中删除这个 HNode
如果这个 HNode 在哈希表中存在,则使用 container_of 找到包裹该 HNode 的对象指针,执行删除
在删除函数中,判断该对象是否为 ZSet,如果是,再判断它是否 too_big,如果是,将其压入线程池的任务队列执行异步删除;否则执行同步删除即可(因为线程调度是有开销的,所以只有对大块数据才执行异步删除)
解释 keys 命令的执行过程
使用 h_scan 函数扫描哈希桶,然后在扫描每个哈希桶后面的单链表,在每个节点上调用 container_of 获取包裹这个 HNode 的节点,然后获取它的 key 值,序列化给客户端
解释一下侵入式数据结构(以双向链表为例)
侵入式双向链表节点只包含 prev 和 next 两个指针,没有数据域。然后基于这个节点定义实现链表的操作,比如 insert 和 detach
对比传统容器,将被容器管理的对象“塞入”容器(也就是 STL 的风格);侵入式容器需要在被管理的对象内增加一个数据域,也就是链表节点,相当于被管理的对象包裹住了链表节点
我在项目中是用双向链表管理连接对象 Connect 的,所以我的 Connect 类里面有一个 DList 类型的数据成员 idle_list(因为链表管理的是 Connect 的超时时长),向容器中插入对象就是 new Connect,然后将这个对象的链表节点数据域挂到链表上;从容器中删除对象就是将这个对象的链表节点从链表上摘除,然后 delete 这个对象
侵入式容器的工作过程需要一个 container_of 指针计算函数,一般用宏函数实现(因为涉及到传入类型、字段名作为参数,不符合 C++ 的函数语法)
它的参数有3个:链表节点指针,包裹链表节点的数据类型(也就是 Connect 类型),链表节点在被管理对象中的字段名字(也就是 idle_list)
它的功能是根据这三者定位到包裹链表节点的数据对象的指针
它的原理是用链表节点指针减去链表节点数据域在整个 Connect 对象中的 offset,就得到了 Connect 对象的地址
这个 container_of 函数在所有的侵入式容器中都是通用的
怎么用 HashMap + AVLTree 实现的 ZSet
struct AVLNode {
uint32_t depth;
uint32_t cnt;
AVLNode *left;
AVLNode *right;
AVLNode *parent;
AVLNode()
: depth(1), cnt(1), left(nullptr), right(nullptr), parent(nullptr) {}
void init() {
depth = cnt = 1;
left = right = parent = nullptr;
}
void update();
AVLNode *rot_left();
AVLNode *rot_right();
AVLNode *fix_left();
AVLNode *fix_right();
};
AVLNode *avl_fix(AVLNode *node);
AVLNode *avl_del(AVLNode *node);
AVLNode *avl_offset(AVLNode *node, int64_t offset);
struct ZNode {
AVLNode tree;
HNode hmap;
double score = 0;
std::string name;
ZNode(std::string name, double score) {
this->name = name;
hmap.hcode = str_hash(name);
this->score = score;
}
ZNode* offset(int64_t offset) {
auto tnode = avl_offset(&tree, offset);
return tnode ? container_of(tnode, ZNode, tree) : nullptr;
}
};
struct ZSet {
AVLNode* tree = NULL;
HMap hmap;
public:
bool add(const std::string& name, double score);
ZNode* look_up(const std::string& name);
ZNode* pop(const std::string& name);
ZNode* query(double score, const std::string& name);
~ZSet();
private:
void tree_add(ZNode* node);
void update(ZNode* node, double score);
};
解释 zadd 命令的执行过程
// the structure for the key
struct Entry {
HNode node;
std::string key;
std::string val;
uint32_t type = 0;
ZSet* zset = NULL;
size_t heap_idx = -1;
Entry(const std::string& k) {
key = k;
node.hcode = str_hash(k);
}
};
在 Entry 中存储了一个 type 用于区分 key 代表的是 KV 键值对还是 ZSet
在 do_zadd 函数中,首先使用 zset 的名字构造一个 Entry(初始化它的 HNode 中的 hcode),然后在全局的 HashMap 中查找这个 Entry 中的 HNode。如果未找到,则 new 一个 Entry,设置它的 type 为 ZSET,并将它的 HNode 插入到 HashMap;否则如果能找到,则使用 container_of 找到包裹这个 HNode 的 Entry*,判定其为 ZSET
ZSet 有一个 AVL 树结构和一个 HMap,前者用于存储 score,后者用于存储 name;还有增加、查询、删除、范围查询方法
然后执行 ZSet 的 add 方法,它的参数是 (name, score)
在 add 方法中,首先在 ZSet 的 HMap 中查询 name 是否存在,若存在,则执行更新操作:将 ZNode 中的 AVLNode 节点从 AVL 树上摘除,将 score 改为新值,再插入到 AVL 树;否则说明 name 不存在,则 new ZNode,将它的 name 插入到当前 ZSet 持有的 HMap,再把当前节点的 AVLNode 成员挂到 AVL 树上
解释 zrem 的执行过程
zrem zs1 kk1
首先在全局的 HashMap 中根据 zset_name 查找对应的 Entry 是否存在,若存在,再判定其 type == ZSET
然后执行 ZSet 的 pop 方法:首先根据 name 构造一个 HKey(类似于 Entry),在当前 ZSet 管理的 HMap 中 pop
若 pop 返回非 null,说明该节点存在,则使用 container_of 找到包裹它的 ZNode,再将它的 AVLNode tree 从树上删除
解释 zscore 的执行过程
zscore zs1 kk1
首先在全局的 HashMap 中根据 zset_name 查找对应的 Entry 是否存在,若存在,再判定其 type == ZSET
然后执行 ZSet 的 lookup 方法:根据 name 构造一个 HKey(类似于 Entry),在当前 ZSet 管理的 HMap 中 lookup,最后使用 container_of 返回包裹它的 ZNode
如果返回非 null,则解引用 ZNode 的 score 字段将其作为 double 序列化给客户端;否则序列化给客户端 nil
解释 zquery 的执行过程(范围查询) 小顶堆是怎么实现的
// heap.h
struct HeapItem {
uint64_t val = 0;
size_t *ref = NULL;
};
// 节点上滤或下滤
void heap_update(HeapItem *a, size_t pos, size_t len);
// 在 server.cpp 中定义一个全局的 heap
std::vector<HeapItem> heap;
由于堆在逻辑上是一颗完全二叉树,所以可以借助一个数组来实现
解释 pexpire 的执行过程(如何使用小顶堆实现 TTL)
pexpire kk1 5000
pexpire 命令的第二个参数为一个 int 类型的毫秒值,代表过期时长
在 Entry 中,存储了一个 heap_idx,作为堆的数组下标
在 do_expire 中,首先根据传入的 kk1 构造一个 Entry,将其 HNode 丢给全局的 HMap 的 lookup 查找,如果存在,再调用 container_of 找到包裹这个 HNode 的 Entry 对象指针,再将这个指针和 ttl_ms 传入 entry_set_ttl 设置 TTL
在 entry_set_ttl 中,首先根据 ent->heap_idx 定位到堆中的 HeapItem,如果 pos = heap_idx == -1,则需要新建一个 HeapItem,让其 ref 指向 ent->heap_idx,然后将其 pushback 进堆末尾,设置 pos = heap.size() - 1
然后 heap[pos].val = get_monotonic_usec() + (uint64_t)ttl_ms * 1000;,再调整堆
解释 pttl 的执行过程
pttl kk1
在 Entry 中,存储了一个 heap_idx,作为堆的数组下标
在 do_ttl 中,首先根据传入的 kk1 构造一个 Entry,将其 HNode 丢给全局的 HMap 的 lookup 查找,如果存在,再调用 container_of 找到包裹这个 HNode 的 Entry 对象指针 ent
然后根据 ent->heap_idx 在堆中定位到 HeapItem,读出它的 val 字段,也就是过期时刻 expire_at
然后用这个时刻减去当前时刻,就是 ttl,将它序列化给客户端即可
过期 key 是怎么实现自动删除的
我会在每个事件循环的末尾处理定时器,包括连接的定时器还有堆定时器
在处理堆定时器时,因为它是小顶堆,所以堆顶的元素是过期时刻最小的,是最可能超时的
while 循环判断堆顶定时器超时时刻是否小于当前时刻,如果小于,则在全局的 HMap 中将这个 key pop 掉,然后在 entry_set_ttl 中将其 ttl 设置为 -1,一旦设置为 -1,就会被从堆中删除
数据序列化和反序列化如何实现的
tiny-redis 支持多种数据类型的序列化,比如 nil、str、int64、double、err 等
先定义一系列表示数据类型的枚举
enum {
SER_NIL = 0,
SER_ERR = 1,
SER_STR = 2,
SER_INT = 3,
SER_DBL = 4,
SER_ARR = 5,
};
序列化时首先 push_back 数据类型枚举值,然后 append 进真正需要序列化的值,对于字符串需要先 append 进字符串长度
static void out_nil(std::string &out) {
out.push_back(SER_NIL);
}
static void out_str(std::string &out, const std::string &val) {
out.push_back(SER_STR);
uint32_t len = (uint32_t)val.size();
out.append((char *)&len, 4);
out.append(val);
}
static void out_int(std::string &out, int64_t val) {
out.push_back(SER_INT);
out.append((char *)&val, 8);
}
static void out_dbl(std::string &out, double val) {
out.push_back(SER_DBL);
out.append((char *)&val, 8);
}
static void out_err(std::string &out, int32_t code, const std::string &msg) {
out.push_back(SER_ERR);
out.append((char *)&code, 4);
uint32_t len = (uint32_t)msg.size();
out.append((char *)&len, 4);
out.append(msg);
}
客户端遵循同样的协议进行反序列化:先读取数据类型,然后读取数据内容
线程池是怎么实现的
struct Work {
void (*f)(void *) = NULL;
void *arg = NULL;
};
struct ThreadPool {
std::vector<pthread_t> threads;
std::deque<Work> queue;
pthread_mutex_t mu;
pthread_cond_t not_empty;
ThreadPool(size num_threads);
void enqueue(void (*f)(void *), void *arg);
}
线程池中包含如下的成员:
创建线程池时 pthread_create() 创建指定数目的线程,并初始化锁和条件变量
向任务队列添加任务时,先获取锁,然后 push_back,然后唤醒条件变量,最后解锁
每个任务是一个死循环:先解锁,判断任务队列是否为空,若为空则循环等待条件变量;否则从任务队列中取出任务,然后解锁,最后执行任务
压测结果如何
文章来源:https://blog.csdn.net/DanielSYC/article/details/139807968
微信扫描下方的二维码阅读本文
暂无评论内容