手写Redis事件循环器AE:原理与实现

Posted by WGrape.github.io on June 2, 2026

文章内容更新请以 WGrape GitHub博客 : 手写Redis事件循环器AE:原理与实现 为准

手写Redis事件循环器AE:原理与实现

本文结合 Redis 源码(ae.c)与手写实践,深入讲解 Redis 事件循环器 AE(Another Event Loop)的设计思路、核心数据结构、I/O 多路复用后端抽象,以及实现过程中遇到的典型问题与解决方案。


1. 为什么 Redis 是单线程却这么快?

很多人第一次听说 Redis 是单线程模型时都会有疑惑:单线程意味着同一时刻只有一个任务在执行,怎么可能支撑每秒数十万的 QPS?

答案在于 I/O 多路复用(I/O Multiplexing)事件驱动(Event-Driven) 的完美结合。

Redis 的性能瓶颈从来不在 CPU,而在网络 I/O。当 Redis 在等待一个客户端发来数据时,CPU 是空闲的。传统多线程方案会为每个连接分配一个线程去阻塞等待,线程上下文切换开销极大。而 Redis 的做法是:

用一个线程,通过 epoll/kqueue/select 同时监听成千上万个连接,哪个连接有数据来了就处理哪个,CPU 永远不闲着。

这套机制在 Redis 内部被封装成一个独立的事件循环库,叫做 AE


2. AE 是什么?

AE 全称 Another Event Loop,是 Redis 作者 Salvatore Sanfilippo(antirez)为 Redis 量身打造的轻量级事件驱动库。整个库只有不到 800 行 C 代码,却驱动着 Redis 所有的网络 I/O 和定时任务。

AE 的核心设计哲学:

  • 极简主义:没有 Libevent 那样复杂的抽象层,代码直接、清晰;
  • 可替换后端:通过编译时宏选择 epoll / kqueue / /dev/poll / select,对上层完全透明;
  • 两类事件:文件事件(网络 I/O)+ 时间事件(定时任务),覆盖 Redis 所有需求。

3. AE 实现

(1) 整体架构

┌─────────────────────────────────────────────────────────────┐
│                        aeMain()                             │
│                    (主循环 / Main Loop)                     │
└──────────────────────────┬──────────────────────────────────┘
                           │  每轮迭代 / each iteration
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                   aeProcessEvents()                         │
│                  (核心调度 / Dispatcher)                    │
│                                                             │
│  ① 计算最近时间事件的到期时间 → 作为 poll 超时                   │
│  ② beforesleep() 钩子                                       │
│  ③ aeApiPoll()  ← 阻塞等待 I/O 就绪                          │
│  ④ aftersleep() 钩子                                        │
│  ⑤ 遍历 fired[],分发文件事件回调                              │
│  ⑥ processTimeEvents() 处理到期时间事件                       │
└─────────────────────────────────────────────────────────────┘
         │                              │
         ▼                              ▼
┌─────────────────┐           ┌──────────────────────┐
│  文件事件        │           │  时间事件              │
│  aeFileEvent[]  │           │  aeTimeEvent 链表     │
│  (按 fd 索引)    │           │  (无序双向链表)        │
└────────┬────────┘           └──────────────────────┘
         │
         ▼
┌─────────────────────────────────────────────────────────────┐
│               I/O 多路复用后端(可替换)                        │
│   Linux: ae_epoll.c    macOS/BSD: ae_kqueue.c               │
│   Solaris: ae_evport.c    通用回退: ae_select.c              │
└─────────────────────────────────────────────────────────────┘

(2) 核心流程

Redis 启动
    │
    ▼
aeCreateEventLoop(10240)
    │  分配 events[10240]、fired[10240]
    │  初始化 epoll/kqueue/select 后端
    │
    ▼
注册监听 socket(fd=5)
aeCreateFileEvent(el, 5, AE_READABLE, acceptHandler, NULL)
    │
    ▼
注册 serverCron
aeCreateTimeEvent(el, 1000/hz ms, serverCron, NULL, NULL)
    │
    ▼
aeMain(el)
    │
    └──► 循环开始 ◄────────────────────────────────────────┐
              │                                          │
              ▼                                          │
    aeSearchNearestTimer → 计算超时 tvp                   │
              │                                          │
              ▼                                          │
    beforesleep() → AOF flush、快速 I/O 等                │
              │                                          │
              ▼                                          │
    aeApiPoll(el, tvp)  ←── 阻塞,等待网络事件或超时         │
              │                                          │
              ▼                                          │
    aftersleep()                                         │
              │                                          │
              ▼                                          │
    遍历 fired[]                                          │
    ├─ fd=5 可读 → acceptHandler → 建立新连接              │
    ├─ fd=7 可读 → readQueryFromClient → 处理命令          │
    └─ fd=7 可写 → sendReplyToClient → 发送响应            │
              │                                          │
              ▼                                          │
    processTimeEvents()                                  │
    └─ serverCron 到期 → 执行定期清理、统计、持久化等         │
              │                                          │
              └──────────────────────────────────────────┘

(3) 源代码

/**
 * redis_ae.c
 *
 * A simple event-driven programming library.
 * Redis AE (Another Event Loop) 事件驱动库核心设计实现
 * Redis AE (Another Event Loop) - Core Event-Driven Library Design
 *
 * 本文件还原了 Redis ae 事件循环的核心架构,包含:
 * This file reproduces the core architecture of Redis ae event loop, including:
 *   1. 文件事件(File Events)—— I/O 多路复用
 *   2. 时间事件(Time Events)—— 定时器/周期任务
 *   3. 事件循环主体(aeMain)—— 驱动整个服务运行
 *
 * 参考来源 / Reference: https://github.com/redis/redis/blob/unstable/src/ae.c
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>

/* =========================================================================
 * 常量定义 / Constants
 * ========================================================================= */

/* 每次 epoll/kqueue/select 最多处理的事件数
 * Max number of events processed per poll call */
#define AE_SETSIZE      (1024 * 10)

/* 文件事件类型掩码 / File event type masks */
#define AE_NONE         0       /* 未注册任何事件 / No event registered      */
#define AE_READABLE     1       /* 可读事件 / Readable event                 */
#define AE_WRITABLE     2       /* 可写事件 / Writable event                 */
#define AE_BARRIER      4       /* 写屏障:确保写在读之后触发
                                 * Barrier: force write-after-read ordering  */

/* 时间事件 ID 哨兵,表示"已删除" / Sentinel id meaning "deleted" */
#define AE_DELETED_EVENT_ID     (-1)

/* aeProcessEvents 标志位 / aeProcessEvents flags */
#define AE_FILE_EVENTS      (1<<0)  /* 处理文件事件 / Process file events    */
#define AE_TIME_EVENTS      (1<<1)  /* 处理时间事件 / Process time events    */
#define AE_ALL_EVENTS       (AE_FILE_EVENTS | AE_TIME_EVENTS)
#define AE_DONT_WAIT        (1<<2)  /* 不阻塞等待 / Non-blocking poll        */
#define AE_CALL_BEFORE_SLEEP (1<<3) /* 睡前回调 / Before-sleep callback      */
#define AE_CALL_AFTER_SLEEP  (1<<4) /* 醒后回调 / After-sleep callback       */

/* =========================================================================
 * 类型前置声明 / Forward declarations
 * ========================================================================= */

typedef struct aeEventLoop aeEventLoop;

/* =========================================================================
 * 回调函数原型 / Callback prototypes
 * ========================================================================= */

/**
 * 文件事件回调 / File event handler
 * @param el    事件循环 / event loop
 * @param fd    触发事件的文件描述符 / file descriptor that fired
 * @param clientData  注册时绑定的用户数据 / user data bound at registration
 * @param mask  触发的事件掩码 / event mask that fired
 */
typedef void aeFileProc(aeEventLoop *el, int fd, void *clientData, int mask);

/**
 * 时间事件回调 / Time event handler
 * 返回值:
 *   > 0  → 重新调度,距下次触发的毫秒数 (reschedule after N ms)
 *   = 0  → 立即再次触发 (fire again immediately)
 *  AE_NOMORE (-1) → 不再重复,删除该事件 (delete after this call)
 */
#define AE_NOMORE   (-1)
typedef int  aeTimeProc(aeEventLoop *el, long long id, void *clientData);

/**
 * 时间事件删除回调 / Time event finalizer
 * 当时间事件被删除时调用,用于释放 clientData
 * Called when a time event is deleted; used to free clientData.
 */
typedef void aeEventFinalizerProc(aeEventLoop *el, void *clientData);

/**
 * 睡眠前 / 睡眠后回调 / Before/after sleep callbacks
 */
typedef void aeBeforeSleepProc(aeEventLoop *el);

/* =========================================================================
 * 文件事件结构 / File Event Structure
 *
 * Redis 中每个 fd 对应一个 aeFileEvent,由数组直接按 fd 索引,O(1) 查找。
 * In Redis, one aeFileEvent per fd; stored in an array indexed by fd (O(1)).
 * ========================================================================= */
typedef struct aeFileEvent {
    int             mask;           /* 已注册的事件掩码 AE_READABLE|AE_WRITABLE
                                     * Registered event mask                 */
    aeFileProc     *rfileProc;      /* 可读回调 / Read handler               */
    aeFileProc     *wfileProc;      /* 可写回调 / Write handler              */
    void           *clientData;     /* 用户私有数据 / User private data       */
} aeFileEvent;

/* =========================================================================
 * 就绪事件结构(poll 返回结果) / Fired Event (poll result)
 * ========================================================================= */
typedef struct aeFiredEvent {
    int fd;     /* 就绪的文件描述符 / Ready file descriptor */
    int mask;   /* 就绪的事件掩码   / Ready event mask      */
} aeFiredEvent;

/* =========================================================================
 * 时间事件结构 / Time Event Structure
 *
 * 采用无序链表(Redis 实际实现)。时间事件较少(通常只有 serverCron),
 * 遍历开销可忽略,链表插入为 O(1)。
 *
 * Implemented as an unsorted linked list (matches Redis). Time events are few
 * (typically only serverCron), so linear scan is negligible; O(1) insertion.
 * ========================================================================= */
typedef struct aeTimeEvent {
    long long               id;             /* 唯一 ID / Unique event ID     */
    long long               when_sec;       /* 触发时间:秒 / Fire time (sec) */
    long long               when_ms;        /* 触发时间:毫秒 / Fire time (ms)*/
    aeTimeProc             *timeProc;       /* 到期回调 / Expiry callback     */
    aeEventFinalizerProc   *finalizerProc;  /* 删除时回调 / Deletion callback */
    void                   *clientData;     /* 用户私有数据 / User data        */
    struct aeTimeEvent     *prev;           /* 双向链表前驱 / Prev in list    */
    struct aeTimeEvent      *next;          /* 双向链表后继 / Next in list    */
    int                     refcount;       /* 引用计数,防止回调中删除自身
                                             * Ref-count; prevents self-delete
                                             * inside callback               */
} aeTimeEvent;

/* =========================================================================
 * 事件循环主结构 / Event Loop Main Structure
 * ========================================================================= */
struct aeEventLoop {
    int             maxfd;          /* 当前已注册的最大 fd
                                     * Highest fd currently registered       */
    int             setsize;        /* fd 数组大小(最大 fd 数量)
                                     * Size of fd array (max fd count)       */
    long long       timeEventNextId;/* 下一个时间事件的唯一 ID
                                     * Next time event unique ID             */
    aeFileEvent    *events;         /* 已注册的文件事件数组(按 fd 索引)
                                     * Registered file events (indexed by fd)*/
    aeFiredEvent   *fired;          /* poll 返回的就绪事件缓冲
                                     * Buffer of events fired by poll        */
    aeTimeEvent    *timeEventHead;  /* 时间事件链表头
                                     * Head of time event linked list        */
    int             stop;           /* 非 0 则退出主循环
                                     * Non-zero → exit main loop             */
    void           *apidata;        /* I/O 多路复用后端私有数据(epoll/kqueue/select)
                                     * Backend-private data (epoll/kqueue/…) */
    aeBeforeSleepProc *beforesleep; /* 每次 poll 前调用
                                     * Called before every poll              */
    aeBeforeSleepProc *aftersleep;  /* 每次 poll 后调用
                                     * Called after every poll               */
    int             flags;          /* AE_DONT_WAIT 等标志
                                     * Flags (e.g. AE_DONT_WAIT)             */
};

/* =========================================================================
 * I/O 多路复用后端(模拟 select 实现)
 * I/O Multiplexing Backend — simulated with select(2)
 *
 * 真实 Redis 优先选择 epoll(Linux)、kqueue(macOS/BSD)、/dev/poll(Solaris),
 * 最后才回退到 select。此处为可移植演示,使用 select 模拟同样的接口。
 *
 * Real Redis prefers epoll (Linux) > kqueue (macOS/BSD) > /dev/poll > select.
 * For portability in this demo we implement the same interface using select.
 * ========================================================================= */
#include <sys/select.h>

typedef struct aeApiState {
    fd_set  rfds;   /* 监听可读的 fd 集合 / fd set for readable events */
    fd_set  wfds;   /* 监听可写的 fd 集合 / fd set for writable events */
    /* 工作副本(select 会修改参数,每次需重新赋值)
     * Working copies — select() modifies its arguments on return,
     * so we restore from these each iteration.                      */
    fd_set  _rfds;
    fd_set  _wfds;
} aeApiState;

/* 初始化多路复用后端 / Initialize multiplexing backend */
static int aeApiCreate(aeEventLoop *el) {
    aeApiState *state = (aeApiState *)malloc(sizeof(aeApiState));
    if (!state) return -1;
    FD_ZERO(&state->rfds);
    FD_ZERO(&state->wfds);
    el->apidata = state;
    return 0;
}

/* 释放多路复用后端 / Free multiplexing backend */
static void aeApiFree(aeEventLoop *el) {
    free(el->apidata);
}

/**
 * 向后端添加/修改对某 fd 的监听
 * Add or modify a fd watch in the backend.
 *
 * Redis 的 epoll 实现中,若 fd 已存在则用 EPOLL_CTL_MOD,否则 EPOLL_CTL_ADD。
 * In Redis's epoll backend: EPOLL_CTL_MOD if already present, else EPOLL_CTL_ADD.
 */
static int aeApiAddEvent(aeEventLoop *el, int fd, int mask) {
    aeApiState *state = (aeApiState *)el->apidata;
    if (mask & AE_READABLE) FD_SET(fd, &state->rfds);
    if (mask & AE_WRITABLE) FD_SET(fd, &state->wfds);
    return 0;
}

/**
 * 从后端移除对某 fd 的监听
 * Remove a fd watch from the backend.
 */
static void aeApiDelEvent(aeEventLoop *el, int fd, int delmask) {
    aeApiState *state = (aeApiState *)el->apidata;
    if (delmask & AE_READABLE) FD_CLR(fd, &state->rfds);
    if (delmask & AE_WRITABLE) FD_CLR(fd, &state->wfds);
}

/**
 * 等待事件就绪(核心阻塞/非阻塞 poll)
 * Wait for events to become ready — the core blocking/non-blocking poll.
 *
 * @param tvp  NULL → 永久阻塞 / NULL means block forever
 *             tvp->tv_sec==0 && tvp->tv_usec==0 → 立即返回(非阻塞)
 * @return 就绪事件数 / number of ready events
 */
static int aeApiPoll(aeEventLoop *el, struct timeval *tvp) {
    aeApiState *state = (aeApiState *)el->apidata;
    int retval, numevents = 0;

    /* 每次 poll 前复制 fd_set,因为 select 会修改入参
     * Copy fd sets before each call since select() modifies them in place. */
    state->_rfds = state->rfds;
    state->_wfds = state->wfds;

    /* 调用 select,等待 fd 就绪或超时 / Call select: wait for ready fds or timeout */
    retval = select(el->maxfd + 1, &state->_rfds, &state->_wfds, NULL, tvp);

    if (retval > 0) {
        for (int j = 0; j <= el->maxfd; j++) {
            int mask = 0;
            aeFileEvent *fe = &el->events[j];
            if (fe->mask == AE_NONE) continue;

            /* 将 select 结果映射回 ae 掩码
             * Map select result back to ae mask. */
            if (fe->mask & AE_READABLE && FD_ISSET(j, &state->_rfds))
                mask |= AE_READABLE;
            if (fe->mask & AE_WRITABLE && FD_ISSET(j, &state->_wfds))
                mask |= AE_WRITABLE;

            if (mask) {
                el->fired[numevents].fd   = j;
                el->fired[numevents].mask = mask;
                numevents++;
            }
        }
    } else if (retval == -1 && errno != EINTR) {
        /* EINTR 表示被信号中断,正常忽略 / EINTR means interrupted by signal — ignore */
        perror("select");
    }
    return numevents;
}

/* =========================================================================
 * 工具函数:获取当前时间 / Utility: get current time
 * ========================================================================= */
static void aeGetTime(long long *seconds, long long *milliseconds) {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    *seconds      = tv.tv_sec;
    *milliseconds = tv.tv_usec / 1000;
}

/* 将当前时间加上 milliseconds 毫秒,写入 *sec / *ms
 * Add `milliseconds` to now; write result into *sec / *ms. */
static void aeAddMillisecondsToNow(long long milliseconds,
                                   long long *sec, long long *ms) {
    long long cur_sec, cur_ms, when_sec, when_ms;
    aeGetTime(&cur_sec, &cur_ms);
    when_sec = cur_sec + milliseconds / 1000;
    when_ms  = cur_ms  + milliseconds % 1000;
    if (when_ms >= 1000) {          /* 毫秒进位 / carry milliseconds into seconds */
        when_sec++;
        when_ms -= 1000;
    }
    *sec = when_sec;
    *ms  = when_ms;
}

/* =========================================================================
 * 事件循环生命周期 / Event Loop Lifecycle
 * ========================================================================= */

/**
 * 创建事件循环 / Create an event loop.
 *
 * 分配 events[] 和 fired[] 数组,初始化多路复用后端。
 * Allocate events[] and fired[] arrays, initialize the multiplexing backend.
 */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *el;

    el = (aeEventLoop *)malloc(sizeof(aeEventLoop));
    if (!el) goto err;

    el->events = (aeFileEvent *)malloc(sizeof(aeFileEvent) * setsize);
    el->fired  = (aeFiredEvent *)malloc(sizeof(aeFiredEvent) * setsize);
    if (!el->events || !el->fired) goto err;

    el->setsize        = setsize;
    el->maxfd          = -1;
    el->timeEventHead  = NULL;
    el->timeEventNextId = 0;
    el->stop           = 0;
    el->beforesleep    = NULL;
    el->aftersleep     = NULL;
    el->flags          = 0;

    /* 所有文件事件初始化为 AE_NONE(未注册)
     * Mark all file event slots as unregistered. */
    for (int i = 0; i < setsize; i++)
        el->events[i].mask = AE_NONE;

    if (aeApiCreate(el) == -1) goto err;
    return el;

err:
    if (el) {
        free(el->events);
        free(el->fired);
        free(el);
    }
    return NULL;
}

/**
 * 停止事件循环 / Stop the event loop.
 * 设置 stop 标志,aeMain 在当前迭代结束后退出。
 * Sets stop flag; aeMain exits after current iteration.
 */
void aeStop(aeEventLoop *el) {
    el->stop = 1;
}

/**
 * 删除事件循环,释放所有资源
 * Delete the event loop and release all resources.
 */
void aeDeleteEventLoop(aeEventLoop *el) {
    aeApiFree(el);

    /* 释放所有时间事件链表节点 / Free all time event nodes */
    aeTimeEvent *te = el->timeEventHead;
    while (te) {
        aeTimeEvent *next = te->next;
        free(te);
        te = next;
    }

    free(el->events);
    free(el->fired);
    free(el);
}

/* =========================================================================
 * 文件事件注册 / File Event Registration
 * ========================================================================= */

/**
 * 注册文件事件 / Register a file event.
 *
 * mask 可以是 AE_READABLE、AE_WRITABLE 或两者的 OR。
 * mask can be AE_READABLE, AE_WRITABLE, or their OR.
 *
 * 同一个 fd 可以多次调用以追加事件类型(读/写分开注册)。
 * The same fd can be registered multiple times to add event types.
 */
int aeCreateFileEvent(aeEventLoop *el, int fd, int mask,
                      aeFileProc *proc, void *clientData) {
    if (fd >= el->setsize) {
        errno = ERANGE;
        return -1;
    }

    aeFileEvent *fe = &el->events[fd];

    /* 通知后端监听新的事件类型 / Tell backend to watch the new event type */
    if (aeApiAddEvent(el, fd, mask) == -1) return -1;

    fe->mask |= mask;   /* 合并掩码 / Merge mask bits */

    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;

    /* 更新最大 fd,供 select 使用 / Update maxfd for select */
    if (fd > el->maxfd) el->maxfd = fd;

    return 0;
}

/**
 * 注销文件事件 / Unregister a file event.
 */
void aeDeleteFileEvent(aeEventLoop *el, int fd, int mask) {
    if (fd >= el->setsize) return;

    aeFileEvent *fe = &el->events[fd];
    if (fe->mask == AE_NONE) return;    /* fd 未注册,忽略 / Not registered */

    /* AE_BARRIER 不单独删除,随读/写一起清理
     * AE_BARRIER is not deleted alone; cleared together with read/write. */
    mask &= ~AE_BARRIER;

    aeApiDelEvent(el, fd, mask);
    fe->mask = fe->mask & (~mask);  /* 清除对应位 / Clear the bit(s) */

    /* 如果 maxfd 正好是被清空的 fd,向下扫描重新定位 maxfd
     * If maxfd was the just-cleared fd, scan down to find the new maxfd. */
    if (fd == el->maxfd && fe->mask == AE_NONE) {
        for (int j = el->maxfd - 1; j >= 0; j--) {
            if (el->events[j].mask != AE_NONE) {
                el->maxfd = j;
                break;
            }
        }
    }
}

/* =========================================================================
 * 时间事件注册 / Time Event Registration
 * ========================================================================= */

/**
 * 创建时间事件 / Create a time event.
 *
 * @param milliseconds  多少毫秒后首次触发 / first fire delay in ms
 * @param proc          到期回调 / expiry callback
 * @param clientData    用户数据 / user data
 * @param finalizerProc 删除时调用(可为 NULL)/ called on deletion (may be NULL)
 * @return 新事件的唯一 ID(失败返回 AE_DELETED_EVENT_ID)
 *         unique ID of the new event (AE_DELETED_EVENT_ID on failure)
 */
long long aeCreateTimeEvent(aeEventLoop *el, long long milliseconds,
                            aeTimeProc *proc, void *clientData,
                            aeEventFinalizerProc *finalizerProc) {
    long long id = el->timeEventNextId++;

    aeTimeEvent *te = (aeTimeEvent *)malloc(sizeof(aeTimeEvent));
    if (!te) return AE_DELETED_EVENT_ID;

    te->id             = id;
    te->refcount       = 0;
    te->timeProc       = proc;
    te->finalizerProc  = finalizerProc;
    te->clientData     = clientData;

    /* 计算首次触发的绝对时间戳
     * Compute the absolute timestamp for the first fire. */
    aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms);

    /* 插入到链表头(O(1))
     * Insert at list head — O(1). */
    te->prev = NULL;
    te->next = el->timeEventHead;
    if (el->timeEventHead)
        el->timeEventHead->prev = te;
    el->timeEventHead = te;

    return id;
}

/**
 * 删除时间事件 / Delete a time event.
 *
 * 只将 id 标记为 AE_DELETED_EVENT_ID;实际释放在下一轮 processTimeEvents 中。
 * Only marks id as AE_DELETED_EVENT_ID; actual free happens in next
 * processTimeEvents pass (safe even if called from within the callback).
 */
int aeDeleteTimeEvent(aeEventLoop *el, long long id) {
    aeTimeEvent *te = el->timeEventHead;
    while (te) {
        if (te->id == id) {
            te->id = AE_DELETED_EVENT_ID;   /* 惰性删除 / Lazy deletion */
            return 0;   /* 成功 / success */
        }
        te = te->next;
    }
    return -1;  /* 未找到 / not found */
}

/* =========================================================================
 * 核心调度逻辑 / Core Dispatch Logic
 * ========================================================================= */

/**
 * 处理所有到期的时间事件 / Process all expired time events.
 *
 * 遍历链表,调用已到期事件的回调。回调返回 AE_NOMORE 则删除该事件;
 * 否则按返回值重新计算下次触发时间。
 *
 * Walk the list, firing callbacks for expired events.
 * If callback returns AE_NOMORE the event is deleted;
 * otherwise its next fire time is updated using the return value (ms).
 */
static int processTimeEvents(aeEventLoop *el) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;

    te    = el->timeEventHead;
    maxId = el->timeEventNextId - 1;

    /* 记录本轮最大 id,跳过本轮新注册的事件(避免无限递归)
     * Snapshot maxId to skip events created during this pass. */
    long long now_sec, now_ms;
    aeGetTime(&now_sec, &now_ms);

    while (te) {
        long long id;

        /* ---------- 惰性删除 / Lazy deletion ---------- */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            /* 若还有其他地方引用(refcount),暂不释放
             * If still referenced (refcount > 0), defer free. */
            if (te->refcount == 0) {
                if (te->prev) te->prev->next = te->next;
                else          el->timeEventHead = te->next;
                if (te->next) te->next->prev = te->prev;
                if (te->finalizerProc)
                    te->finalizerProc(el, te->clientData);
                free(te);
            }
            te = next;
            continue;
        }

        /* 跳过本轮新增的事件 / Skip events created this pass */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }

        /* ---------- 检查是否到期 / Check expiry ---------- */
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            te->refcount++;                     /* 防止回调中删除自身 / guard self-delete */
            retval = te->timeProc(el, id, te->clientData);
            te->refcount--;
            processed++;

            if (retval != AE_NOMORE) {
                /* 重新调度:计算下次触发时间
                 * Reschedule: compute next fire time. */
                aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms);
            } else {
                /* 标记为删除,下轮清理 / Mark for deletion; cleaned next pass */
                aeDeleteTimeEvent(el, id);
            }
        }
        te = te->next;
    }
    return processed;
}

/**
 * 搜索最近要触发的时间事件
 * Search for the nearest time event (earliest fire time).
 *
 * 用于计算 poll 的超时值:若没有时间事件则永久阻塞;
 * 若有则最多阻塞到最近一个时间事件到期。
 *
 * Used to compute the poll timeout: block forever if no time events exist;
 * otherwise block at most until the nearest expiry.
 */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *el) {
    aeTimeEvent *te  = el->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while (te) {
        if (!nearest ||
            te->when_sec < nearest->when_sec ||
            (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms))
        {
            nearest = te;
        }
        te = te->next;
    }
    return nearest;
}

/**
 * aeProcessEvents — 单轮事件处理
 * aeProcessEvents — single iteration of event processing.
 *
 * 这是 Redis 事件循环的心脏。每次调用:
 *   1. 计算 poll 超时(由最近时间事件决定)
 *   2. 可选调用 beforesleep 回调
 *   3. 调用后端 poll(select/epoll/kqueue)
 *   4. 可选调用 aftersleep 回调
 *   5. 依次分发所有就绪的文件事件回调
 *   6. 处理所有到期的时间事件
 *
 * This is the heart of the Redis event loop. Each call:
 *   1. Computes poll timeout (driven by nearest time event)
 *   2. Optionally calls beforesleep
 *   3. Calls backend poll (select/epoll/kqueue)
 *   4. Optionally calls aftersleep
 *   5. Dispatches all ready file event callbacks
 *   6. Processes all expired time events
 *
 * @return 本轮处理的事件总数 / total events processed this iteration
 */
int aeProcessEvents(aeEventLoop *el, int flags) {
    int processed = 0, numevents;

    /* 若既不处理文件事件也不处理时间事件,直接返回
     * If neither file nor time events are requested, return immediately. */
    if (!(flags & AE_FILE_EVENTS) && !(flags & AE_TIME_EVENTS)) return 0;

    /* ----------------------------------------------------------------
     * 计算 poll 超时 / Compute poll timeout
     * ---------------------------------------------------------------- */
    struct timeval tv, *tvp = NULL;

    if (flags & AE_DONT_WAIT) {
        /* 非阻塞模式:超时立即设为 0 / Non-blocking: set timeout to 0 immediately */
        tv.tv_sec  = 0;
        tv.tv_usec = 0;
        tvp = &tv;
    } else if (el->maxfd != -1 || (flags & AE_TIME_EVENTS)) {
        aeTimeEvent *shortest = aeSearchNearestTimer(el);

        if (shortest) {
            /* 计算距最近时间事件还有多久 / Compute time until nearest event */
            long long now_sec, now_ms;
            aeGetTime(&now_sec, &now_ms);
            tv.tv_sec  = shortest->when_sec - now_sec;
            tv.tv_usec = (shortest->when_ms  - now_ms) * 1000;

            /* 已过期:超时设为 0(立即返回)/ Already expired → timeout 0 */
            if (tv.tv_sec < 0)  tv.tv_sec  = 0;
            if (tv.tv_usec < 0) tv.tv_usec = 0;
            tvp = &tv;
        }
        /* 否则 tvp 保持 NULL → select 永久阻塞(无任何事件)
         * Otherwise tvp stays NULL → select blocks forever (no events at all). */
    }

    /* ----------------------------------------------------------------
     * beforesleep 回调 / beforesleep callback
     * ---------------------------------------------------------------- */
    if (el->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
        el->beforesleep(el);

    /* ----------------------------------------------------------------
     * 调用后端 poll / Call backend poll
     * ---------------------------------------------------------------- */
    numevents = aeApiPoll(el, tvp);

    /* ----------------------------------------------------------------
     * aftersleep 回调 / aftersleep callback
     * ---------------------------------------------------------------- */
    if (el->aftersleep != NULL && (flags & AE_CALL_AFTER_SLEEP))
        el->aftersleep(el);

    /* ----------------------------------------------------------------
     * 分发文件事件 / Dispatch file events
     *
     * AE_BARRIER:若同一 fd 同时可读可写,先处理写事件再处理读事件。
     * 用于在回复客户端之前确保数据已持久化(AOF fsync 场景)。
     *
     * AE_BARRIER: if an fd is both readable and writable in the same
     * iteration, fire the write callback BEFORE the read callback.
     * Used to guarantee persistence (AOF fsync) before replying.
     * ---------------------------------------------------------------- */
    for (int j = 0; j < numevents; j++) {
        aeFileEvent *fe = &el->events[el->fired[j].fd];
        int mask  = el->fired[j].mask;
        int fd    = el->fired[j].fd;
        int fired = 0;  /* 本 fd 已触发的事件数 / events fired for this fd */

        /* BARRIER:写优先于读 / With BARRIER: write fires before read */
        int invert = fe->mask & AE_BARRIER;

        if (!invert && (mask & AE_READABLE) && fe->rfileProc) {
            fe->rfileProc(el, fd, fe->clientData, mask);
            fired++;
        }
        if ((mask & AE_WRITABLE) && fe->wfileProc &&
            /* 避免同一回调函数因读写均就绪而被调用两次
             * Avoid calling the same proc twice if both events are ready. */
            (!fired || fe->wfileProc != fe->rfileProc))
        {
            fe->wfileProc(el, fd, fe->clientData, mask);
            fired++;
        }
        /* BARRIER 模式下,读在写之后触发 / With BARRIER, read fires after write */
        if (invert && (mask & AE_READABLE) && fe->rfileProc) {
            if (!fired || fe->rfileProc != fe->wfileProc)
                fe->rfileProc(el, fd, fe->clientData, mask);
        }
        processed++;
    }

    /* ----------------------------------------------------------------
     * 处理时间事件 / Process time events
     * ---------------------------------------------------------------- */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(el);

    return processed;
}

/* =========================================================================
 * 主循环 / Main Loop
 * ========================================================================= */

/**
 * aeMain — 启动事件循环,直到 aeStop() 被调用
 * aeMain — start the event loop; runs until aeStop() is called.
 *
 * 这是 Redis server.c 中 main() 最后调用的函数。
 * This is the last function called in Redis's server.c main().
 */
void aeMain(aeEventLoop *el) {
    el->stop = 0;
    while (!el->stop) {
        /* 每轮同时处理文件事件与时间事件,允许睡眠前/后回调
         * Process both file and time events each iteration,
         * enabling before/after-sleep hooks. */
        aeProcessEvents(el,
            AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP);
    }
}

/* =========================================================================
 * 演示 / Demo
 * ========================================================================= */

/* 模拟 Redis serverCron 周期性任务回调
 * Simulates Redis's serverCron periodic task callback. */
static int serverCron(aeEventLoop *el, long long id, void *clientData) {
    (void)el; (void)id; (void)clientData;

    static int tick = 0;
    printf("[serverCron] tick #%d — 周期性任务 / periodic task running\n", ++tick);

    /* 返回 100 表示 100ms 后再次触发 / Return 100 → reschedule after 100 ms */
    if (tick >= 5) return AE_NOMORE;   /* 演示:5 次后停止 / stop after 5 ticks */
    return 100;
}

/* beforesleep 回调示意 / beforesleep callback example */
static void beforeSleep(aeEventLoop *el) {
    (void)el;
    /* 真实 Redis 在此处理快速 IO、AOF flush、惰性释放等
     * Real Redis handles fast-path IO, AOF flush, lazy-free, etc. here. */
}

int main(void) {
    printf("========================================\n");
    printf(" Redis AE 事件循环演示 / AE Event Loop Demo\n");
    printf("========================================\n\n");

    /* 创建事件循环 / Create event loop */
    aeEventLoop *el = aeCreateEventLoop(AE_SETSIZE);
    if (!el) {
        fprintf(stderr, "aeCreateEventLoop 失败 / failed\n");
        return 1;
    }

    /* 注册 beforesleep 钩子 / Register beforesleep hook */
    el->beforesleep = beforeSleep;

    /* 注册 serverCron:首次 100ms 后触发 / Register serverCron: first fire in 100 ms */
    long long cronId = aeCreateTimeEvent(el, 100, serverCron, NULL, NULL);
    if (cronId == AE_DELETED_EVENT_ID) {
        fprintf(stderr, "aeCreateTimeEvent 失败 / failed\n");
        aeDeleteEventLoop(el);
        return 1;
    }
    printf("[main] serverCron 已注册,id=%lld / registered, id=%lld\n\n",
           cronId, cronId);

    /*
     * 启动主循环。
     * serverCron 触发 5 次后返回 AE_NOMORE,时间事件链表为空;
     * 此时 poll 将永久阻塞(无 fd 监听、无时间事件)。
     * 为演示方便,我们手动在 serverCron 返回 AE_NOMORE 前停止循环。
     *
     * Start main loop.
     * After serverCron fires 5 times it returns AE_NOMORE; the time event
     * list becomes empty and poll would block forever (no fds, no timers).
     * For this demo we stop the loop manually when serverCron removes itself.
     *
     * 在真实 Redis 中,serverCron 每 1000/hz ms 重新调度自身,永不结束。
     * In real Redis, serverCron reschedules itself every 1000/hz ms forever.
     */

    /* 手动执行几轮,直到 serverCron 完成为止 / Manually pump until serverCron is done */
    for (int i = 0; i < 20; i++) {
        int n = aeProcessEvents(el, AE_ALL_EVENTS | AE_DONT_WAIT);
        if (n == 0 && el->timeEventHead == NULL) {
            printf("\n[main] 无更多事件,退出循环 / No more events, exiting loop\n");
            break;
        }
        /* 小延迟模拟真实时间流逝 / Small delay to simulate real elapsed time */
        struct timespec ts = {0, 110 * 1000 * 1000};  /* 110 ms */
        nanosleep(&ts, NULL);
    }

    aeDeleteEventLoop(el);
    printf("[main] 事件循环已销毁 / Event loop destroyed\n");
    return 0;
}

4. 核心数据结构设计

4.1 文件事件 aeFileEvent

typedef struct aeFileEvent {
    int         mask;        // 已注册的事件类型:AE_READABLE | AE_WRITABLE | AE_BARRIER
    aeFileProc *rfileProc;   // 可读回调函数
    aeFileProc *wfileProc;   // 可写回调函数
    void       *clientData;  // 用户私有数据(通常是 client 指针)
} aeFileEvent;

关键设计:Redis 用一个 aeFileEvent 数组,下标就是 fd 编号。这样通过 fd 查找事件是 O(1),不需要任何哈希表。

events[0]  → fd=0 的事件(通常是 stdin)
events[1]  → fd=1 的事件(通常是 stdout)
events[3]  → fd=3 的事件(通常是 server socket)
events[5]  → fd=5 的事件(某个客户端连接)
...

代价是内存预分配:aeCreateEventLoop(setsize) 时一次性分配 setsize 个槽位。Redis 默认是 10240,在内存充裕的服务器上完全可以接受。

4.2 时间事件 aeTimeEvent

typedef struct aeTimeEvent {
    long long             id;            // 唯一 ID,由 timeEventNextId 自增
    long long             when_sec;      // 触发时间(秒)
    long long             when_ms;       // 触发时间(毫秒,0-999)
    aeTimeProc           *timeProc;      // 到期回调
    aeEventFinalizerProc *finalizerProc; // 删除时的清理回调(可为 NULL)
    void                 *clientData;    // 用户私有数据
    struct aeTimeEvent   *prev;          // 双向链表前驱
    struct aeTimeEvent   *next;          // 双向链表后继
    int                   refcount;      // 引用计数(防止回调中删除自身导致崩溃)
} aeTimeEvent;

时间事件回调的约定

返回值 含义
> 0 重新调度,N 毫秒后再次触发
0 立刻再次触发(下一轮就触发)
-1AE_NOMORE 只触发一次,触发后自动删除

4.3 就绪事件缓冲 aeFiredEvent

typedef struct aeFiredEvent {
    int fd;    // 就绪的 fd
    int mask;  // 就绪的事件类型(AE_READABLE / AE_WRITABLE)
} aeFiredEvent;

这是 aeApiPoll 写入结果的缓冲区,大小同样是 setsize

4.4 事件循环主体 aeEventLoop

struct aeEventLoop {
    int               maxfd;            // 当前已注册的最大 fd(select 需要)
    int               setsize;          // events[] 和 fired[] 的容量
    long long         timeEventNextId;  // 下一个时间事件的 ID(单调递增)
    aeFileEvent      *events;           // 文件事件数组(按 fd 索引)
    aeFiredEvent     *fired;            // poll 就绪事件缓冲
    aeTimeEvent      *timeEventHead;    // 时间事件链表头
    int               stop;             // 非 0 时退出主循环
    void             *apidata;          // I/O 后端私有数据(epoll_fd / kqueue_fd 等)
    aeBeforeSleepProc *beforesleep;     // poll 前钩子
    aeBeforeSleepProc *aftersleep;      // poll 后钩子
    int               flags;            // 运行时标志
};

5. I/O 多路复用后端抽象

5.1 为什么要做后端抽象?

不同操作系统提供的 I/O 多路复用 API 不同:

  • Linuxepoll(最高效,O(1) 事件通知,支持边缘触发)
  • macOS / BSDkqueue(功能更丰富,同样高效)
  • Solaris/dev/pollevent ports
  • 通用select(最古老,fd 数量上限 1024,每次 O(n) 扫描)

Redis 希望同一套业务代码在所有平台上运行,因此把这些差异封装成统一接口,通过 编译时 #include 选择后端:

// ae.c 末尾
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

5.2 统一接口:四个函数

每个后端只需实现以下四个静态函数:

static int  aeApiCreate(aeEventLoop *el);           // 初始化后端(创建 epoll_fd 等)
static void aeApiFree(aeEventLoop *el);             // 释放后端资源
static int  aeApiAddEvent(aeEventLoop *el, int fd, int mask);  // 注册/修改 fd 监听
static void aeApiDelEvent(aeEventLoop *el, int fd, int delmask); // 移除 fd 监听
static int  aeApiPoll(aeEventLoop *el, struct timeval *tvp);   // 等待就绪事件

所有后端实现都放在 el->apidata 里,上层代码完全感知不到差异。

5.3 select 实现详解

typedef struct aeApiState {
    fd_set rfds;   // 关注可读的 fd 集合(永久保存)
    fd_set wfds;   // 关注可写的 fd 集合(永久保存)
    fd_set _rfds;  // 每次 poll 前复制的工作副本
    fd_set _wfds;  // 每次 poll 前复制的工作副本
} aeApiState;

⚠️ 关键细节select()原地修改 fd_set 参数,调用返回后 fd_set 中只保留就绪的 fd。如果不使用副本,下一轮 poll 就会丢失之前注册的所有监听!

static int aeApiPoll(aeEventLoop *el, struct timeval *tvp) {
    aeApiState *state = el->apidata;

    // 每次都要先复制,因为 select 会修改入参
    state->_rfds = state->rfds;
    state->_wfds = state->wfds;

    int retval = select(el->maxfd + 1,
                        &state->_rfds, &state->_wfds, NULL, tvp);
    // ...
}

5.4 epoll vs kqueue vs select 对比

特性 select epoll kqueue
平台 全平台 Linux macOS/BSD
fd 上限 1024(FD_SETSIZE 无(受系统限制)
时间复杂度 O(n)(每次扫描全部 fd) O(1)(内核直接通知) O(1)
触发模式 水平触发 水平/边缘触发 水平/边缘触发
内核拷贝 每次都需要从用户空间拷贝 fd_set 只在注册时拷贝一次 只在注册时拷贝一次
Redis 优先级 最低(回退方案) 最高(Linux) 最高(macOS/BSD)

6. 文件事件注册与注销

6.1 Redis文件事件

以下内容来自 :《Redis 设计与实现》https://redisbook.readthedocs.io/en/latest/internal/ae.html

Redis 服务器通过在多个客户端之间进行多路复用, 从而实现高效的命令请求处理: 多个客户端通过套接字连接到 Redis 服务器中, 但只有在套接字可以无阻塞地进行读或者写时, 服务器才会和这些客户端进行交互。

Redis 将这类因为对套接字进行多路复用而产生的事件称为文件事件(file event), 文件事件可以分为读事件和写事件两类。

(1) 读事件

读事件标志着客户端命令请求的发送状态。

当一个新的客户端连接到服务器时, 服务器会给为该客户端绑定读事件, 直到客户端断开连接之后, 这个读事件才会被移除。

读事件在整个网络连接的生命期内, 都会在等待和就绪两种状态之间切换:

  • 当客户端只是连接到服务器,但并没有向服务器发送命令时,该客户端的读事件就处于等待状态。
  • 当客户端给服务器发送命令请求,并且请求已到达时(相应的套接字可以无阻塞地执行读操作),该客户端的读事件处于就绪状态。 作为例子, 下图展示了三个已连接到服务器、但并没有发送命令的客户端:

Image

这三个客户端的状态如下表: | 客户端 | 读事件状态 | 命令发送状态 | |——|——–|——–| | 客户端 X | 等待 | 未发送 | | 客户端 Y | 等待 | 未发送 | | 客户端 Z | 等待 | 未发送 | 之后, 当客户端 X 向服务器发送命令请求, 并且命令请求已到达时, 客户端 X 的读事件状态变为就绪:

Image

这时, 三个客户端的状态如下表(只有客户端 X 的状态被更新了): |——|——–|——–| | 客户端 | 读事件状态 | 命令发送状态 | | 客户端 X | 就绪 | 已发送,并且已到达 | | 客户端 Y | 等待 | 未发送 | | 客户端 Z | 等待 | 未发送 | 当事件处理器被执行时, 就绪的文件事件会被识别到, 相应的命令请求会被发送到命令执行器, 并对命令进行求值。

(2) 写事件

写事件标志着客户端对命令结果的接收状态。

和客户端自始至终都关联着读事件不同, 服务器只会在有命令结果要传回给客户端时, 才会为客户端关联写事件, 并且在命令结果传送完毕之后, 客户端和写事件的关联就会被移除。

一个写事件会在两种状态之间切换:

  • 当服务器有命令结果需要返回给客户端,但客户端还未能执行无阻塞写,那么写事件处于等待状态。
  • 当服务器有命令结果需要返回给客户端,并且客户端可以进行无阻塞写,那么写事件处于就绪状态。 当客户端向服务器发送命令请求, 并且请求被接受并执行之后, 服务器就需要将保存在缓存内的命令执行结果返回给客户端, 这时服务器就会为客户端关联写事件。

作为例子, 下图展示了三个连接到服务器的客户端, 其中服务器正等待客户端 X 变得可写, 从而将命令的执行结果返回给它:

Image

此时三个客户端的事件状态分别如下表:

客户端 读事件状态 写事件状态 客户端 X 等待 等待 客户端 Y 等待 无 客户端 Z 等待 无 当客户端 X 的套接字可以进行无阻塞写操作时, 写事件就绪, 服务器将保存在缓存内的命令执行结果返回给客户端:

Image

此时三个客户端的事件状态分别如下表(只有客户端 X 的状态被更新了):

客户端 读事件状态 写事件状态 客户端 X 等待 已就绪 客户端 Y 等待 无 客户端 Z 等待 无 当命令执行结果被传送回客户端之后, 客户端和写事件之间的关联会被解除(只剩下读事件), 至此, 返回命令执行结果的动作执行完毕:

Image

读事件只有在客户端断开和服务器的连接时,才会被移除。 这也就是说,当客户端关联写事件的时候,实际上它在同时关联读/写两种事件。 因为在同一次文件事件处理器的调用中, 单个客户端只能执行其中一种事件(要么读,要么写,但不能又读又写),当出现读事件和写事件同时就绪的情况时,事件处理器优先处理读事件。 这也就是说,当服务器有命令结果要返回客户端,而客户端又有新命令请求进入时,服务器先处理新命令请求。

注册:aeCreateFileEvent

int aeCreateFileEvent(aeEventLoop *el, int fd, int mask,
                      aeFileProc *proc, void *clientData) {
    if (fd >= el->setsize) return -1;  // fd 超出容量

    aeFileEvent *fe = &el->events[fd];

    // 1. 通知后端(epoll_ctl ADD/MOD)
    if (aeApiAddEvent(el, fd, mask) == -1) return -1;

    // 2. 合并掩码位(同一 fd 可分别注册读/写)
    fe->mask |= mask;

    // 3. 绑定回调
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;

    // 4. 更新 maxfd(select 需要知道最大 fd)
    if (fd > el->maxfd) el->maxfd = fd;

    return 0;
}

注意:读和写可以绑定不同的回调函数,也可以绑定同一个(Redis 中客户端读写通常是不同的函数)。

注销:aeDeleteFileEvent

void aeDeleteFileEvent(aeEventLoop *el, int fd, int mask) {
    aeFileEvent *fe = &el->events[fd];

    aeApiDelEvent(el, fd, mask);     // 通知后端
    fe->mask &= ~mask;               // 清除对应掩码位

    // 如果 maxfd 恰好是被清空的 fd,需要向下扫描重新定位
    if (fd == el->maxfd && fe->mask == AE_NONE) {
        for (int j = el->maxfd - 1; j >= 0; j--) {
            if (el->events[j].mask != AE_NONE) {
                el->maxfd = j;
                break;
            }
        }
    }
}

6.1 读事件状态切换(等待 <-> 就绪)

结合 Redis 设计与实现中的图示,可以把客户端读事件理解成下面两种状态循环:

  • 等待:客户端已连接,但还没有命令到达;
  • 就绪:客户端命令已经到达,套接字可无阻塞读取。
客户端连接建立
    ↓
绑定 AE_READABLE
    ↓
[等待] --(命令到达)--> [就绪] --(读取并处理)--> [等待]
    ↑                                         |
    └--------------(连接关闭)------------------┘ 解绑读事件

这也是为什么 Redis 客户端在整个生命周期中几乎一直带着读事件,直到断开连接才移除。

6.2 写事件状态切换(按需注册、发送后移除)

写事件和读事件最大的区别是:写事件不是常驻的

  • 有回复要发,但暂时不可写:写事件处于等待;
  • 套接字可写:写事件就绪,发送输出缓冲区;
  • 缓冲区清空:立即移除写事件。
命令执行完成,产生回复
    ↓
绑定 AE_WRITABLE
    ↓
[等待可写] --(fd 可写)--> [就绪并发送] --(发送完毕)--> 解绑 AE_WRITABLE

这套机制避免了长期监听 “总是可写” 的无意义事件,减少事件循环负担。


7. 时间事件注册与调度

注册:aeCreateTimeEvent

long long aeCreateTimeEvent(aeEventLoop *el, long long milliseconds,
                             aeTimeProc *proc, void *clientData,
                             aeEventFinalizerProc *finalizerProc) {
    long long id = el->timeEventNextId++;   // 分配唯一 ID

    aeTimeEvent *te = malloc(sizeof(aeTimeEvent));
    te->id            = id;
    te->timeProc      = proc;
    te->finalizerProc = finalizerProc;
    te->clientData    = clientData;
    te->refcount      = 0;

    // 计算首次触发的绝对时间戳
    aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms);

    // 插入链表头(O(1))
    te->prev = NULL;
    te->next = el->timeEventHead;
    if (el->timeEventHead)
        el->timeEventHead->prev = te;
    el->timeEventHead = te;

    return id;
}

7.1 为什么用链表而不是堆?

从算法角度看,最小堆(优先队列)查找最近到期事件是 O(log n),而无序链表是 O(n)。堆似乎更优?

Redis 的选择是无序链表,原因很实际:

  1. Redis 的时间事件极少。在正式版本中,整个 Redis 进程通常只有 1 个时间事件serverCron)。O(n) 遍历的开销几乎为零。
  2. 链表插入是 O(1),而堆插入是 O(log n)。对于极少量元素,链表完全够用。
  3. 代码更简单,维护成本低。

如果未来 Redis 时间事件数量大幅增加,可以轻松换成最小堆,上层接口无需改变。

7.2 惰性删除与引用计数

调用 aeDeleteTimeEvent 时,Redis 不立即释放内存,只是将 id 标记为 AE_DELETED_EVENT_ID(-1)。真正的释放发生在下一次 processTimeEvents 遍历时:

// 在 processTimeEvents 中
if (te->id == AE_DELETED_EVENT_ID) {
    if (te->refcount == 0) {    // 确认没有在执行中
        // 从链表中摘除并 free
    }
    continue;
}

为什么需要 refcount

考虑这个场景:时间事件的回调函数在执行过程中调用了 aeDeleteTimeEvent(el, id)(删除自身)。如果立即 free,processTimeEvents 在回调返回后继续访问 te->next 就是野指针,程序崩溃。

refcount 解决了这个问题:

te->refcount++;            // 进入回调前加一
retval = te->timeProc(...);
te->refcount--;            // 回调返回后减一
// 此时如果 te->id 已被标记删除,下次扫描到时 refcount 为 0,才真正 free

8. 核心调度:aeProcessEvents

这是整个 AE 的心脏,每次调用完成一轮完整的事件处理。

int aeProcessEvents(aeEventLoop *el, int flags) {
    // ① 计算 poll 超时
    // ② beforesleep 钩子
    // ③ aeApiPoll(阻塞等待)
    // ④ aftersleep 钩子
    // ⑤ 分发文件事件
    // ⑥ 处理时间事件
}

8.1 超时时间的计算

poll 的超时时间决定了 Redis 最多等待多久才去检查时间事件:

超时 = min(所有时间事件中最近的到期时间 - 当前时间, ∞)

具体逻辑:

if (flags & AE_DONT_WAIT) {
    // 非阻塞模式:立即返回
    tv = {0, 0};
    tvp = &tv;
} else {
    aeTimeEvent *shortest = aeSearchNearestTimer(el);
    if (shortest) {
        // 计算距到期还有多久
        tv.tv_sec  = shortest->when_sec - now_sec;
        tv.tv_usec = (shortest->when_ms - now_ms) * 1000;
        if (tv.tv_sec < 0 || tv.tv_usec < 0) tv = {0, 0};  // 已过期
        tvp = &tv;
    }
    // 否则 tvp = NULL → 永久阻塞(没有任何时间事件)
}

这个设计保证了:时间事件的精度取决于 poll 阻塞的时长,最大误差约等于 poll 超时时间。Redis serverCron 默认每 1000/hz ms 执行一次,hz 默认为 10,即每 100ms 一次,精度完全够用。

8.2 AE_BARRIER:写优先于读

通常情况下,AE 按照”先处理读、再处理写”的顺序分发事件。但 Redis 引入了 AE_BARRIER 标志,强制写在读之前触发

正常模式:  READ  →  WRITE
BARRIER 模式:WRITE  →  READ

使用场景:AOF(Append Only File)持久化。

Redis 在回复客户端之前,必须确保数据已经写入磁盘(fsync)。如果先处理读(接受新命令),再处理写(发送回复),可能在回复还没发出去时就崩溃,客户端误以为命令没有执行。

设置了 AE_BARRIER 的 fd,AE 会先调用 wfileProc(发送回复 / AOF fsync),再调用 rfileProc(读取新命令)。

int invert = fe->mask & AE_BARRIER;

if (!invert && (mask & AE_READABLE)) {
    fe->rfileProc(el, fd, fe->clientData, mask);  // 先读
}
if (mask & AE_WRITABLE) {
    fe->wfileProc(el, fd, fe->clientData, mask);  // 后写
}
if (invert && (mask & AE_READABLE)) {
    fe->rfileProc(el, fd, fe->clientData, mask);  // BARRIER:读在写之后
}

8.3 beforesleep / aftersleep 钩子

Redis 在 server.c 中注册了这两个钩子:

aeSetBeforeSleepProc(server.el, beforeSleep);
aeSetAfterSleepProc(server.el, afterSleep);

beforeSleep(poll 前) 做的事情:

  • 处理 pending 的命令(pipeline 场景)
  • AOF 缓冲区刷盘
  • 惰性释放(lazyfree
  • 向 replica 发送积压的复制数据

afterSleep(poll 后) 做的事情:

  • 处理 module 事件
  • 更新服务器时钟缓存(server.unixtime

8.4 同时可读可写时如何选择?

在 Redis 的常规语义里,同一个 fd 既可读又可写时,默认优先处理读事件。可以理解为:

  • 先尽快把新命令读进来,保持请求吞吐;
  • 再处理发送回复。

这与 Redis 设计与实现中的描述一致:同一次分发里不会对同一连接无限制地交错抢占,而是按既定顺序执行。你在 8.2 看到的 AE_BARRIER 则是特殊场景下的顺序改写机制。

8.5 调度语义:合作执行,不保证精确定时

文件事件和时间事件是合作关系,不是抢占关系,核心语义有三条:

  1. 一次循环里,先处理文件事件,再处理时间事件;
  2. poll 最大阻塞时间由最近到期时间事件决定;
  3. 时间事件可能晚于计划时间执行(因为前面可能在处理文件事件)。

对循环时间事件(例如 serverCron)来说,Redis 保证的是”至少执行”,而不是”严格每隔 t 毫秒精确执行”。更准确地说:

  • 如果两次处理之间已经过去 >= t,那么该事件至少会被处理一次;
  • 但不承诺每个精确时间片都准点触发。

这也是单线程、非抢占事件循环在高吞吐场景下的正常行为。


9. 主循环 aeMain

void aeMain(aeEventLoop *el) {
    el->stop = 0;
    while (!el->stop) {
        aeProcessEvents(el,
            AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP);
    }
}

就这么简单。Redis 的 server.cmain() 函数的最后两行是:

aeSetBeforeSleepProc(server.el, beforeSleep);
aeMain(server.el);   // 进入事件循环,永不返回(直到 SHUTDOWN)

aeStop(el) 被调用后(收到 SHUTDOWN 命令时),el->stop 置为 1,下一轮迭代结束后 aeMain 返回,Redis 进程优雅退出。


10. 与 Libevent / libuv 的对比

维度 Redis AE Libevent libuv
代码量 ~800 行 ~30,000 行 ~30,000 行
依赖 无(纯 C 标准库)
事件类型 文件事件 + 时间事件 文件/信号/定时/自定义 I/O/定时/信号/进程/线程池
线程安全 否(单线程设计) 部分
平台后端 epoll/kqueue/select epoll/kqueue/IOCP/select 同 Libevent + IOCP
适用场景 Redis 内嵌,专用 通用网络库 Node.js 底层,跨平台
性能 极高(零额外开销)

AE 的核心优势是极简:没有线程安全锁、没有复杂的事件优先级,为单线程高性能网络服务量身打造。


11. 总结

通过手写 Redis AE 事件循环器,我们深入理解了以下几个核心设计:

  1. fd 数组直接索引:O(1) 查找文件事件,用内存换时间。

  2. I/O 多路复用后端抽象:通过编译时 #include 选择最优后端,上层代码零感知。四个统一接口函数是整个抽象的精髓。

  3. 时间事件用无序链表:在事件数量极少时,简单的链表比复杂的堆更实用,符合 KISS 原则。

  4. 惰性删除 + 引用计数:解决了”在回调中删除自身”这个经典的链表并发安全问题(单线程版本的安全问题)。

  5. AE_BARRIER 写屏障:确保 AOF 数据持久化之后再回复客户端,是 Redis 数据安全的重要保障。

  6. 超时时间的精确计算:poll 超时与时间事件的协同,是 AE 同时驱动文件事件和时间事件的关键机制。

AE 的设计哲学体现了 Unix 的传统美学:做一件事,做到极致。它不是一个通用的事件驱动库,而是专门为 Redis 这种单线程、高并发、网络密集型服务设计的,在极简的代码中蕴含了深刻的工程智慧。


参考资料


关注