Redis ae 源代码精讲

逐函数拆解 ae.h / ae.c / ae_epoll.c,每一行都看懂

📌 ae.h — 宏定义与常量
ae.h 是整个事件库的"合同",定义了所有对外暴露的类型、宏和函数声明。读懂它就读懂了 ae 的全部能力边界。
ae.h
/* 文件事件类型掩码 */
#define AE_NONE        0      /* 未注册任何事件 */
#define AE_READABLE    1      /* fd 可读(有数据到来 / 新连接)*/
#define AE_WRITABLE    2      /* fd 可写(输出缓冲区有空间)*/
#define AE_BARRIER     4      /* 强制先写后读(AOF fsync 时用)*/

/* 事件类型标志 */
#define AE_FILE_EVENTS  (1<<0)  /* 处理文件事件 */
#define AE_TIME_EVENTS  (1<<1)  /* 处理时间事件 */
#define AE_ALL_EVENTS   (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT    (1<<2)  /* 不阻塞,立即返回 */
#define AE_CALL_BEFORE_SLEEP (1<<3)
#define AE_CALL_AFTER_SLEEP  (1<<4)

/* 时间事件返回值:返回 AE_NOMORE 表示一次性事件,执行后删除 */
#define AE_NOMORE      -1

/* 已被标记删除的时间事件 ID */
#define AE_DELETED_EVENT_ID -1
!
AE_BARRIER(值为 4)是一个特殊标志位。正常情况 ae 先处理读再处理写,设了 AE_BARRIER 后强制先写后读。Redis 在 AOF 每次写命令后需要先把响应发给客户端再 fsync,就靠这个标志控制顺序。
AE_DONT_WAIT:把 epoll_wait 的 timeout 设为 0,让 ae 非阻塞轮询一次就立即返回。Redis 在某些内部调用(如 Cluster 处理)会用到这个模式。
🗂️ ae.h — 数据结构
ae.h — aeFileEvent
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd,
                        void *clientData, int mask);

typedef struct aeFileEvent {
    int mask;               /* AE_READABLE | AE_WRITABLE | AE_BARRIER */
    aeFileProc *rfileProc; /* 可读时的回调函数指针 */
    aeFileProc *wfileProc; /* 可写时的回调函数指针 */
    void *clientData;       /* 回调时透传的用户数据(通常是 client* 指针)*/
} aeFileEvent;
1
aeFileEvent 以 fd 为下标存在数组里eventLoop->events[fd] 就是这个 fd 的文件事件。注册时直接写数组,O(1) 存取,无需哈希表。
2
rfileProc 在 Redis 中通常是 readQueryFromClientwfileProc 通常是 sendReplyToClient。函数签名固定为 (eventLoop, fd, clientData, mask)
ae.h — aeFiredEvent
typedef struct aeFiredEvent {
    int fd;    /* 就绪的文件描述符编号 */
    int mask;  /* 就绪类型:AE_READABLE / AE_WRITABLE */
} aeFiredEvent;
这是临时数组 eventLoop->fired[]。每次 aeApiPoll()(即 epoll_wait)返回后,底层实现把就绪的 fd 和 mask 填入这里。aeProcessEvents 再遍历它,调用对应的 handler。下一轮直接覆盖,不需要清空。
ae.h — aeTimeEvent
typedef int aeTimeProc(struct aeEventLoop *eventLoop,
                       long long id, void *clientData);

typedef struct aeTimeEvent {
    long long id;                     /* 唯一 ID,单调递增 */
    monotime  when;                   /* 触发时间点(绝对时间戳,ms)*/
    aeTimeProc         *timeProc;    /* 触发回调,返回值决定是否重复 */
    aeEventFinalizerProc *finalizerProc; /* 删除时的清理函数(可为 NULL)*/
    void  *clientData;
    struct aeTimeEvent *prev;         /* 双向链表:前驱 */
    struct aeTimeEvent *next;         /* 双向链表:后继 */
    int refcount;                     /* 防止在回调执行中被意外释放 */
} aeTimeEvent;
!
timeProc 返回值语义:返回 -1(AE_NOMORE)= 一次性事件,执行后自动删除;返回 N >= 0 = N 毫秒后再次触发(serverCron 返回 1000/server.hz,即约 100ms)。
?
为什么用双向链表而不是堆/优先队列?因为 Redis 的时间事件极少(通常只有 serverCron 一个),链表足够。若事件数量多,最小堆会更高效。
ae.h — aeEventLoop(核心结构体)
typedef struct aeEventLoop {
    int  maxfd;           /* 当前已注册的最大 fd 编号 */
    int  setsize;         /* 能监听的 fd 总上限(= maxclients + 128)*/
    long long timeEventNextId; /* 下一个时间事件的 ID */
    aeFileEvent  *events;  /* 注册的文件事件数组,下标 = fd */
    aeFiredEvent *fired;   /* epoll_wait 返回后的就绪事件数组 */
    aeTimeEvent  *timeEventHead; /* 时间事件双向链表头节点 */
    int  stop;             /* 置 1 时 aeMain 退出 */
    void *apidata;         /* 底层 IO 实现的私有数据(如 epoll fd)*/
    aeBeforeSleepProc *beforesleep; /* epoll_wait 前的钩子 */
    aeBeforeSleepProc *aftersleep;  /* epoll_wait 后的钩子 */
    int  flags;            /* AE_DONT_WAIT 等标志 */
} aeEventLoop;
内存布局关键点:events[fd]fired[i] 在初始化时一次性分配足够大的数组(大小 = setsize),之后不会 realloc。所以 Redis 如果 maxclients 设得很大,启动时就会占用相应内存。
📋 ae.h — 对外 API 一览
ae.h — 函数声明
/* 生命周期 */
aeEventLoop *aeCreateEventLoop(int setsize);
void          aeDeleteEventLoop(aeEventLoop *eventLoop);
void          aeStop(aeEventLoop *eventLoop);  /* 设 stop=1 */

/* 文件事件 */
int  aeCreateFileEvent(aeEventLoop *el, int fd, int mask,
                        aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *el, int fd, int mask);
int  aeGetFileEvents(aeEventLoop *el, int fd);

/* 时间事件 */
long long aeCreateTimeEvent(aeEventLoop *el, long long milliseconds,
                              aeTimeProc *proc, void *clientData,
                              aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *el, long long id);

/* 事件处理 */
int  aeProcessEvents(aeEventLoop *el, int flags); /* 处理一轮事件 */
void aeMain(aeEventLoop *el);                    /* 启动主循环 */

/* 工具 */
int         aeWait(int fd, int mask, long long milliseconds);
const char *aeGetApiName(void);  /* 返回 "epoll"/"kqueue"/"select" */
void        aeSetBeforeSleepProc(aeEventLoop *el, aeBeforeSleepProc *beforesleep);
void        aeSetAfterSleepProc(aeEventLoop *el, aeBeforeSleepProc *aftersleep);
🔨 ae.c — aeCreateEventLoop()
这是整个事件循环的构造函数。Redis 启动时(server.c 的 initServer)调用它,创建唯一的全局 server.el
ae.c — aeCreateEventLoop()
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    /* 分配 aeEventLoop 结构体本身 */
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

    /* 分配文件事件数组:events[fd] 直接按 fd 下标访问,O(1) */
    eventLoop->events = zmalloc(sizeof(aeFileEvent) * setsize);
    /* 分配就绪事件数组:epoll_wait 返回后填充 */
    eventLoop->fired  = zmalloc(sizeof(aeFiredEvent) * setsize);
    if (!eventLoop->events || !eventLoop->fired) goto err;

    eventLoop->setsize       = setsize;
    eventLoop->timeEventHead = NULL;        /* 时间事件链表为空 */
    eventLoop->timeEventNextId = 0;
    eventLoop->stop          = 0;          /* 不停止 */
    eventLoop->maxfd         = -1;         /* 尚未注册任何 fd */
    eventLoop->beforesleep   = NULL;
    eventLoop->aftersleep    = NULL;
    eventLoop->flags         = 0;

    /* 调用底层 IO 初始化(Linux 下执行 epoll_create)*/
    if (aeApiCreate(eventLoop) == -1) goto err;

    /* 初始化所有 fd 的掩码为 AE_NONE(未注册任何事件)*/
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;

    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}
1
zmalloc(aeEventLoop)

分配控制结构体,Redis 使用自己的内存分配器 zmalloc(在 jemalloc 之上封装)

2
zmalloc(events[setsize]) + zmalloc(fired[setsize])

一次性分配足够大的数组。setsize = maxclients + 128(预留内部 fd)。这两个数组贯穿整个生命周期不会 realloc

3
aeApiCreate(eventLoop)

调用底层实现(Linux = ae_epoll.c),执行 epoll_create(1) 获得 epfd,保存在 apidata 里

4
初始化 events[i].mask = AE_NONE

把所有 fd 槽位标记为"未使用",这样 aeProcessEvents 可以跳过未注册的 fd

📁 ae.c — aeCreateFileEvent()
Redis 接受新连接、注册读写事件时调用这个函数。比如 acceptHandler 拿到新 fd 后会立刻调用 aeCreateFileEvent(el, fd, AE_READABLE, readQueryFromClient, c)
ae.c — aeCreateFileEvent()
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd,
                       int mask, aeFileProc *proc, void *clientData)
{
    /* 边界检查:fd 不能超过 setsize */
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }

    aeFileEvent *fe = &eventLoop->events[fd]; /* O(1) 直接索引 */

    /* 调用底层 API 在 epoll/kqueue 中注册这个 fd 的 mask */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;

    /* 用 OR 合并 mask:同一个 fd 可以同时监听读和写 */
    fe->mask |= mask;

    /* 根据 mask 设置对应的回调函数 */
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;

    fe->clientData = clientData;

    /* 更新 maxfd,用于 aeApiPoll 的遍历上界 */
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;

    return AE_OK;
}
mask 用 OR 合并,这意味着你可以先注册 AE_READABLE,之后再追加 AE_WRITABLE,两次 aeCreateFileEvent 调用不会互相覆盖。aeDeleteFileEvent 同理用 AND NOT 来移除单个 mask 位。
!
aeDeleteFileEvent 的对称逻辑fe->mask &= ~mask,然后调用 aeApiDelEvent()(epoll_ctl EPOLL_CTL_DEL)。若 mask 清零则还要更新 maxfd(需要从 maxfd 向前扫描找新的最大值,是 ae 里少见的 O(N) 操作)。
⏰ ae.c — aeCreateTimeEvent()
ae.c — aeCreateTimeEvent()
long long aeCreateTimeEvent(aeEventLoop *eventLoop,
                              long long milliseconds,
                              aeTimeProc *proc,
                              void *clientData,
                              aeEventFinalizerProc *finalizerProc)
{
    /* 分配一个新的时间事件节点 */
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;

    te->id           = id;
    te->timeProc     = proc;
    te->finalizerProc = finalizerProc;
    te->clientData   = clientData;
    te->refcount     = 0;

    /* when = now + milliseconds(使用单调时钟,不受系统时间调整影响)*/
    aeAddMillisecondsToNow(milliseconds, &te->when);

    /* 头插法:新事件插入链表头部 */
    te->next = eventLoop->timeEventHead;
    te->prev = NULL;
    if (eventLoop->timeEventHead)
        eventLoop->timeEventHead->prev = te;
    eventLoop->timeEventHead = te;

    return id;  /* 返回 ID,调用者可用于后续 aeDeleteTimeEvent */
}
单调时钟:ae 用 getMonotonicUs() 而非 gettimeofday(),这样 NTP 时钟调整、夏令时切换都不会导致定时任务提前或延迟触发。这是 Redis 4.0 之后的改进。
?
头插法的意义:最新创建的事件在链表头部,遍历时先检查新事件。但注意链表不按时间排序,每次处理都要完整遍历整条链表找到 when ≤ now 的事件。Redis 时间事件极少所以这没问题。
ae.c — aeProcessTimeEvents()(内部函数)
static int aeProcessTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;

    /* maxId 防止在本轮处理中新创建的事件被立即执行 */
    maxId = eventLoop->timeEventNextId - 1;
    monotime now = getMonotonicUs();

    te = eventLoop->timeEventHead;
    while (te) {
        long long id;

        /* 跳过标记删除的节点(id == AE_DELETED_EVENT_ID)*/
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (te->refcount) { te = next; continue; } /* 正在回调中,不能释放 */
            if (te->prev) te->prev->next = te->next;
            else eventLoop->timeEventHead = te->next;
            if (te->next) te->next->prev = te->prev;
            if (te->finalizerProc) te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        /* 跳过本轮新创建的事件 */
        if (te->id > maxId) { te = te->next; continue; }

        /* 检查是否到期:when <= now */
        if (te->when <= now) {
            int retval;
            id = te->id;
            te->refcount++;   /* 防止回调中被删除 */
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;

            /* 根据返回值决定重复还是删除 */
            if (retval != AE_NOMORE) {
                /* 重复执行:when += retval(ms) */
                aeAddMillisecondsToNow(retval, &te->when);
            } else {
                /* 一次性事件:标记删除(不立即 free,下次遍历时清理)*/
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}
!
refcount 的作用:如果在 timeProc 回调内部调用了 aeDeleteTimeEvent,节点不会立即被 free(因为 refcount > 0),而是把 id 设为 AE_DELETED_EVENT_ID,等下次遍历时再安全释放,避免 use-after-free。
⚙️ ae.c — aeProcessEvents()【核心函数】
这是整个 ae 最重要的函数,一次 Tick 的全部逻辑都在这里:计算 timeout → epoll_wait → 处理文件事件 → 处理时间事件。
ae.c — aeProcessEvents()
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;

    /* flags 里没有文件事件和时间事件标志,直接返回 */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* ① 计算 epoll_wait 的 timeout
          原则:既不想无限阻塞(会错过时间事件),
                又不想 spin(浪费 CPU)*/
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT)))
    {
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop); /* 遍历链表找最近事件 */

        if (shortest) {
            /* timeout = 最近时间事件触发时间 - 当前时间 */
            long nowMs = (long)(getMonotonicUs() / 1000);
            long delta = (long)(shortest->when / 1000) - nowMs;
            if (delta > 0) {
                tv.tv_sec  = delta / 1000;
                tv.tv_usec = (delta % 1000) * 1000;
            } else {
                tv.tv_sec = tv.tv_usec = 0; /* 时间事件已过期,不阻塞 */
            }
            tvp = &tv;
        } else {
            /* 没有时间事件:根据 AE_DONT_WAIT 决定是否永久阻塞 */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                tvp = NULL;  /* epoll_wait 永久阻塞(直到有 fd 就绪)*/
            }
        }
    }

    /* ② 调用 beforesleep 钩子(刷缓冲区等)*/
    if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
        eventLoop->beforesleep(eventLoop);

    /* ③ 调用底层 IO 轮询(epoll_wait),填充 fired[] 数组
          numevents = 本轮就绪事件数量 */
    numevents = aeApiPoll(eventLoop, tvp);

    /* ④ epoll_wait 返回后,立即调用 aftersleep 钩子 */
    if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
        eventLoop->aftersleep(eventLoop);

    /* ⑤ 遍历 fired[] 处理就绪的文件事件 */
    for (int j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        int mask       = eventLoop->fired[j].mask;
        int fd         = eventLoop->fired[j].fd;
        int fired      = 0;

        /* AE_BARRIER:先写后读(逆序处理)*/
        int invert = fe->mask & AE_BARRIER;

        /* 正常情况:先读后写 */
        if (!invert && fe->mask & mask & AE_READABLE) {
            fe->rfileProc(eventLoop, fd, fe->clientData, mask);
            fired++;
            fe = &eventLoop->events[fd]; /* 回调可能修改了 fe,重新获取 */
        }

        if (fe->mask & mask & AE_WRITABLE) {
            if (!fired || fe->wfileProc != fe->rfileProc) {
                fe->wfileProc(eventLoop, fd, fe->clientData, mask);
                fired++;
            }
        }

        /* AE_BARRIER 时:写完再读 */
        if (invert) {
            fe = &eventLoop->events[fd];
            if ((fe->mask & mask & AE_READABLE) &&
                (!fired || fe->wfileProc != fe->rfileProc))
            {
                fe->rfileProc(eventLoop, fd, fe->clientData, mask);
                fired++;
            }
        }
        processed++;
    }

    /* ⑥ 处理时间事件 */
    if (flags & AE_TIME_EVENTS)
        processed += aeProcessTimeEvents(eventLoop);

    return processed;
}
!
AE_BARRIER 的读写顺序翻转invert 为真时把写操作提前到读之前。这保证在 AOF 每次写模式下,先把响应发给客户端(写),再进行 fsync(读事件里的逻辑),否则 fsync 耗时会让客户端等待。
fe = &eventLoop->events[fd] 在回调后重新获取是关键:因为 rfileProc 回调内部可能调用了 aeDeleteFileEvent,改变了 fe 的 mask 或回调指针,不重新获取会读到脏数据。
🔁 ae.c — aeMain()
ae.c — aeMain()
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop,
            AE_ALL_EVENTS        /* 处理文件事件 + 时间事件 */
            | AE_CALL_BEFORE_SLEEP  /* 调用 beforesleep 钩子 */
            | AE_CALL_AFTER_SLEEP); /* 调用 aftersleep 钩子 */
    }
}
就这 5 行。这就是 Redis "单线程"的全部秘密:一个死循环,不停地调用 aeProcessEvents。简单到令人震撼。
server.c — Redis 启动流程(节选)
/* server.c initServer() */
server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);

/* 注册 TCP 监听 fd 的接受连接事件 */
for (int i = 0; i < server.ipfd.count; i++)
    aeCreateFileEvent(server.el, server.ipfd.fd[i],
                       AE_READABLE, acceptTcpHandler, NULL);

/* 注册 serverCron 定时任务(1000/hz ms 后首次触发)*/
aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

/* 注册 beforesleep / aftersleep 钩子 */
aeSetBeforeSleepProc(server.el, beforeSleep);
aeSetAfterSleepProc(server.el, afterSleep);

/* 启动!进入死循环,永不返回(除非收到 SHUTDOWN 信号)*/
aeMain(server.el);

aeDeleteEventLoop(server.el); /* 正常关闭时清理资源 */
🐧 ae_epoll.c — aeApiCreate()
ae_epoll.c 是 ae 在 Linux 下的底层实现。它向上提供统一接口(aeApiCreate / aeApiAddEvent / aeApiPoll),向下直接调用 epoll 系统调用。
ae_epoll.c — 私有数据结构 + aeApiCreate()
/* epoll 的私有数据,存储在 eventLoop->apidata 中 */
typedef struct aeApiState {
    int epfd;                      /* epoll 实例的文件描述符 */
    struct epoll_event *events;   /* epoll_wait 的输出缓冲区 */
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    if (!state) return -1;

    /* 分配 epoll_event 数组,大小 = setsize(最多返回这么多就绪事件)*/
    state->events = zmalloc(sizeof(struct epoll_event) * eventLoop->setsize);
    if (!state->events) { zfree(state); return -1; }

    /* epoll_create(1024):参数 size 在 Linux 2.6.8+ 已被忽略,传任意正数即可 */
    state->epfd = epoll_create(1024);
    if (state->epfd == -1) { zfree(state->events); zfree(state); return -1; }

    /* 设置 close-on-exec,fork 后子进程不会继承 epfd */
    anetCloexec(state->epfd);

    /* 保存到 eventLoop->apidata,后续 aeApiAddEvent/aeApiPoll 通过它访问 */
    eventLoop->apidata = state;
    return 0;
}
➕ ae_epoll.c — aeApiAddEvent() / aeApiDelEvent()
ae_epoll.c — aeApiAddEvent()
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState  *state = eventLoop->apidata;
    struct epoll_event ee = {0};

    /* 判断是 ADD 还是 MOD:若 fd 已有注册事件则用 EPOLL_CTL_MOD */
    int op = (eventLoop->events[fd].mask == AE_NONE)
             ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    /* 将 ae 的 mask 转换为 epoll 的 events 标志 */
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* 合并已有 mask */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;

    /* 调用系统调用,将 fd 注册到内核 epoll 红黑树 */
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState  *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    int mask = eventLoop->events[fd].mask & ~delmask; /* 删除指定位 */

    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;

    /* mask 清零说明彻底删除;否则 MOD 更新为剩余 mask */
    if (mask != AE_NONE)
        epoll_ctl(state->epfd, EPOLL_CTL_MOD, fd, &ee);
    else
        epoll_ctl(state->epfd, EPOLL_CTL_DEL, fd, &ee);
}
ADD vs MOD 的判断:epoll_ctl 中对同一个 fd 不能重复 EPOLL_CTL_ADD,必须用 EPOLL_CTL_MOD 修改。ae 通过检查 events[fd].mask == AE_NONE 来区分首次注册还是追加 mask,非常精妙。
🔍 ae_epoll.c — aeApiPoll()【最关键的系统调用入口】
ae_epoll.c — aeApiPoll()
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /* 核心系统调用:主线程在此阻塞等待就绪事件
       tvp == NULL  → 永久阻塞
       tvp->tv_msec → 最多等待 N 毫秒(最近时间事件触发时间)
       tvp == {0,0} → 非阻塞(AE_DONT_WAIT)*/
    retval = epoll_wait(state->epfd,
                        state->events,          /* 输出:就绪 epoll_event 数组 */
                        eventLoop->setsize,     /* 最多返回 setsize 个事件 */
                        tvp ? (int)(tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

    if (retval > 0) {
        numevents = retval;
        /* 将 epoll 的就绪事件转换为 ae 的 fired[] 格式 */
        for (int j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events + j;

            /* EPOLLIN  → AE_READABLE */
            if (e->events & EPOLLIN)  mask |= AE_READABLE;
            /* EPOLLOUT → AE_WRITABLE */
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            /* EPOLLERR / EPOLLHUP:fd 错误,两种事件都通知上层处理 */
            if (e->events & EPOLLERR) mask |= AE_READABLE | AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_READABLE | AE_WRITABLE;

            eventLoop->fired[j].fd   = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    } else if (retval == -1 && errno != EINTR) {
        panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }
    return numevents;
}
!
EPOLLERR / EPOLLHUP 映射到 AE_READABLE | AE_WRITABLE:让上层的 rfileProc 和 wfileProc 都有机会处理错误(通常会调用 read()write() 拿到 errno,然后关闭连接)。
EINTR 不 panic:信号中断 epoll_wait 返回 -1 且 errno=EINTR 是正常的(如收到 SIGTERM),ae 直接忽略,下次循环继续 epoll_wait。
🗺️ 完整调用链 — 从启动到处理一条命令
完整调用链(伪代码)
/* === 启动阶段 === */
server.c: main()
  └─ initServer()
       ├─ ae.c:     aeCreateEventLoop(maxclients+128)
       │              └─ ae_epoll.c: aeApiCreate()  → epoll_create(1024)
       ├─ ae.c:     aeCreateFileEvent(listenfd, AE_READABLE, acceptTcpHandler)
       │              └─ ae_epoll.c: aeApiAddEvent() → epoll_ctl(EPOLL_CTL_ADD)
       └─ ae.c:     aeCreateTimeEvent(1ms, serverCron)

  └─ ae.c: aeMain(server.el)         ← 死循环,永不返回

/* === 每个 Tick === */
ae.c: aeMain()
  └─ ae.c: aeProcessEvents(ALL_EVENTS | BEFORE_SLEEP | AFTER_SLEEP)
       │
       ├─ [1] aeSearchNearestTimer()    → 计算 epoll_wait timeout
       ├─ [2] beforesleep()              → 刷输出缓冲区、AOF 等
       ├─ [3] ae_epoll.c: aeApiPoll()  → epoll_wait(epfd, events, N, timeout)
       │         内核填充就绪 fd 列表 → ae 写入 fired[]
       ├─ [4] aftersleep()               → 通知 IO 线程读数据(6.0+)
       │
       ├─ [5] for j = 0..numevents:
       │         fd   = fired[j].fd
       │         mask = fired[j].mask
       │         fe   = events[fd]       ← O(1) 直接索引
       │         ┌─ AE_READABLE → fe->rfileProc()
       │         │   ├─ acceptTcpHandler()  (listenfd 新连接)
       │         │   └─ readQueryFromClient() (客户端发来数据)
       │         │         └─ processCommand() → SET/GET/...
       │         │               └─ addReply()  → 写输出缓冲区
       │         │                    └─ aeCreateFileEvent(fd, AE_WRITABLE, sendReplyToClient)
       │         └─ AE_WRITABLE → fe->wfileProc()
       │               └─ sendReplyToClient() → write(fd, buf)
       │                     └─ aeDeleteFileEvent(fd, AE_WRITABLE)
       │
       └─ [6] aeProcessTimeEvents()
               └─ serverCron()  → 过期清理 / 持久化 / 复制心跳...
看懂这张调用链,你就彻底理解了 Redis 的 IO 模型。所有的"单线程"、"高性能"、"事件驱动"都浓缩在这里:
一个 while 循环 → 一次 epoll_wait → 遍历就绪 fd → 调用 handler → 处理定时任务 → 再循环。