tiny-redis 项目可能的问题

csdn推荐

事件循环怎么实现的

首先我将连接包装为一个 Connect 类,它包含了 socket fd,读写缓冲区,连接状态(这个连接是发送数据还是接收数据)等成员属性

我会在全局维护一个从 socket fd 到它对应的 Connect 对象指针的 map,叫做 fd2conn

在 main 函数中首先 socket、bind、listen 创建一个 listen fd,然后创建一个 vector poll_args,接着进入事件循环(一个死循环)

在事件循环中首先将 poll_args 清空,然后压入 listen fd 对应的 pollfd;然后遍历 fd2conn,根据每个 Connect 对象的状态(发送状态还是接收状态)决定 pollfd 中要监听的事件(POLLIN | POLLOUT),把 fd2conn 中的每一项都压入 poll_args 数组,然后调用 poll,传入 poll_args 监听这些文件描述符

等到 poll 返回后,再遍历 poll_args,根据其 fd 在 fd2conn 中找到对应的 Connect 对象,执行 IO 操作。如果 IO 操作结束后 Connect 的状态变为 END,则关闭这个连接并从 map 中删除

最后再判断一下 0 号 poll_args 是否有就绪事件(listen fd),如果有,则说明有新连接到来,再去调用 accept 函数创建新的连接。accept 的结果会在 fd2conn 这个 map 中放入对应的 socket fd 和 Conn 对象

连接定时器是如何实现的

连接定时器是通过双向循环链表串起来的,我在 Connect 结构中还会维护一个 us 级的时间戳 idle_start,它的意义是“上一次操作这个连接的时刻”,也就是开始空闲的时刻,在 Connect 的构造函数里初始化为当前时刻,构造完成后把它挂到链表末尾

idle_start 在每一次处理 IO 时都会被刷新为当前时刻(说明它当前活跃),然后从链表中取下来,再挂到末尾,这样就保证了双向链表自动是递增有序的,这个序就是每个连接的空闲开始时刻,越位于前面的,空闲开始的越早,越可能超时

在每一次 poll 之前,我都去从链表首部获取最小的那个时间戳,这意味着这个连接空闲开始的最早,拥有最小的超时时刻,把这个时间戳加上一个 timeout 值,例如 5ms,然后求这个值减去当前时刻,就是需要最小阻塞等待的时长,然后 poll 的第三个超时参数传入这个值即可

在每一次事件循环的末尾,我都会去从前往后处理这个双向链表管理的定时器:如果空闲开始时间戳 + timeout < 当前时刻,说明超时了,就把这个定时器取下,然后关闭这个连接。直到遇到一个不超时的连接,此时退出循环即可,因为链表是自增有序的

支持哪些命令

set k1 v1
get k1
del k1
keys
# zadd zset_name score name
zadd zs1 1.0 abc
zadd zs1 1.5 def
zrem zs1 def
zscore zs1 def
zquery zs1 1.5 def $offset $limit
pexpire k1 5000
pttl k1

解释客户端命令发送到服务端的过程

客户端从命令行参数获取到 “set k1 v1” 这个命令,然后调用 socket、connect 建立连接;服务端在本轮事件循环末尾 accept 这个连接,将它放入全局的 fd2conn 的 map 中,设置 idle_start 为当前时刻,把它插入 idle_list 链表的末尾

客户端建立连接后,紧接着调用 write 发送数据,也就是 “set k1 v1” 这个字符串;服务端这时已经进入到下一轮事件循环,在这轮循环中,poll 监听到 fd 可读,返回,然后轮询 poll_args,轮询到该客户端 fd 的可读,进而处理该客户端发来的数据

解释 set 命令的执行过程

客户端发来 “set k1 v1”

服务端首先从 fd 读取数据到 read buffer 中,然后将这3个单词解析到一个 vector,然后根据第一个单词是 set,进入到 do_set 函数

在 do_set 中首先去全局的 hashmap 中查找对应的 key 是否存在,查找的过程是在2个 hashtable 都查询(因为渐进式哈希用到 2 个哈希表),在 hashtable 中查询的过程是先根据哈希值 % 哈希桶长度定位到哈希桶(实际上哈希桶长度永远是 2 ^ n,故只需要 hashcode & (n - 1) 即可),然后遍历哈希桶后面的链表,如果存在 hashcode 一样且判等函数返回 true 的节点,则返回该节点

如果在全局 hashmap 中查到了 k1 已经存在,则将它对应的 value 改为 v1;否则新建一个 Entry,把它的挂到全局 hashmap

HashMap 怎么实现的

// hashtable node, should be embedded into the payload
struct HNode {
    HNode *next = NULL;
    uint64_t hcode = 0;
};
// a simple fixed-sized hashtable
struct HTab {
    HNode **tab = NULL;
    size_t mask = 0;
    size_t size = 0;
};
// the real hashtable interface.
// it uses 2 hashtables for progressive resizing.
struct HMap {
    HTab ht1;   // newer
    HTab ht2;   // older
    size_t resizing_pos = 0;
};

HashMap 也采用侵入式容器,我采用拉链法解决哈希冲突,每个链表节点(HNode)只存储一个 hashcode 和指向下一个节点的指针,并不存储真正的 key、value 数据

然后定义一个 HTable 结构,它有3个成员:HNode* 的数组 tab;数组长度 - 1 的 mask,因为限制了数组长度只能为 2 ^ n,所以定位哈希桶时就避免了 mod 运算;还有标志着当前哈希表总节点数量的 size

最后定义真正的 HMap 结构,它采用渐进式哈希的策略,拥有 2 个 HTable,还有一个标志着 rehash 进度位置的 pos 域

解释 HashMap 的插入过程

insert() 的参数是 HNode*,一个 HashMap 会管理2个哈希桶,ht1 和 ht2,插入操作都是在 ht1 中进行

插入 ht1 的过程是先用 hnode 的 hashcode & mask 求得哈希桶的位置,然后执行单链表头插,最后 size++

在 ht1 中插入之后判断 ht2 是否为 null,若为 null,再判断 ht1 的负载因子是否过高,如果过高,则将 ht1 转移给 ht2,ht1.resize,然后设置 HashMap 的 resizing_pos = 0,这个过程可以看作是开启 rehash 过程

解释 rehash 的过程

渐进式 rehash 就是不一次性 rehash 全部节点,而是只在每一步操作 HashTable 的过程中 rehash 固定数目的节点

rehash 开始的标志是在向 ht1 中插入一个节点后,判断 ht2 为空且负载因子 > 8,此时将 ht1 转移给 ht2,ht1.resize,然后设置 HashMap 的 resizing_pos = 0,resize_pos 表示在 ht2 中 rehash 的进度

然后在每一次操作哈希表的函数中,包括查询,新增,删除,都会执行一次 rehash 函数,它会循环执行 128 次:从 ht2 的哈希桶的 resizing_pos 位置摘下头节点,将其 detach 下来,再插入到 ht1 的 HashTable。直到循环次数满或 ht2 被摘空

一旦 ht2 被摘空,就意味着 rehash 终止(因为他是作为 rehash 的循环条件之一)

rehash 的过程中,对哈希表的插入照常在 ht1 中执行,且只有在 ht2 被摘空后,才会再次计算负载因子,开始下一次 rehash

解释 HashMap 的删除过程

pop 传入的是一个 HNode* key 和一个判等函数 eq。首先按需执行 rehash,结束后先在 ht1 中 lookup:先根据 key 中的 hcode & mask 定位到哈希桶,然后在它后面的单链表中遍历查找,遇到 hcode 一致且 eq 函数返回 true 的节点,将其返回

如果在 ht1 中查找结果不为空,则将其 detach 返回(从链表中摘下);否则在 ht2 中执行同样的步骤

如果 ht1 和 ht2 查找都为空,说明待删除的节点不存在,返回 null

哈希冲突你怎么解决的

我会在每一个涉及到在 HashTab 中查找操作的函数都传入一个判等函数 eq,它的参数是2个 HNode*,用于真正判断这两个节点是否相等,例如当使用 HashMap 存储 KV 时,它会根据 HNode* 调用 container_of,找到对应的 Entry 结构(真正存储 string KV)的,然后判断 string key 是否相等

HashMap 如何是如何管理 KV 的

struct Entry {
    HNode node;
    std::string key;
    std::string val;
    uint32_t type = 0;
    ZSet* zset = NULL;
    size_t heap_idx = -1;
    Entry(const std::string& k) {
        key = k;
        node.hcode = str_hash(k);
    }
};

我定义了一个 Entry 结构,它包含 string 类型的 KV,同时包含了一个 HNode 类型(就是哈希表中单链表的节点类型)

HNode 中只有一个 hachode 和指向下一个 HNode 的指针,hashcode 在 Entry 构造时被初始化

这样,查找某个 string key 时,只需要将 key 构造为 Entry(栈内存),然后把这个 Entry 的 hnode 交给 HMap 的 lookup 方法查找即可。lookup 返回的是一个 HNode*,就是真正存储了这个 key 的 hashcode 的 HNode,然后使用 container_of 即可获得包裹了这个 HNode 的 Entry 指针

set k v 时,也是先将 key 构造为 Entry,然后把这个 Entry 的 hnode 交给 HMap 的 lookup 方法查找。如果返回 NULL,就说明这对 KV 不存在,需要 new Entry,然后将它的 hnode 成员插入全局的 HashMap 中;否则就使用 container_of 定位到包含这个 hnode 的 Entry,然后修改它的 value

换句话说,KV 数据只是被简单地存储在堆内存中,将它的 HNode 域挂到 HashMap 上进行管理和查找

哈希函数如何实现的

哈希函数的作用是将 string 转化为 int64,采用的策略是先初始化一个较大的素数h,然后不断地将字符串中的字节数据累加到 h 上,并乘以一个魔法值,最终得到一个 int64 的魔法值

解释 get 命令的执行过程

客户端发来“get k”命令,服务器将其解析为 vector,判断其第一个单词是 get,调用 do_get 函数进行处理

在 do_get 中,首先用发来的 k 构造一个 Entry(初始化 Entry 中 HNode 的 hcode),然后将这个 Entry 的 HNode 丢给 HashMap 的 lookup 方法进行查找(和判等函数),如果未找到,序列化一个 nil 给客户端

如果找到节点,调用 container_of,定位到包裹这个 HNode 的 Entry 对象指针,获取其 string val 字段,序列化给客户端

解释 del 命令的执行过程(异步)

客户端发来“del k”命令,服务器将其解析为 vector,判断其第一个单词是 del,调用 do_del 函数进行处理

在 do_del 中,首先用发来的 k 构造一个 Entry(初始化 Entry 中 HNode 的 hcode),然后将这个 Entry 的 HNode 丢给 HashMap 的 pop 方法,在 HashMap 中删除这个 HNode

如果这个 HNode 在哈希表中存在,则使用 container_of 找到包裹该 HNode 的对象指针,执行删除

在删除函数中,判断该对象是否为 ZSet,如果是,再判断它是否 too_big,如果是,将其压入线程池的任务队列执行异步删除;否则执行同步删除即可(因为线程调度是有开销的,所以只有对大块数据才执行异步删除)

解释 keys 命令的执行过程

使用 h_scan 函数扫描哈希桶,然后在扫描每个哈希桶后面的单链表,在每个节点上调用 container_of 获取包裹这个 HNode 的节点,然后获取它的 key 值,序列化给客户端

解释一下侵入式数据结构(以双向链表为例)

侵入式双向链表节点只包含 prev 和 next 两个指针,没有数据域。然后基于这个节点定义实现链表的操作,比如 insert 和 detach

对比传统容器,将被容器管理的对象“塞入”容器(也就是 STL 的风格);侵入式容器需要在被管理的对象内增加一个数据域,也就是链表节点,相当于被管理的对象包裹住了链表节点

我在项目中是用双向链表管理连接对象 Connect 的,所以我的 Connect 类里面有一个 DList 类型的数据成员 idle_list(因为链表管理的是 Connect 的超时时长),向容器中插入对象就是 new Connect,然后将这个对象的链表节点数据域挂到链表上;从容器中删除对象就是将这个对象的链表节点从链表上摘除,然后 delete 这个对象

侵入式容器的工作过程需要一个 container_of 指针计算函数,一般用宏函数实现(因为涉及到传入类型、字段名作为参数,不符合 C++ 的函数语法)

它的参数有3个:链表节点指针,包裹链表节点的数据类型(也就是 Connect 类型),链表节点在被管理对象中的字段名字(也就是 idle_list)

它的功能是根据这三者定位到包裹链表节点的数据对象的指针

它的原理是用链表节点指针减去链表节点数据域在整个 Connect 对象中的 offset,就得到了 Connect 对象的地址

这个 container_of 函数在所有的侵入式容器中都是通用的

怎么用 HashMap + AVLTree 实现的 ZSet

struct AVLNode {
    uint32_t depth;
    uint32_t cnt;
    AVLNode *left;
    AVLNode *right;
    AVLNode *parent;
    AVLNode()
        : depth(1), cnt(1), left(nullptr), right(nullptr), parent(nullptr) {}
    
    void init() {
        depth = cnt = 1;
        left = right = parent = nullptr;
    }
    void update();
    AVLNode *rot_left();
    AVLNode *rot_right();
    AVLNode *fix_left();
    AVLNode *fix_right();
};
AVLNode *avl_fix(AVLNode *node);
AVLNode *avl_del(AVLNode *node);
AVLNode *avl_offset(AVLNode *node, int64_t offset);
struct ZNode {
    AVLNode tree;
    HNode hmap;
    double score = 0;
    std::string name;
    ZNode(std::string name, double score) {
        this->name = name;
        hmap.hcode = str_hash(name);
        this->score = score;
    }
    ZNode* offset(int64_t offset) {
        auto tnode = avl_offset(&tree, offset);
        return tnode ? container_of(tnode, ZNode, tree) : nullptr;
    }
};
struct ZSet {
    AVLNode* tree = NULL;
    HMap hmap;
public:
    bool add(const std::string& name, double score);
    ZNode* look_up(const std::string& name);
    ZNode* pop(const std::string& name);
    ZNode* query(double score, const std::string& name);
    ~ZSet();
private:
    void tree_add(ZNode* node);
    void update(ZNode* node, double score);
};

解释 zadd 命令的执行过程

// the structure for the key
struct Entry {
    HNode node;
    std::string key;
    std::string val;
    uint32_t type = 0;
    ZSet* zset = NULL;
    size_t heap_idx = -1;
    Entry(const std::string& k) {
        key = k;
        node.hcode = str_hash(k);
    }
};

在 Entry 中存储了一个 type 用于区分 key 代表的是 KV 键值对还是 ZSet

在 do_zadd 函数中,首先使用 zset 的名字构造一个 Entry(初始化它的 HNode 中的 hcode),然后在全局的 HashMap 中查找这个 Entry 中的 HNode。如果未找到,则 new 一个 Entry,设置它的 type 为 ZSET,并将它的 HNode 插入到 HashMap;否则如果能找到,则使用 container_of 找到包裹这个 HNode 的 Entry*,判定其为 ZSET

ZSet 有一个 AVL 树结构和一个 HMap,前者用于存储 score,后者用于存储 name;还有增加、查询、删除、范围查询方法

然后执行 ZSet 的 add 方法,它的参数是 (name, score)

在 add 方法中,首先在 ZSet 的 HMap 中查询 name 是否存在,若存在,则执行更新操作:将 ZNode 中的 AVLNode 节点从 AVL 树上摘除,将 score 改为新值,再插入到 AVL 树;否则说明 name 不存在,则 new ZNode,将它的 name 插入到当前 ZSet 持有的 HMap,再把当前节点的 AVLNode 成员挂到 AVL 树上

解释 zrem 的执行过程

zrem zs1 kk1

首先在全局的 HashMap 中根据 zset_name 查找对应的 Entry 是否存在,若存在,再判定其 type == ZSET

然后执行 ZSet 的 pop 方法:首先根据 name 构造一个 HKey(类似于 Entry),在当前 ZSet 管理的 HMap 中 pop

若 pop 返回非 null,说明该节点存在,则使用 container_of 找到包裹它的 ZNode,再将它的 AVLNode tree 从树上删除

解释 zscore 的执行过程

zscore zs1 kk1

首先在全局的 HashMap 中根据 zset_name 查找对应的 Entry 是否存在,若存在,再判定其 type == ZSET

然后执行 ZSet 的 lookup 方法:根据 name 构造一个 HKey(类似于 Entry),在当前 ZSet 管理的 HMap 中 lookup,最后使用 container_of 返回包裹它的 ZNode

如果返回非 null,则解引用 ZNode 的 score 字段将其作为 double 序列化给客户端;否则序列化给客户端 nil

解释 zquery 的执行过程(范围查询) 小顶堆是怎么实现的

// heap.h
struct HeapItem {
    uint64_t val = 0;
    size_t *ref = NULL;
};
// 节点上滤或下滤
void heap_update(HeapItem *a, size_t pos, size_t len);
// 在 server.cpp 中定义一个全局的 heap
std::vector<HeapItem> heap;

由于堆在逻辑上是一颗完全二叉树,所以可以借助一个数组来实现

解释 pexpire 的执行过程(如何使用小顶堆实现 TTL)

pexpire kk1 5000

pexpire 命令的第二个参数为一个 int 类型的毫秒值,代表过期时长

在 Entry 中,存储了一个 heap_idx,作为堆的数组下标

在 do_expire 中,首先根据传入的 kk1 构造一个 Entry,将其 HNode 丢给全局的 HMap 的 lookup 查找,如果存在,再调用 container_of 找到包裹这个 HNode 的 Entry 对象指针,再将这个指针和 ttl_ms 传入 entry_set_ttl 设置 TTL

在 entry_set_ttl 中,首先根据 ent->heap_idx 定位到堆中的 HeapItem,如果 pos = heap_idx == -1,则需要新建一个 HeapItem,让其 ref 指向 ent->heap_idx,然后将其 pushback 进堆末尾,设置 pos = heap.size() - 1

然后 heap[pos].val = get_monotonic_usec() + (uint64_t)ttl_ms * 1000;,再调整堆

解释 pttl 的执行过程

pttl kk1

在 Entry 中,存储了一个 heap_idx,作为堆的数组下标

在 do_ttl 中,首先根据传入的 kk1 构造一个 Entry,将其 HNode 丢给全局的 HMap 的 lookup 查找,如果存在,再调用 container_of 找到包裹这个 HNode 的 Entry 对象指针 ent

然后根据 ent->heap_idx 在堆中定位到 HeapItem,读出它的 val 字段,也就是过期时刻 expire_at

然后用这个时刻减去当前时刻,就是 ttl,将它序列化给客户端即可

过期 key 是怎么实现自动删除的

我会在每个事件循环的末尾处理定时器,包括连接的定时器还有堆定时器

在处理堆定时器时,因为它是小顶堆,所以堆顶的元素是过期时刻最小的,是最可能超时的

while 循环判断堆顶定时器超时时刻是否小于当前时刻,如果小于,则在全局的 HMap 中将这个 key pop 掉,然后在 entry_set_ttl 中将其 ttl 设置为 -1,一旦设置为 -1,就会被从堆中删除

数据序列化和反序列化如何实现的

tiny-redis 支持多种数据类型的序列化,比如 nil、str、int64、double、err 等

先定义一系列表示数据类型的枚举

enum {
    SER_NIL = 0,
    SER_ERR = 1,
    SER_STR = 2,
    SER_INT = 3,
    SER_DBL = 4,
    SER_ARR = 5,
};

序列化时首先 push_back 数据类型枚举值,然后 append 进真正需要序列化的值,对于字符串需要先 append 进字符串长度

static void out_nil(std::string &out) {
    out.push_back(SER_NIL);
}
static void out_str(std::string &out, const std::string &val) {
    out.push_back(SER_STR);
    uint32_t len = (uint32_t)val.size();
    out.append((char *)&len, 4);
    out.append(val);
}
static void out_int(std::string &out, int64_t val) {
    out.push_back(SER_INT);
    out.append((char *)&val, 8);
}
static void out_dbl(std::string &out, double val) {
    out.push_back(SER_DBL);
    out.append((char *)&val, 8);
}
static void out_err(std::string &out, int32_t code, const std::string &msg) {
    out.push_back(SER_ERR);
    out.append((char *)&code, 4);
    uint32_t len = (uint32_t)msg.size();
    out.append((char *)&len, 4);
    out.append(msg);
}

客户端遵循同样的协议进行反序列化:先读取数据类型,然后读取数据内容

线程池是怎么实现的

struct Work {
    void (*f)(void *) = NULL;
    void *arg = NULL;
};
struct ThreadPool {
    std::vector<pthread_t> threads;
    std::deque<Work> queue;
    pthread_mutex_t mu;
    pthread_cond_t not_empty;
    ThreadPool(size num_threads);
    void enqueue(void (*f)(void *), void *arg);
}

线程池中包含如下的成员:

创建线程池时 pthread_create() 创建指定数目的线程,并初始化锁和条件变量

向任务队列添加任务时,先获取锁,然后 push_back,然后唤醒条件变量,最后解锁

每个任务是一个死循环:先解锁,判断任务队列是否为空,若为空则循环等待条件变量;否则从任务队列中取出任务,然后解锁,最后执行任务

压测结果如何

文章来源:https://blog.csdn.net/DanielSYC/article/details/139807968



微信扫描下方的二维码阅读本文

© 版权声明
THE END
喜欢就支持一下吧
点赞10 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容