📌 ae.h — 宏定义与常量
ae.h 是整个事件库的"合同",定义了所有对外暴露的类型、宏和函数声明。读懂它就读懂了 ae 的全部能力边界。
ae.h
/* 文件事件类型掩码 */ #define AE_NONE 0 /* 未注册任何事件 */ #define AE_READABLE 1 /* fd 可读(有数据到来 / 新连接)*/ #define AE_WRITABLE 2 /* fd 可写(输出缓冲区有空间)*/ #define AE_BARRIER 4 /* 强制先写后读(AOF fsync 时用)*/ /* 事件类型标志 */ #define AE_FILE_EVENTS (1<<0) /* 处理文件事件 */ #define AE_TIME_EVENTS (1<<1) /* 处理时间事件 */ #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) #define AE_DONT_WAIT (1<<2) /* 不阻塞,立即返回 */ #define AE_CALL_BEFORE_SLEEP (1<<3) #define AE_CALL_AFTER_SLEEP (1<<4) /* 时间事件返回值:返回 AE_NOMORE 表示一次性事件,执行后删除 */ #define AE_NOMORE -1 /* 已被标记删除的时间事件 ID */ #define AE_DELETED_EVENT_ID -1
!
AE_BARRIER(值为 4)是一个特殊标志位。正常情况 ae 先处理读再处理写,设了 AE_BARRIER 后强制先写后读。Redis 在 AOF 每次写命令后需要先把响应发给客户端再 fsync,就靠这个标志控制顺序。
✓
AE_DONT_WAIT:把 epoll_wait 的 timeout 设为 0,让 ae 非阻塞轮询一次就立即返回。Redis 在某些内部调用(如 Cluster 处理)会用到这个模式。
🗂️ ae.h — 数据结构
ae.h — aeFileEvent
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); typedef struct aeFileEvent { int mask; /* AE_READABLE | AE_WRITABLE | AE_BARRIER */ aeFileProc *rfileProc; /* 可读时的回调函数指针 */ aeFileProc *wfileProc; /* 可写时的回调函数指针 */ void *clientData; /* 回调时透传的用户数据(通常是 client* 指针)*/ } aeFileEvent;
1
aeFileEvent 以 fd 为下标存在数组里:
eventLoop->events[fd] 就是这个 fd 的文件事件。注册时直接写数组,O(1) 存取,无需哈希表。2
rfileProc 在 Redis 中通常是 readQueryFromClient;wfileProc 通常是 sendReplyToClient。函数签名固定为 (eventLoop, fd, clientData, mask)。ae.h — aeFiredEvent
typedef struct aeFiredEvent { int fd; /* 就绪的文件描述符编号 */ int mask; /* 就绪类型:AE_READABLE / AE_WRITABLE */ } aeFiredEvent;
✓
这是临时数组
eventLoop->fired[]。每次 aeApiPoll()(即 epoll_wait)返回后,底层实现把就绪的 fd 和 mask 填入这里。aeProcessEvents 再遍历它,调用对应的 handler。下一轮直接覆盖,不需要清空。ae.h — aeTimeEvent
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef struct aeTimeEvent { long long id; /* 唯一 ID,单调递增 */ monotime when; /* 触发时间点(绝对时间戳,ms)*/ aeTimeProc *timeProc; /* 触发回调,返回值决定是否重复 */ aeEventFinalizerProc *finalizerProc; /* 删除时的清理函数(可为 NULL)*/ void *clientData; struct aeTimeEvent *prev; /* 双向链表:前驱 */ struct aeTimeEvent *next; /* 双向链表:后继 */ int refcount; /* 防止在回调执行中被意外释放 */ } aeTimeEvent;
!
timeProc 返回值语义:返回
-1(AE_NOMORE)= 一次性事件,执行后自动删除;返回 N >= 0 = N 毫秒后再次触发(serverCron 返回 1000/server.hz,即约 100ms)。?
为什么用双向链表而不是堆/优先队列?因为 Redis 的时间事件极少(通常只有
serverCron 一个),链表足够。若事件数量多,最小堆会更高效。ae.h — aeEventLoop(核心结构体)
typedef struct aeEventLoop { int maxfd; /* 当前已注册的最大 fd 编号 */ int setsize; /* 能监听的 fd 总上限(= maxclients + 128)*/ long long timeEventNextId; /* 下一个时间事件的 ID */ aeFileEvent *events; /* 注册的文件事件数组,下标 = fd */ aeFiredEvent *fired; /* epoll_wait 返回后的就绪事件数组 */ aeTimeEvent *timeEventHead; /* 时间事件双向链表头节点 */ int stop; /* 置 1 时 aeMain 退出 */ void *apidata; /* 底层 IO 实现的私有数据(如 epoll fd)*/ aeBeforeSleepProc *beforesleep; /* epoll_wait 前的钩子 */ aeBeforeSleepProc *aftersleep; /* epoll_wait 后的钩子 */ int flags; /* AE_DONT_WAIT 等标志 */ } aeEventLoop;
内存布局关键点:
events[fd] 和 fired[i] 在初始化时一次性分配足够大的数组(大小 = setsize),之后不会 realloc。所以 Redis 如果 maxclients 设得很大,启动时就会占用相应内存。📋 ae.h — 对外 API 一览
ae.h — 函数声明
/* 生命周期 */ aeEventLoop *aeCreateEventLoop(int setsize); void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeStop(aeEventLoop *eventLoop); /* 设 stop=1 */ /* 文件事件 */ int aeCreateFileEvent(aeEventLoop *el, int fd, int mask, aeFileProc *proc, void *clientData); void aeDeleteFileEvent(aeEventLoop *el, int fd, int mask); int aeGetFileEvents(aeEventLoop *el, int fd); /* 时间事件 */ long long aeCreateTimeEvent(aeEventLoop *el, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc); int aeDeleteTimeEvent(aeEventLoop *el, long long id); /* 事件处理 */ int aeProcessEvents(aeEventLoop *el, int flags); /* 处理一轮事件 */ void aeMain(aeEventLoop *el); /* 启动主循环 */ /* 工具 */ int aeWait(int fd, int mask, long long milliseconds); const char *aeGetApiName(void); /* 返回 "epoll"/"kqueue"/"select" */ void aeSetBeforeSleepProc(aeEventLoop *el, aeBeforeSleepProc *beforesleep); void aeSetAfterSleepProc(aeEventLoop *el, aeBeforeSleepProc *aftersleep);
🔨 ae.c — aeCreateEventLoop()
这是整个事件循环的构造函数。Redis 启动时(server.c 的 initServer)调用它,创建唯一的全局
server.el。ae.c — aeCreateEventLoop()
aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; /* 分配 aeEventLoop 结构体本身 */ if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; /* 分配文件事件数组:events[fd] 直接按 fd 下标访问,O(1) */ eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize); /* 分配就绪事件数组:epoll_wait 返回后填充 */ eventLoop->fired = zmalloc(sizeof(aeFiredEvent) * setsize); if (!eventLoop->events || !eventLoop->fired) goto err; eventLoop->setsize = setsize; eventLoop->timeEventHead = NULL; /* 时间事件链表为空 */ eventLoop->timeEventNextId = 0; eventLoop->stop = 0; /* 不停止 */ eventLoop->maxfd = -1; /* 尚未注册任何 fd */ eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; eventLoop->flags = 0; /* 调用底层 IO 初始化(Linux 下执行 epoll_create)*/ if (aeApiCreate(eventLoop) == -1) goto err; /* 初始化所有 fd 的掩码为 AE_NONE(未注册任何事件)*/ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); } return NULL; }
1
zmalloc(aeEventLoop)
分配控制结构体,Redis 使用自己的内存分配器 zmalloc(在 jemalloc 之上封装)
2
zmalloc(events[setsize]) + zmalloc(fired[setsize])
一次性分配足够大的数组。setsize = maxclients + 128(预留内部 fd)。这两个数组贯穿整个生命周期不会 realloc
3
aeApiCreate(eventLoop)
调用底层实现(Linux = ae_epoll.c),执行 epoll_create(1) 获得 epfd,保存在 apidata 里
4
初始化 events[i].mask = AE_NONE
把所有 fd 槽位标记为"未使用",这样 aeProcessEvents 可以跳过未注册的 fd
📁 ae.c — aeCreateFileEvent()
Redis 接受新连接、注册读写事件时调用这个函数。比如
acceptHandler 拿到新 fd 后会立刻调用 aeCreateFileEvent(el, fd, AE_READABLE, readQueryFromClient, c)。ae.c — aeCreateFileEvent()
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { /* 边界检查:fd 不能超过 setsize */ if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; /* O(1) 直接索引 */ /* 调用底层 API 在 epoll/kqueue 中注册这个 fd 的 mask */ if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; /* 用 OR 合并 mask:同一个 fd 可以同时监听读和写 */ fe->mask |= mask; /* 根据 mask 设置对应的回调函数 */ if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; /* 更新 maxfd,用于 aeApiPoll 的遍历上界 */ if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; }
✓
mask 用 OR 合并,这意味着你可以先注册 AE_READABLE,之后再追加 AE_WRITABLE,两次 aeCreateFileEvent 调用不会互相覆盖。aeDeleteFileEvent 同理用 AND NOT 来移除单个 mask 位。
!
aeDeleteFileEvent 的对称逻辑:
fe->mask &= ~mask,然后调用 aeApiDelEvent()(epoll_ctl EPOLL_CTL_DEL)。若 mask 清零则还要更新 maxfd(需要从 maxfd 向前扫描找新的最大值,是 ae 里少见的 O(N) 操作)。⏰ ae.c — aeCreateTimeEvent()
ae.c — aeCreateTimeEvent()
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { /* 分配一个新的时间事件节点 */ long long id = eventLoop->timeEventNextId++; aeTimeEvent *te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; te->id = id; te->timeProc = proc; te->finalizerProc = finalizerProc; te->clientData = clientData; te->refcount = 0; /* when = now + milliseconds(使用单调时钟,不受系统时间调整影响)*/ aeAddMillisecondsToNow(milliseconds, &te->when); /* 头插法:新事件插入链表头部 */ te->next = eventLoop->timeEventHead; te->prev = NULL; if (eventLoop->timeEventHead) eventLoop->timeEventHead->prev = te; eventLoop->timeEventHead = te; return id; /* 返回 ID,调用者可用于后续 aeDeleteTimeEvent */ }
✓
单调时钟:ae 用
getMonotonicUs() 而非 gettimeofday(),这样 NTP 时钟调整、夏令时切换都不会导致定时任务提前或延迟触发。这是 Redis 4.0 之后的改进。?
头插法的意义:最新创建的事件在链表头部,遍历时先检查新事件。但注意链表不按时间排序,每次处理都要完整遍历整条链表找到 when ≤ now 的事件。Redis 时间事件极少所以这没问题。
ae.c — aeProcessTimeEvents()(内部函数)
static int aeProcessTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te, *prev; long long maxId; /* maxId 防止在本轮处理中新创建的事件被立即执行 */ maxId = eventLoop->timeEventNextId - 1; monotime now = getMonotonicUs(); te = eventLoop->timeEventHead; while (te) { long long id; /* 跳过标记删除的节点(id == AE_DELETED_EVENT_ID)*/ if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; if (te->refcount) { te = next; continue; } /* 正在回调中,不能释放 */ if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) te->finalizerProc(eventLoop, te->clientData); zfree(te); te = next; continue; } /* 跳过本轮新创建的事件 */ if (te->id > maxId) { te = te->next; continue; } /* 检查是否到期:when <= now */ if (te->when <= now) { int retval; id = te->id; te->refcount++; /* 防止回调中被删除 */ retval = te->timeProc(eventLoop, id, te->clientData); te->refcount--; processed++; /* 根据返回值决定重复还是删除 */ if (retval != AE_NOMORE) { /* 重复执行:when += retval(ms) */ aeAddMillisecondsToNow(retval, &te->when); } else { /* 一次性事件:标记删除(不立即 free,下次遍历时清理)*/ te->id = AE_DELETED_EVENT_ID; } } te = te->next; } return processed; }
!
refcount 的作用:如果在
timeProc 回调内部调用了 aeDeleteTimeEvent,节点不会立即被 free(因为 refcount > 0),而是把 id 设为 AE_DELETED_EVENT_ID,等下次遍历时再安全释放,避免 use-after-free。⚙️ ae.c — aeProcessEvents()【核心函数】
这是整个 ae 最重要的函数,一次 Tick 的全部逻辑都在这里:计算 timeout → epoll_wait → 处理文件事件 → 处理时间事件。
ae.c — aeProcessEvents()
int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* flags 里没有文件事件和时间事件标志,直接返回 */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* ① 计算 epoll_wait 的 timeout 原则:既不想无限阻塞(会错过时间事件), 又不想 spin(浪费 CPU)*/ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) shortest = aeSearchNearestTimer(eventLoop); /* 遍历链表找最近事件 */ if (shortest) { /* timeout = 最近时间事件触发时间 - 当前时间 */ long nowMs = (long)(getMonotonicUs() / 1000); long delta = (long)(shortest->when / 1000) - nowMs; if (delta > 0) { tv.tv_sec = delta / 1000; tv.tv_usec = (delta % 1000) * 1000; } else { tv.tv_sec = tv.tv_usec = 0; /* 时间事件已过期,不阻塞 */ } tvp = &tv; } else { /* 没有时间事件:根据 AE_DONT_WAIT 决定是否永久阻塞 */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { tvp = NULL; /* epoll_wait 永久阻塞(直到有 fd 就绪)*/ } } } /* ② 调用 beforesleep 钩子(刷缓冲区等)*/ if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); /* ③ 调用底层 IO 轮询(epoll_wait),填充 fired[] 数组 numevents = 本轮就绪事件数量 */ numevents = aeApiPoll(eventLoop, tvp); /* ④ epoll_wait 返回后,立即调用 aftersleep 钩子 */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); /* ⑤ 遍历 fired[] 处理就绪的文件事件 */ for (int j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; int fired = 0; /* AE_BARRIER:先写后读(逆序处理)*/ int invert = fe->mask & AE_BARRIER; /* 正常情况:先读后写 */ if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop, fd, fe->clientData, mask); fired++; fe = &eventLoop->events[fd]; /* 回调可能修改了 fe,重新获取 */ } if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop, fd, fe->clientData, mask); fired++; } } /* AE_BARRIER 时:写完再读 */ if (invert) { fe = &eventLoop->events[fd]; if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop, fd, fe->clientData, mask); fired++; } } processed++; } /* ⑥ 处理时间事件 */ if (flags & AE_TIME_EVENTS) processed += aeProcessTimeEvents(eventLoop); return processed; }
!
AE_BARRIER 的读写顺序翻转:
invert 为真时把写操作提前到读之前。这保证在 AOF 每次写模式下,先把响应发给客户端(写),再进行 fsync(读事件里的逻辑),否则 fsync 耗时会让客户端等待。✓
fe = &eventLoop->events[fd] 在回调后重新获取是关键:因为 rfileProc 回调内部可能调用了 aeDeleteFileEvent,改变了 fe 的 mask 或回调指针,不重新获取会读到脏数据。🔁 ae.c — aeMain()
ae.c — aeMain()
void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { aeProcessEvents(eventLoop, AE_ALL_EVENTS /* 处理文件事件 + 时间事件 */ | AE_CALL_BEFORE_SLEEP /* 调用 beforesleep 钩子 */ | AE_CALL_AFTER_SLEEP); /* 调用 aftersleep 钩子 */ } }
就这 5 行。这就是 Redis "单线程"的全部秘密:一个死循环,不停地调用 aeProcessEvents。简单到令人震撼。
server.c — Redis 启动流程(节选)
/* server.c initServer() */ server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR); /* 注册 TCP 监听 fd 的接受连接事件 */ for (int i = 0; i < server.ipfd.count; i++) aeCreateFileEvent(server.el, server.ipfd.fd[i], AE_READABLE, acceptTcpHandler, NULL); /* 注册 serverCron 定时任务(1000/hz ms 后首次触发)*/ aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); /* 注册 beforesleep / aftersleep 钩子 */ aeSetBeforeSleepProc(server.el, beforeSleep); aeSetAfterSleepProc(server.el, afterSleep); /* 启动!进入死循环,永不返回(除非收到 SHUTDOWN 信号)*/ aeMain(server.el); aeDeleteEventLoop(server.el); /* 正常关闭时清理资源 */
🐧 ae_epoll.c — aeApiCreate()
ae_epoll.c 是 ae 在 Linux 下的底层实现。它向上提供统一接口(aeApiCreate / aeApiAddEvent / aeApiPoll),向下直接调用 epoll 系统调用。
ae_epoll.c — 私有数据结构 + aeApiCreate()
/* epoll 的私有数据,存储在 eventLoop->apidata 中 */ typedef struct aeApiState { int epfd; /* epoll 实例的文件描述符 */ struct epoll_event *events; /* epoll_wait 的输出缓冲区 */ } aeApiState; static int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = zmalloc(sizeof(aeApiState)); if (!state) return -1; /* 分配 epoll_event 数组,大小 = setsize(最多返回这么多就绪事件)*/ state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize); if (!state->events) { zfree(state); return -1; } /* epoll_create(1024):参数 size 在 Linux 2.6.8+ 已被忽略,传任意正数即可 */ state->epfd = epoll_create(1024); if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; } /* 设置 close-on-exec,fork 后子进程不会继承 epfd */ anetCloexec(state->epfd); /* 保存到 eventLoop->apidata,后续 aeApiAddEvent/aeApiPoll 通过它访问 */ eventLoop->apidata = state; return 0; }
➕ ae_epoll.c — aeApiAddEvent() / aeApiDelEvent()
ae_epoll.c — aeApiAddEvent()
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* 判断是 ADD 还是 MOD:若 fd 已有注册事件则用 EPOLL_CTL_MOD */ int op = (eventLoop->events[fd].mask == AE_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; /* 将 ae 的 mask 转换为 epoll 的 events 标志 */ ee.events = 0; mask |= eventLoop->events[fd].mask; /* 合并已有 mask */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; /* 调用系统调用,将 fd 注册到内核 epoll 红黑树 */ if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1; return 0; } static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; int mask = eventLoop->events[fd].mask & ~delmask; /* 删除指定位 */ ee.events = 0; if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; /* mask 清零说明彻底删除;否则 MOD 更新为剩余 mask */ if (mask != AE_NONE) epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee); else epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee); }
✓
ADD vs MOD 的判断:epoll_ctl 中对同一个 fd 不能重复 EPOLL_CTL_ADD,必须用 EPOLL_CTL_MOD 修改。ae 通过检查
events[fd].mask == AE_NONE 来区分首次注册还是追加 mask,非常精妙。🔍 ae_epoll.c — aeApiPoll()【最关键的系统调用入口】
ae_epoll.c — aeApiPoll()
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; /* 核心系统调用:主线程在此阻塞等待就绪事件 tvp == NULL → 永久阻塞 tvp->tv_msec → 最多等待 N 毫秒(最近时间事件触发时间) tvp == {0,0} → 非阻塞(AE_DONT_WAIT)*/ retval = epoll_wait(state->epfd, state->events, /* 输出:就绪 epoll_event 数组 */ eventLoop->setsize, /* 最多返回 setsize 个事件 */ tvp ? (int)(tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { numevents = retval; /* 将 epoll 的就绪事件转换为 ae 的 fired[] 格式 */ for (int j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events + j; /* EPOLLIN → AE_READABLE */ if (e->events & EPOLLIN) mask |= AE_READABLE; /* EPOLLOUT → AE_WRITABLE */ if (e->events & EPOLLOUT) mask |= AE_WRITABLE; /* EPOLLERR / EPOLLHUP:fd 错误,两种事件都通知上层处理 */ if (e->events & EPOLLERR) mask |= AE_READABLE | AE_WRITABLE; if (e->events & EPOLLHUP) mask |= AE_READABLE | AE_WRITABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } else if (retval == -1 && errno != EINTR) { panic("aeApiPoll: epoll_wait, %s", strerror(errno)); } return numevents; }
!
EPOLLERR / EPOLLHUP 映射到 AE_READABLE | AE_WRITABLE:让上层的 rfileProc 和 wfileProc 都有机会处理错误(通常会调用
read() 或 write() 拿到 errno,然后关闭连接)。✓
EINTR 不 panic:信号中断 epoll_wait 返回 -1 且 errno=EINTR 是正常的(如收到 SIGTERM),ae 直接忽略,下次循环继续 epoll_wait。
🗺️ 完整调用链 — 从启动到处理一条命令
完整调用链(伪代码)
/* === 启动阶段 === */ server.c: main() └─ initServer() ├─ ae.c: aeCreateEventLoop(maxclients+128) │ └─ ae_epoll.c: aeApiCreate() → epoll_create(1024) ├─ ae.c: aeCreateFileEvent(listenfd, AE_READABLE, acceptTcpHandler) │ └─ ae_epoll.c: aeApiAddEvent() → epoll_ctl(EPOLL_CTL_ADD) └─ ae.c: aeCreateTimeEvent(1ms, serverCron) └─ ae.c: aeMain(server.el) ← 死循环,永不返回 /* === 每个 Tick === */ ae.c: aeMain() └─ ae.c: aeProcessEvents(ALL_EVENTS | BEFORE_SLEEP | AFTER_SLEEP) │ ├─ [1] aeSearchNearestTimer() → 计算 epoll_wait timeout ├─ [2] beforesleep() → 刷输出缓冲区、AOF 等 ├─ [3] ae_epoll.c: aeApiPoll() → epoll_wait(epfd, events, N, timeout) │ 内核填充就绪 fd 列表 → ae 写入 fired[] ├─ [4] aftersleep() → 通知 IO 线程读数据(6.0+) │ ├─ [5] for j = 0..numevents: │ fd = fired[j].fd │ mask = fired[j].mask │ fe = events[fd] ← O(1) 直接索引 │ ┌─ AE_READABLE → fe->rfileProc() │ │ ├─ acceptTcpHandler() (listenfd 新连接) │ │ └─ readQueryFromClient() (客户端发来数据) │ │ └─ processCommand() → SET/GET/... │ │ └─ addReply() → 写输出缓冲区 │ │ └─ aeCreateFileEvent(fd, AE_WRITABLE, sendReplyToClient) │ └─ AE_WRITABLE → fe->wfileProc() │ └─ sendReplyToClient() → write(fd, buf) │ └─ aeDeleteFileEvent(fd, AE_WRITABLE) │ └─ [6] aeProcessTimeEvents() └─ serverCron() → 过期清理 / 持久化 / 复制心跳...
看懂这张调用链,你就彻底理解了 Redis 的 IO 模型。所有的"单线程"、"高性能"、"事件驱动"都浓缩在这里:
一个 while 循环 → 一次 epoll_wait → 遍历就绪 fd → 调用 handler → 处理定时任务 → 再循环。
一个 while 循环 → 一次 epoll_wait → 遍历就绪 fd → 调用 handler → 处理定时任务 → 再循环。