Redis IO模型与多路复用实现

从源码层面深入剖析Redis如何利用事件驱动 + I/O多路复用实现单线程高性能

Redis 7.x Unstable C Language Source Code Analysis

🏗️ 整体架构:单线程事件驱动

应用层:Redis Server(单线程)
事件库:ae.c(跨平台抽象层)
多路复用:epoll / kqueue / select
操作系统内核

🎯 核心设计思想

Redis采用单线程 + 事件循环模式,通过I/O多路复用监听多个客户端连接,所有命令在单线程中串行执行,避免了锁竞争和上下文切换开销。

🔄 Reactor模式

Redis实现了经典的Reactor模式:注册感兴趣的事件类型和回调函数,当事件就绪时由事件分发器调用相应处理函数。

🌐 两类事件

文件事件:Socket读写就绪
时间事件:定时任务(如serverCron)

🔁 事件循环处理流程

等待请求
有事件就绪?
处理文件事件
处理时间事件
返回继续等待

📦 核心数据结构

aeEventLoop - 事件循环核心结构

struct aeEventLoop
int maxfd int 当前已注册的最大文件描述符
int setsize int 跟踪的最大fd数量(默认10032)
aeFileEvent *events 指针 文件事件数组,按fd索引
aeFiredEvent *fired 指针 已触发事件数组
aeTimeEvent *timeEventHead 指针 时间事件链表头
void *apidata void* 底层多路复用器的私有数据
aeBeforeSleepProc *beforesleep 函数指针 休眠前回调

aeFileEvent - 文件事件

typedef struct aeFileEvent {
    int mask;              // AE_READABLE | AE_WRITABLE
    aeFileProc *rfileProc; // 读回调函数
    aeFileProc *wfileProc; // 写回调函数
    void *clientData;      // 客户端私有数据
} aeFileEvent;

aeFiredEvent - 已触发事件

typedef struct aeFiredEvent {
    int fd;   // 触发的文件描述符
    int mask; // 事件掩码
} aeFiredEvent;

aeTimeEvent - 时间事件

typedef struct aeTimeEvent {
    long long id;          // 事件ID
    monotime when;         // 触发时间
    aeTimeProc *timeProc;  // 处理函数
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

🔄 事件循环 aeProcessEvents 源码

📄 src/ae.c - 事件处理核心函数

C
/* aeProcessEvents - 处理所有待执行的事件 */
/* flags: AE_FILE_EVENTS | AE_TIME_EVENTS | AE_DONT_WAIT */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0;
    aeFileEvent *fe;
    aeTimeEvent *te;
    long long maxId;

    /* === 步骤1: 计算阻塞等待时间 === */
    long long tv = AE_DONT_WAIT;
    long tvp = AE_DONT_WAIT;
    if (!(flags & AE_DONT_WAIT)) {
        if (flags & AE_TIME_EVENTS)
            tv = usecUntilTimer(eventLoop);  // 计算到下次定时器的时间
        if (tv != AE_DONT_WAIT && tv < 0) tv = 0;
    }

    /* === 步骤2: 调用 beforesleep 回调 === */
    if (eventLoop->beforesleep != NULL)
        eventLoop->beforesleep(eventLoop);

    /* === 步骤3: 等待文件事件 === */
    int numevents = aeApiPoll(eventLoop, tvp);

    /* === 步骤4: 处理已触发的事件 === */
    for (int j = 0; j < numevents; j++) {
        aeFileEvent *e = &eventLoop->events[eventLoop->fired[j].fd];
        int mask = eventLoop->fired[j].mask;
        int fd = eventLoop->fired[j].fd;

        /* 正常顺序: 先读后写 */
        if (e->mask & mask & AE_READABLE) {
            rfileProc(eventLoop, e->clientData, fd);  // 调用读处理函数
        }
        if (e->mask & mask & AE_WRITABLE) {
            wfileProc(eventLoop, e->clientData, fd);  // 调用写处理函数
        }
        processed++;
    }

    /* === 步骤5: 处理时间事件 === */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed;
}

📝 关键点解析

1. 时间计算:只有设置了AE_TIME_EVENTS才计算定时器剩余时间,用于设置阻塞超时

2. beforesleep:在等待前调用,用于处理阻塞期间的紧急任务

3. 事件处理:先处理文件事件,再处理时间事件;读写处理取决于AE_BARRIER标志

🎯 aeMain 主循环

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, 
            AE_FILE_EVENTS | 
            AE_TIME_EVENTS |
            AE_DONT_WAIT);
    }
}

🔌 多路复用器对比

特性 epoll (Linux) kqueue (macOS/BSD) select (跨平台)
时间复杂度 O(1) - 只返回活跃fd O(1) - 只返回活跃fd O(n) - 需遍历所有fd
最大连接数 无硬性限制 (~10万+) 无硬性限制 FD_SETSIZE (默认1024)
内存拷贝 只拷贝fd信息 只拷贝fd信息 每次全量fd_set拷贝
触发模式 LT + ET LT + ET 仅LT
Redis优先级 第2优先级 第1优先级 (macOS) 最后兜底
evport (Solaris)

Solaris独有的事件端口,性能最优,Redis优先级最高

epoll (Linux)

Linux 2.6+内核,红黑树管理fd,时间复杂度O(1)

select / kqueue

跨平台兜底方案,select有连接数限制

epoll 源码实现

📄 src/ae_epoll.c - Linux epoll 封装

C
/* === aeApiCreate: 创建 epoll 实例 === */
static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    
    // 创建 epoll 实例,1024是提示值非限制
    state->epfd = epoll_create(1024);
    anetCloexec(state->epfd);  // 设置O_CLOEXEC
    
    // 分配事件数组
    state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
    eventLoop->apidata = state;
    return 0;
}

/* === aeApiPoll: 等待事件 === */
/* 关键:将 epoll 事件转换为 Redis 事件 */
static int aeApiPoll(aeEventLoop *eventLoop, long tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    
    // 调用 epoll_wait 等待事件
    retval = epoll_wait(state->epfd, state->events, 
                  eventLoop->nevents, tvp);
    
    if (retval > 0) {
        numevents = retval;
        for (int j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = &state->events[j];
            
            // EPOLLIN → AE_READABLE
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            // EPOLLOUT → AE_WRITABLE
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            // 错误/挂起也标记为可读写
            if (e->events & EPOLLERR | e->events & EPOLLHUP)
                mask |= AE_READABLE | AE_WRITABLE;
            
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

/* === aeApiAddEvent: 添加/修改监听 === */
/* 智能判断: 新fd用ADD,已有fd用MOD */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};  // 消除valgrind警告
    
    int op = eventLoop->events[fd].mask == AE_NONE ?
           EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    
    // 合并新旧事件掩码
    int events = mask | eventLoop->events[fd].mask;
    
    // Redis掩码 → epoll事件
    if (mask & AE_READABLE) events |= EPOLLIN;
    if (mask & AE_WRITABLE) events |= EPOLLOUT;
    
    ee.events = events;
    ee.data.fd = fd;
    
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}

📊 select 源码实现(对比学习)

📄 src/ae_select.c - select 封装(性能较差)

C
/* === aeApiState: select需要保存两份fd_set === */
/* 原因: select返回后原fd_set会被内核修改 */
typedef struct {
    fd_set rfds, wfds;     // 当前监听的读写集合
    fd_set _rfds, _wfds;    // 备份,select前拷贝
} aeApiState;

/* === aeApiPoll: select 实现 === */
/* 性能痛点: 每次都要memcpy + O(n)遍历 */
static int aeApiPoll(aeEventLoop *eventLoop, long tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, i, count = 0;
    
    // 拷贝fd_set (用户态↔内核态 数据传输)
    memcpy(&state->_rfds, &state->rfds, sizeof(fd_set));
    memcpy(&state->_wfds, &state->wfds, sizeof(fd_set));
    
    long sec = tvp / 1000;
    long usec = (tvp % 1000) * 1000;
    struct timeval tv = {sec, usec};
    
    // 调用 select (maxfd+1,因为fd从0开始)
    retval = select(eventLoop->maxfd + 1, 
              &state->_rfds, &state->_wfds, NULL, &tv);
    
    if (retval > 0) {
        // O(n) 遍历: 必须检查所有fd
        for (i = 0; i <= eventLoop->maxfd; i++) {
            int mask = 0;
            aeFileEvent *fe = &eventLoop->events[i];
            
            if (fe->mask == AE_NONE) continue;
            
            // FD_ISSET 逐个检查
            if (FD_ISSET(i, &state->_rfds)) mask |= AE_READABLE;
            if (FD_ISSET(i, &state->_wfds)) mask |= AE_WRITABLE;
            
            if (mask) {
                eventLoop->fired[count].fd = i;
                eventLoop->fired[count++].mask = mask;
            }
        }
    }
    return count;
}

/* === 连接数限制 === */
/* select 默认 FD_SETSIZE = 1024 */
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
    if (setsize >= FD_SETSIZE) return AE_ERR;
    /* ... 重新分配 fd_set ... */
    return 0;
}

⚠️ select 的性能瓶颈

1. 全量拷贝:每次调用select,fd_set需要从用户态拷贝到内核态,即使没有任何变化

2. O(n) 遍历:select返回后,必须从0到maxfd逐个FD_ISSET检查哪个fd有事件

3. 连接数限制:FD_SETSIZE通常为1024,无法支撑高并发场景

4. 可重入性差:select返回后fd_set已被修改,需要下次调用前重新设置

🎮 交互演示:epoll vs select

👇 点击下方按钮模拟事件处理

客户端连接状态

Client #1 - fd:3 - 等待中
Client #2 - fd:4 - 等待中
Client #3 - fd:5 - 等待中

epoll 红黑树结构

epoll fd:7
fd:3
fd:4
fd:5
事件日志
[00:00] 系统就绪,等待事件...

🔴 epoll 工作流程

1. 注册:epoll_ctl(ADD, fd) 将fd加入红黑树

2. 等待:epoll_wait 阻塞,直到有事件就绪

3. 返回:直接返回活跃fd列表(O(1))

4. 处理:只遍历活跃的fd

🟠 select 工作流程

1. 设置:每次调用前重新FD_SET所有fd

2. 拷贝:fd_set 用户态→内核态

3. 遍历:内核遍历所有fd检查状态

4. 返回:返回总数,需要FD_ISSET逐个检查

📝 核心要点总结

单线程高效

避免了锁竞争和上下文切换,通过事件驱动实现高并发

🔌

多路复用

自动选择最优I/O模型:evport > epoll > kqueue > select

🔄

Reactor模式

注册事件和回调,事件就绪时自动分发处理

📊

O(1) 效率

epoll只返回活跃fd,避免了全量遍历的开销

📄 src/server.c - Redis主循环入口

C
/* Redis 服务器主循环 */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    serverLog(LL_NOTICE, "Redis ready to accept connections");
    
    /* 事件循环 - 永不退出 */
    while (!eventLoop->stop) {
        /* 处理文件事件和时间事件 */
        aeProcessEvents(eventLoop, 
            AE_FILE_EVENTS |   // 处理客户端命令
            AE_TIME_EVENTS |   // 处理定时任务
            AE_DONT_WAIT);    // 控制阻塞时间
    }
}

/* main() 中初始化事件循环 */
/* aeCreateEventLoop(setsize) - setsize默认10032 */