🏗️ 整体架构:单线程事件驱动
🎯 核心设计思想
Redis采用单线程 + 事件循环模式,通过I/O多路复用监听多个客户端连接,所有命令在单线程中串行执行,避免了锁竞争和上下文切换开销。
🔄 Reactor模式
Redis实现了经典的Reactor模式:注册感兴趣的事件类型和回调函数,当事件就绪时由事件分发器调用相应处理函数。
🌐 两类事件
文件事件:Socket读写就绪
时间事件:定时任务(如serverCron)
🔁 事件循环处理流程
📦 核心数据结构
aeEventLoop - 事件循环核心结构
aeFileEvent - 文件事件
typedef struct aeFileEvent {
int mask; // AE_READABLE | AE_WRITABLE
aeFileProc *rfileProc; // 读回调函数
aeFileProc *wfileProc; // 写回调函数
void *clientData; // 客户端私有数据
} aeFileEvent;
aeFiredEvent - 已触发事件
typedef struct aeFiredEvent {
int fd; // 触发的文件描述符
int mask; // 事件掩码
} aeFiredEvent;
aeTimeEvent - 时间事件
typedef struct aeTimeEvent {
long long id; // 事件ID
monotime when; // 触发时间
aeTimeProc *timeProc; // 处理函数
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
🔄 事件循环 aeProcessEvents 源码
📄 src/ae.c - 事件处理核心函数
C/* aeProcessEvents - 处理所有待执行的事件 */ /* flags: AE_FILE_EVENTS | AE_TIME_EVENTS | AE_DONT_WAIT */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0; aeFileEvent *fe; aeTimeEvent *te; long long maxId; /* === 步骤1: 计算阻塞等待时间 === */ long long tv = AE_DONT_WAIT; long tvp = AE_DONT_WAIT; if (!(flags & AE_DONT_WAIT)) { if (flags & AE_TIME_EVENTS) tv = usecUntilTimer(eventLoop); // 计算到下次定时器的时间 if (tv != AE_DONT_WAIT && tv < 0) tv = 0; } /* === 步骤2: 调用 beforesleep 回调 === */ if (eventLoop->beforesleep != NULL) eventLoop->beforesleep(eventLoop); /* === 步骤3: 等待文件事件 === */ int numevents = aeApiPoll(eventLoop, tvp); /* === 步骤4: 处理已触发的事件 === */ for (int j = 0; j < numevents; j++) { aeFileEvent *e = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; /* 正常顺序: 先读后写 */ if (e->mask & mask & AE_READABLE) { rfileProc(eventLoop, e->clientData, fd); // 调用读处理函数 } if (e->mask & mask & AE_WRITABLE) { wfileProc(eventLoop, e->clientData, fd); // 调用写处理函数 } processed++; } /* === 步骤5: 处理时间事件 === */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; }
📝 关键点解析
1. 时间计算:只有设置了AE_TIME_EVENTS才计算定时器剩余时间,用于设置阻塞超时
2. beforesleep:在等待前调用,用于处理阻塞期间的紧急任务
3. 事件处理:先处理文件事件,再处理时间事件;读写处理取决于AE_BARRIER标志
🎯 aeMain 主循环
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { aeProcessEvents(eventLoop, AE_FILE_EVENTS | AE_TIME_EVENTS | AE_DONT_WAIT); } }
🔌 多路复用器对比
| 特性 | epoll (Linux) | kqueue (macOS/BSD) | select (跨平台) |
|---|---|---|---|
| 时间复杂度 | O(1) - 只返回活跃fd | O(1) - 只返回活跃fd | O(n) - 需遍历所有fd |
| 最大连接数 | 无硬性限制 (~10万+) | 无硬性限制 | FD_SETSIZE (默认1024) |
| 内存拷贝 | 只拷贝fd信息 | 只拷贝fd信息 | 每次全量fd_set拷贝 |
| 触发模式 | LT + ET | LT + ET | 仅LT |
| Redis优先级 | 第2优先级 | 第1优先级 (macOS) | 最后兜底 |
Solaris独有的事件端口,性能最优,Redis优先级最高
Linux 2.6+内核,红黑树管理fd,时间复杂度O(1)
跨平台兜底方案,select有连接数限制
⚡ epoll 源码实现
📄 src/ae_epoll.c - Linux epoll 封装
C/* === aeApiCreate: 创建 epoll 实例 === */ static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); // 创建 epoll 实例,1024是提示值非限制 state->epfd = epoll_create(1024); anetCloexec(state->epfd); // 设置O_CLOEXEC // 分配事件数组 state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize); eventLoop->apidata = state; return 0; } /* === aeApiPoll: 等待事件 === */ /* 关键:将 epoll 事件转换为 Redis 事件 */ static int aeApiPoll(aeEventLoop *eventLoop, long tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; // 调用 epoll_wait 等待事件 retval = epoll_wait(state->epfd, state->events, eventLoop->nevents, tvp); if (retval > 0) { numevents = retval; for (int j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = &state->events[j]; // EPOLLIN → AE_READABLE if (e->events & EPOLLIN) mask |= AE_READABLE; // EPOLLOUT → AE_WRITABLE if (e->events & EPOLLOUT) mask |= AE_WRITABLE; // 错误/挂起也标记为可读写 if (e->events & EPOLLERR | e->events & EPOLLHUP) mask |= AE_READABLE | AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } return numevents; } /* === aeApiAddEvent: 添加/修改监听 === */ /* 智能判断: 新fd用ADD,已有fd用MOD */ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; // 消除valgrind警告 int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; // 合并新旧事件掩码 int events = mask | eventLoop->events[fd].mask; // Redis掩码 → epoll事件 if (mask & AE_READABLE) events |= EPOLLIN; if (mask & AE_WRITABLE) events |= EPOLLOUT; ee.events = events; ee.data.fd = fd; if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1; return 0; }
📊 select 源码实现(对比学习)
📄 src/ae_select.c - select 封装(性能较差)
C/* === aeApiState: select需要保存两份fd_set === */ /* 原因: select返回后原fd_set会被内核修改 */ typedef struct { fd_set rfds, wfds; // 当前监听的读写集合 fd_set _rfds, _wfds; // 备份,select前拷贝 } aeApiState; /* === aeApiPoll: select 实现 === */ /* 性能痛点: 每次都要memcpy + O(n)遍历 */ static int aeApiPoll(aeEventLoop *eventLoop, long tvp) { aeApiState *state = eventLoop->apidata; int retval, i, count = 0; // 拷贝fd_set (用户态↔内核态 数据传输) memcpy(&state->_rfds, &state->rfds, sizeof(fd_set)); memcpy(&state->_wfds, &state->wfds, sizeof(fd_set)); long sec = tvp / 1000; long usec = (tvp % 1000) * 1000; struct timeval tv = {sec, usec}; // 调用 select (maxfd+1,因为fd从0开始) retval = select(eventLoop->maxfd + 1, &state->_rfds, &state->_wfds, NULL, &tv); if (retval > 0) { // O(n) 遍历: 必须检查所有fd for (i = 0; i <= eventLoop->maxfd; i++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[i]; if (fe->mask == AE_NONE) continue; // FD_ISSET 逐个检查 if (FD_ISSET(i, &state->_rfds)) mask |= AE_READABLE; if (FD_ISSET(i, &state->_wfds)) mask |= AE_WRITABLE; if (mask) { eventLoop->fired[count].fd = i; eventLoop->fired[count++].mask = mask; } } } return count; } /* === 连接数限制 === */ /* select 默认 FD_SETSIZE = 1024 */ static int aeApiResize(aeEventLoop *eventLoop, int setsize) { if (setsize >= FD_SETSIZE) return AE_ERR; /* ... 重新分配 fd_set ... */ return 0; }
⚠️ select 的性能瓶颈
1. 全量拷贝:每次调用select,fd_set需要从用户态拷贝到内核态,即使没有任何变化
2. O(n) 遍历:select返回后,必须从0到maxfd逐个FD_ISSET检查哪个fd有事件
3. 连接数限制:FD_SETSIZE通常为1024,无法支撑高并发场景
4. 可重入性差:select返回后fd_set已被修改,需要下次调用前重新设置
🎮 交互演示:epoll vs select
客户端连接状态
epoll 红黑树结构
事件日志
🔴 epoll 工作流程
1. 注册:epoll_ctl(ADD, fd) 将fd加入红黑树
2. 等待:epoll_wait 阻塞,直到有事件就绪
3. 返回:直接返回活跃fd列表(O(1))
4. 处理:只遍历活跃的fd
🟠 select 工作流程
1. 设置:每次调用前重新FD_SET所有fd
2. 拷贝:fd_set 用户态→内核态
3. 遍历:内核遍历所有fd检查状态
4. 返回:返回总数,需要FD_ISSET逐个检查
📝 核心要点总结
单线程高效
避免了锁竞争和上下文切换,通过事件驱动实现高并发
多路复用
自动选择最优I/O模型:evport > epoll > kqueue > select
Reactor模式
注册事件和回调,事件就绪时自动分发处理
O(1) 效率
epoll只返回活跃fd,避免了全量遍历的开销
📄 src/server.c - Redis主循环入口
C/* Redis 服务器主循环 */ void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; serverLog(LL_NOTICE, "Redis ready to accept connections"); /* 事件循环 - 永不退出 */ while (!eventLoop->stop) { /* 处理文件事件和时间事件 */ aeProcessEvents(eventLoop, AE_FILE_EVENTS | // 处理客户端命令 AE_TIME_EVENTS | // 处理定时任务 AE_DONT_WAIT); // 控制阻塞时间 } } /* main() 中初始化事件循环 */ /* aeCreateEventLoop(setsize) - setsize默认10032 */