文章内容更新请以 WGrape GitHub博客 : 手写Redis事件循环器AE:原理与实现 为准
手写Redis事件循环器AE:原理与实现
本文结合 Redis 源码(
ae.c)与手写实践,深入讲解 Redis 事件循环器 AE(Another Event Loop)的设计思路、核心数据结构、I/O 多路复用后端抽象,以及实现过程中遇到的典型问题与解决方案。
1. 为什么 Redis 是单线程却这么快?
很多人第一次听说 Redis 是单线程模型时都会有疑惑:单线程意味着同一时刻只有一个任务在执行,怎么可能支撑每秒数十万的 QPS?
答案在于 I/O 多路复用(I/O Multiplexing) 和 事件驱动(Event-Driven) 的完美结合。
Redis 的性能瓶颈从来不在 CPU,而在网络 I/O。当 Redis 在等待一个客户端发来数据时,CPU 是空闲的。传统多线程方案会为每个连接分配一个线程去阻塞等待,线程上下文切换开销极大。而 Redis 的做法是:
用一个线程,通过 epoll/kqueue/select 同时监听成千上万个连接,哪个连接有数据来了就处理哪个,CPU 永远不闲着。
这套机制在 Redis 内部被封装成一个独立的事件循环库,叫做 AE。
2. AE 是什么?
AE 全称 Another Event Loop,是 Redis 作者 Salvatore Sanfilippo(antirez)为 Redis 量身打造的轻量级事件驱动库。整个库只有不到 800 行 C 代码,却驱动着 Redis 所有的网络 I/O 和定时任务。
AE 的核心设计哲学:
- 极简主义:没有 Libevent 那样复杂的抽象层,代码直接、清晰;
- 可替换后端:通过编译时宏选择 epoll / kqueue / /dev/poll / select,对上层完全透明;
- 两类事件:文件事件(网络 I/O)+ 时间事件(定时任务),覆盖 Redis 所有需求。
3. AE 实现
(1) 整体架构
┌─────────────────────────────────────────────────────────────┐
│ aeMain() │
│ (主循环 / Main Loop) │
└──────────────────────────┬──────────────────────────────────┘
│ 每轮迭代 / each iteration
▼
┌─────────────────────────────────────────────────────────────┐
│ aeProcessEvents() │
│ (核心调度 / Dispatcher) │
│ │
│ ① 计算最近时间事件的到期时间 → 作为 poll 超时 │
│ ② beforesleep() 钩子 │
│ ③ aeApiPoll() ← 阻塞等待 I/O 就绪 │
│ ④ aftersleep() 钩子 │
│ ⑤ 遍历 fired[],分发文件事件回调 │
│ ⑥ processTimeEvents() 处理到期时间事件 │
└─────────────────────────────────────────────────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌──────────────────────┐
│ 文件事件 │ │ 时间事件 │
│ aeFileEvent[] │ │ aeTimeEvent 链表 │
│ (按 fd 索引) │ │ (无序双向链表) │
└────────┬────────┘ └──────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ I/O 多路复用后端(可替换) │
│ Linux: ae_epoll.c macOS/BSD: ae_kqueue.c │
│ Solaris: ae_evport.c 通用回退: ae_select.c │
└─────────────────────────────────────────────────────────────┘
(2) 核心流程
Redis 启动
│
▼
aeCreateEventLoop(10240)
│ 分配 events[10240]、fired[10240]
│ 初始化 epoll/kqueue/select 后端
│
▼
注册监听 socket(fd=5)
aeCreateFileEvent(el, 5, AE_READABLE, acceptHandler, NULL)
│
▼
注册 serverCron
aeCreateTimeEvent(el, 1000/hz ms, serverCron, NULL, NULL)
│
▼
aeMain(el)
│
└──► 循环开始 ◄────────────────────────────────────────┐
│ │
▼ │
aeSearchNearestTimer → 计算超时 tvp │
│ │
▼ │
beforesleep() → AOF flush、快速 I/O 等 │
│ │
▼ │
aeApiPoll(el, tvp) ←── 阻塞,等待网络事件或超时 │
│ │
▼ │
aftersleep() │
│ │
▼ │
遍历 fired[] │
├─ fd=5 可读 → acceptHandler → 建立新连接 │
├─ fd=7 可读 → readQueryFromClient → 处理命令 │
└─ fd=7 可写 → sendReplyToClient → 发送响应 │
│ │
▼ │
processTimeEvents() │
└─ serverCron 到期 → 执行定期清理、统计、持久化等 │
│ │
└──────────────────────────────────────────┘
(3) 源代码
/**
* redis_ae.c
*
* A simple event-driven programming library.
* Redis AE (Another Event Loop) 事件驱动库核心设计实现
* Redis AE (Another Event Loop) - Core Event-Driven Library Design
*
* 本文件还原了 Redis ae 事件循环的核心架构,包含:
* This file reproduces the core architecture of Redis ae event loop, including:
* 1. 文件事件(File Events)—— I/O 多路复用
* 2. 时间事件(Time Events)—— 定时器/周期任务
* 3. 事件循环主体(aeMain)—— 驱动整个服务运行
*
* 参考来源 / Reference: https://github.com/redis/redis/blob/unstable/src/ae.c
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
/* =========================================================================
* 常量定义 / Constants
* ========================================================================= */
/* 每次 epoll/kqueue/select 最多处理的事件数
* Max number of events processed per poll call */
#define AE_SETSIZE (1024 * 10)
/* 文件事件类型掩码 / File event type masks */
#define AE_NONE 0 /* 未注册任何事件 / No event registered */
#define AE_READABLE 1 /* 可读事件 / Readable event */
#define AE_WRITABLE 2 /* 可写事件 / Writable event */
#define AE_BARRIER 4 /* 写屏障:确保写在读之后触发
* Barrier: force write-after-read ordering */
/* 时间事件 ID 哨兵,表示"已删除" / Sentinel id meaning "deleted" */
#define AE_DELETED_EVENT_ID (-1)
/* aeProcessEvents 标志位 / aeProcessEvents flags */
#define AE_FILE_EVENTS (1<<0) /* 处理文件事件 / Process file events */
#define AE_TIME_EVENTS (1<<1) /* 处理时间事件 / Process time events */
#define AE_ALL_EVENTS (AE_FILE_EVENTS | AE_TIME_EVENTS)
#define AE_DONT_WAIT (1<<2) /* 不阻塞等待 / Non-blocking poll */
#define AE_CALL_BEFORE_SLEEP (1<<3) /* 睡前回调 / Before-sleep callback */
#define AE_CALL_AFTER_SLEEP (1<<4) /* 醒后回调 / After-sleep callback */
/* =========================================================================
* 类型前置声明 / Forward declarations
* ========================================================================= */
typedef struct aeEventLoop aeEventLoop;
/* =========================================================================
* 回调函数原型 / Callback prototypes
* ========================================================================= */
/**
* 文件事件回调 / File event handler
* @param el 事件循环 / event loop
* @param fd 触发事件的文件描述符 / file descriptor that fired
* @param clientData 注册时绑定的用户数据 / user data bound at registration
* @param mask 触发的事件掩码 / event mask that fired
*/
typedef void aeFileProc(aeEventLoop *el, int fd, void *clientData, int mask);
/**
* 时间事件回调 / Time event handler
* 返回值:
* > 0 → 重新调度,距下次触发的毫秒数 (reschedule after N ms)
* = 0 → 立即再次触发 (fire again immediately)
* AE_NOMORE (-1) → 不再重复,删除该事件 (delete after this call)
*/
#define AE_NOMORE (-1)
typedef int aeTimeProc(aeEventLoop *el, long long id, void *clientData);
/**
* 时间事件删除回调 / Time event finalizer
* 当时间事件被删除时调用,用于释放 clientData
* Called when a time event is deleted; used to free clientData.
*/
typedef void aeEventFinalizerProc(aeEventLoop *el, void *clientData);
/**
* 睡眠前 / 睡眠后回调 / Before/after sleep callbacks
*/
typedef void aeBeforeSleepProc(aeEventLoop *el);
/* =========================================================================
* 文件事件结构 / File Event Structure
*
* Redis 中每个 fd 对应一个 aeFileEvent,由数组直接按 fd 索引,O(1) 查找。
* In Redis, one aeFileEvent per fd; stored in an array indexed by fd (O(1)).
* ========================================================================= */
typedef struct aeFileEvent {
int mask; /* 已注册的事件掩码 AE_READABLE|AE_WRITABLE
* Registered event mask */
aeFileProc *rfileProc; /* 可读回调 / Read handler */
aeFileProc *wfileProc; /* 可写回调 / Write handler */
void *clientData; /* 用户私有数据 / User private data */
} aeFileEvent;
/* =========================================================================
* 就绪事件结构(poll 返回结果) / Fired Event (poll result)
* ========================================================================= */
typedef struct aeFiredEvent {
int fd; /* 就绪的文件描述符 / Ready file descriptor */
int mask; /* 就绪的事件掩码 / Ready event mask */
} aeFiredEvent;
/* =========================================================================
* 时间事件结构 / Time Event Structure
*
* 采用无序链表(Redis 实际实现)。时间事件较少(通常只有 serverCron),
* 遍历开销可忽略,链表插入为 O(1)。
*
* Implemented as an unsorted linked list (matches Redis). Time events are few
* (typically only serverCron), so linear scan is negligible; O(1) insertion.
* ========================================================================= */
typedef struct aeTimeEvent {
long long id; /* 唯一 ID / Unique event ID */
long long when_sec; /* 触发时间:秒 / Fire time (sec) */
long long when_ms; /* 触发时间:毫秒 / Fire time (ms)*/
aeTimeProc *timeProc; /* 到期回调 / Expiry callback */
aeEventFinalizerProc *finalizerProc; /* 删除时回调 / Deletion callback */
void *clientData; /* 用户私有数据 / User data */
struct aeTimeEvent *prev; /* 双向链表前驱 / Prev in list */
struct aeTimeEvent *next; /* 双向链表后继 / Next in list */
int refcount; /* 引用计数,防止回调中删除自身
* Ref-count; prevents self-delete
* inside callback */
} aeTimeEvent;
/* =========================================================================
* 事件循环主结构 / Event Loop Main Structure
* ========================================================================= */
struct aeEventLoop {
int maxfd; /* 当前已注册的最大 fd
* Highest fd currently registered */
int setsize; /* fd 数组大小(最大 fd 数量)
* Size of fd array (max fd count) */
long long timeEventNextId;/* 下一个时间事件的唯一 ID
* Next time event unique ID */
aeFileEvent *events; /* 已注册的文件事件数组(按 fd 索引)
* Registered file events (indexed by fd)*/
aeFiredEvent *fired; /* poll 返回的就绪事件缓冲
* Buffer of events fired by poll */
aeTimeEvent *timeEventHead; /* 时间事件链表头
* Head of time event linked list */
int stop; /* 非 0 则退出主循环
* Non-zero → exit main loop */
void *apidata; /* I/O 多路复用后端私有数据(epoll/kqueue/select)
* Backend-private data (epoll/kqueue/…) */
aeBeforeSleepProc *beforesleep; /* 每次 poll 前调用
* Called before every poll */
aeBeforeSleepProc *aftersleep; /* 每次 poll 后调用
* Called after every poll */
int flags; /* AE_DONT_WAIT 等标志
* Flags (e.g. AE_DONT_WAIT) */
};
/* =========================================================================
* I/O 多路复用后端(模拟 select 实现)
* I/O Multiplexing Backend — simulated with select(2)
*
* 真实 Redis 优先选择 epoll(Linux)、kqueue(macOS/BSD)、/dev/poll(Solaris),
* 最后才回退到 select。此处为可移植演示,使用 select 模拟同样的接口。
*
* Real Redis prefers epoll (Linux) > kqueue (macOS/BSD) > /dev/poll > select.
* For portability in this demo we implement the same interface using select.
* ========================================================================= */
#include <sys/select.h>
typedef struct aeApiState {
fd_set rfds; /* 监听可读的 fd 集合 / fd set for readable events */
fd_set wfds; /* 监听可写的 fd 集合 / fd set for writable events */
/* 工作副本(select 会修改参数,每次需重新赋值)
* Working copies — select() modifies its arguments on return,
* so we restore from these each iteration. */
fd_set _rfds;
fd_set _wfds;
} aeApiState;
/* 初始化多路复用后端 / Initialize multiplexing backend */
static int aeApiCreate(aeEventLoop *el) {
aeApiState *state = (aeApiState *)malloc(sizeof(aeApiState));
if (!state) return -1;
FD_ZERO(&state->rfds);
FD_ZERO(&state->wfds);
el->apidata = state;
return 0;
}
/* 释放多路复用后端 / Free multiplexing backend */
static void aeApiFree(aeEventLoop *el) {
free(el->apidata);
}
/**
* 向后端添加/修改对某 fd 的监听
* Add or modify a fd watch in the backend.
*
* Redis 的 epoll 实现中,若 fd 已存在则用 EPOLL_CTL_MOD,否则 EPOLL_CTL_ADD。
* In Redis's epoll backend: EPOLL_CTL_MOD if already present, else EPOLL_CTL_ADD.
*/
static int aeApiAddEvent(aeEventLoop *el, int fd, int mask) {
aeApiState *state = (aeApiState *)el->apidata;
if (mask & AE_READABLE) FD_SET(fd, &state->rfds);
if (mask & AE_WRITABLE) FD_SET(fd, &state->wfds);
return 0;
}
/**
* 从后端移除对某 fd 的监听
* Remove a fd watch from the backend.
*/
static void aeApiDelEvent(aeEventLoop *el, int fd, int delmask) {
aeApiState *state = (aeApiState *)el->apidata;
if (delmask & AE_READABLE) FD_CLR(fd, &state->rfds);
if (delmask & AE_WRITABLE) FD_CLR(fd, &state->wfds);
}
/**
* 等待事件就绪(核心阻塞/非阻塞 poll)
* Wait for events to become ready — the core blocking/non-blocking poll.
*
* @param tvp NULL → 永久阻塞 / NULL means block forever
* tvp->tv_sec==0 && tvp->tv_usec==0 → 立即返回(非阻塞)
* @return 就绪事件数 / number of ready events
*/
static int aeApiPoll(aeEventLoop *el, struct timeval *tvp) {
aeApiState *state = (aeApiState *)el->apidata;
int retval, numevents = 0;
/* 每次 poll 前复制 fd_set,因为 select 会修改入参
* Copy fd sets before each call since select() modifies them in place. */
state->_rfds = state->rfds;
state->_wfds = state->wfds;
/* 调用 select,等待 fd 就绪或超时 / Call select: wait for ready fds or timeout */
retval = select(el->maxfd + 1, &state->_rfds, &state->_wfds, NULL, tvp);
if (retval > 0) {
for (int j = 0; j <= el->maxfd; j++) {
int mask = 0;
aeFileEvent *fe = &el->events[j];
if (fe->mask == AE_NONE) continue;
/* 将 select 结果映射回 ae 掩码
* Map select result back to ae mask. */
if (fe->mask & AE_READABLE && FD_ISSET(j, &state->_rfds))
mask |= AE_READABLE;
if (fe->mask & AE_WRITABLE && FD_ISSET(j, &state->_wfds))
mask |= AE_WRITABLE;
if (mask) {
el->fired[numevents].fd = j;
el->fired[numevents].mask = mask;
numevents++;
}
}
} else if (retval == -1 && errno != EINTR) {
/* EINTR 表示被信号中断,正常忽略 / EINTR means interrupted by signal — ignore */
perror("select");
}
return numevents;
}
/* =========================================================================
* 工具函数:获取当前时间 / Utility: get current time
* ========================================================================= */
static void aeGetTime(long long *seconds, long long *milliseconds) {
struct timeval tv;
gettimeofday(&tv, NULL);
*seconds = tv.tv_sec;
*milliseconds = tv.tv_usec / 1000;
}
/* 将当前时间加上 milliseconds 毫秒,写入 *sec / *ms
* Add `milliseconds` to now; write result into *sec / *ms. */
static void aeAddMillisecondsToNow(long long milliseconds,
long long *sec, long long *ms) {
long long cur_sec, cur_ms, when_sec, when_ms;
aeGetTime(&cur_sec, &cur_ms);
when_sec = cur_sec + milliseconds / 1000;
when_ms = cur_ms + milliseconds % 1000;
if (when_ms >= 1000) { /* 毫秒进位 / carry milliseconds into seconds */
when_sec++;
when_ms -= 1000;
}
*sec = when_sec;
*ms = when_ms;
}
/* =========================================================================
* 事件循环生命周期 / Event Loop Lifecycle
* ========================================================================= */
/**
* 创建事件循环 / Create an event loop.
*
* 分配 events[] 和 fired[] 数组,初始化多路复用后端。
* Allocate events[] and fired[] arrays, initialize the multiplexing backend.
*/
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *el;
el = (aeEventLoop *)malloc(sizeof(aeEventLoop));
if (!el) goto err;
el->events = (aeFileEvent *)malloc(sizeof(aeFileEvent) * setsize);
el->fired = (aeFiredEvent *)malloc(sizeof(aeFiredEvent) * setsize);
if (!el->events || !el->fired) goto err;
el->setsize = setsize;
el->maxfd = -1;
el->timeEventHead = NULL;
el->timeEventNextId = 0;
el->stop = 0;
el->beforesleep = NULL;
el->aftersleep = NULL;
el->flags = 0;
/* 所有文件事件初始化为 AE_NONE(未注册)
* Mark all file event slots as unregistered. */
for (int i = 0; i < setsize; i++)
el->events[i].mask = AE_NONE;
if (aeApiCreate(el) == -1) goto err;
return el;
err:
if (el) {
free(el->events);
free(el->fired);
free(el);
}
return NULL;
}
/**
* 停止事件循环 / Stop the event loop.
* 设置 stop 标志,aeMain 在当前迭代结束后退出。
* Sets stop flag; aeMain exits after current iteration.
*/
void aeStop(aeEventLoop *el) {
el->stop = 1;
}
/**
* 删除事件循环,释放所有资源
* Delete the event loop and release all resources.
*/
void aeDeleteEventLoop(aeEventLoop *el) {
aeApiFree(el);
/* 释放所有时间事件链表节点 / Free all time event nodes */
aeTimeEvent *te = el->timeEventHead;
while (te) {
aeTimeEvent *next = te->next;
free(te);
te = next;
}
free(el->events);
free(el->fired);
free(el);
}
/* =========================================================================
* 文件事件注册 / File Event Registration
* ========================================================================= */
/**
* 注册文件事件 / Register a file event.
*
* mask 可以是 AE_READABLE、AE_WRITABLE 或两者的 OR。
* mask can be AE_READABLE, AE_WRITABLE, or their OR.
*
* 同一个 fd 可以多次调用以追加事件类型(读/写分开注册)。
* The same fd can be registered multiple times to add event types.
*/
int aeCreateFileEvent(aeEventLoop *el, int fd, int mask,
aeFileProc *proc, void *clientData) {
if (fd >= el->setsize) {
errno = ERANGE;
return -1;
}
aeFileEvent *fe = &el->events[fd];
/* 通知后端监听新的事件类型 / Tell backend to watch the new event type */
if (aeApiAddEvent(el, fd, mask) == -1) return -1;
fe->mask |= mask; /* 合并掩码 / Merge mask bits */
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
/* 更新最大 fd,供 select 使用 / Update maxfd for select */
if (fd > el->maxfd) el->maxfd = fd;
return 0;
}
/**
* 注销文件事件 / Unregister a file event.
*/
void aeDeleteFileEvent(aeEventLoop *el, int fd, int mask) {
if (fd >= el->setsize) return;
aeFileEvent *fe = &el->events[fd];
if (fe->mask == AE_NONE) return; /* fd 未注册,忽略 / Not registered */
/* AE_BARRIER 不单独删除,随读/写一起清理
* AE_BARRIER is not deleted alone; cleared together with read/write. */
mask &= ~AE_BARRIER;
aeApiDelEvent(el, fd, mask);
fe->mask = fe->mask & (~mask); /* 清除对应位 / Clear the bit(s) */
/* 如果 maxfd 正好是被清空的 fd,向下扫描重新定位 maxfd
* If maxfd was the just-cleared fd, scan down to find the new maxfd. */
if (fd == el->maxfd && fe->mask == AE_NONE) {
for (int j = el->maxfd - 1; j >= 0; j--) {
if (el->events[j].mask != AE_NONE) {
el->maxfd = j;
break;
}
}
}
}
/* =========================================================================
* 时间事件注册 / Time Event Registration
* ========================================================================= */
/**
* 创建时间事件 / Create a time event.
*
* @param milliseconds 多少毫秒后首次触发 / first fire delay in ms
* @param proc 到期回调 / expiry callback
* @param clientData 用户数据 / user data
* @param finalizerProc 删除时调用(可为 NULL)/ called on deletion (may be NULL)
* @return 新事件的唯一 ID(失败返回 AE_DELETED_EVENT_ID)
* unique ID of the new event (AE_DELETED_EVENT_ID on failure)
*/
long long aeCreateTimeEvent(aeEventLoop *el, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc) {
long long id = el->timeEventNextId++;
aeTimeEvent *te = (aeTimeEvent *)malloc(sizeof(aeTimeEvent));
if (!te) return AE_DELETED_EVENT_ID;
te->id = id;
te->refcount = 0;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
/* 计算首次触发的绝对时间戳
* Compute the absolute timestamp for the first fire. */
aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms);
/* 插入到链表头(O(1))
* Insert at list head — O(1). */
te->prev = NULL;
te->next = el->timeEventHead;
if (el->timeEventHead)
el->timeEventHead->prev = te;
el->timeEventHead = te;
return id;
}
/**
* 删除时间事件 / Delete a time event.
*
* 只将 id 标记为 AE_DELETED_EVENT_ID;实际释放在下一轮 processTimeEvents 中。
* Only marks id as AE_DELETED_EVENT_ID; actual free happens in next
* processTimeEvents pass (safe even if called from within the callback).
*/
int aeDeleteTimeEvent(aeEventLoop *el, long long id) {
aeTimeEvent *te = el->timeEventHead;
while (te) {
if (te->id == id) {
te->id = AE_DELETED_EVENT_ID; /* 惰性删除 / Lazy deletion */
return 0; /* 成功 / success */
}
te = te->next;
}
return -1; /* 未找到 / not found */
}
/* =========================================================================
* 核心调度逻辑 / Core Dispatch Logic
* ========================================================================= */
/**
* 处理所有到期的时间事件 / Process all expired time events.
*
* 遍历链表,调用已到期事件的回调。回调返回 AE_NOMORE 则删除该事件;
* 否则按返回值重新计算下次触发时间。
*
* Walk the list, firing callbacks for expired events.
* If callback returns AE_NOMORE the event is deleted;
* otherwise its next fire time is updated using the return value (ms).
*/
static int processTimeEvents(aeEventLoop *el) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
te = el->timeEventHead;
maxId = el->timeEventNextId - 1;
/* 记录本轮最大 id,跳过本轮新注册的事件(避免无限递归)
* Snapshot maxId to skip events created during this pass. */
long long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
while (te) {
long long id;
/* ---------- 惰性删除 / Lazy deletion ---------- */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* 若还有其他地方引用(refcount),暂不释放
* If still referenced (refcount > 0), defer free. */
if (te->refcount == 0) {
if (te->prev) te->prev->next = te->next;
else el->timeEventHead = te->next;
if (te->next) te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(el, te->clientData);
free(te);
}
te = next;
continue;
}
/* 跳过本轮新增的事件 / Skip events created this pass */
if (te->id > maxId) {
te = te->next;
continue;
}
/* ---------- 检查是否到期 / Check expiry ---------- */
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
te->refcount++; /* 防止回调中删除自身 / guard self-delete */
retval = te->timeProc(el, id, te->clientData);
te->refcount--;
processed++;
if (retval != AE_NOMORE) {
/* 重新调度:计算下次触发时间
* Reschedule: compute next fire time. */
aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms);
} else {
/* 标记为删除,下轮清理 / Mark for deletion; cleaned next pass */
aeDeleteTimeEvent(el, id);
}
}
te = te->next;
}
return processed;
}
/**
* 搜索最近要触发的时间事件
* Search for the nearest time event (earliest fire time).
*
* 用于计算 poll 的超时值:若没有时间事件则永久阻塞;
* 若有则最多阻塞到最近一个时间事件到期。
*
* Used to compute the poll timeout: block forever if no time events exist;
* otherwise block at most until the nearest expiry.
*/
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *el) {
aeTimeEvent *te = el->timeEventHead;
aeTimeEvent *nearest = NULL;
while (te) {
if (!nearest ||
te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms))
{
nearest = te;
}
te = te->next;
}
return nearest;
}
/**
* aeProcessEvents — 单轮事件处理
* aeProcessEvents — single iteration of event processing.
*
* 这是 Redis 事件循环的心脏。每次调用:
* 1. 计算 poll 超时(由最近时间事件决定)
* 2. 可选调用 beforesleep 回调
* 3. 调用后端 poll(select/epoll/kqueue)
* 4. 可选调用 aftersleep 回调
* 5. 依次分发所有就绪的文件事件回调
* 6. 处理所有到期的时间事件
*
* This is the heart of the Redis event loop. Each call:
* 1. Computes poll timeout (driven by nearest time event)
* 2. Optionally calls beforesleep
* 3. Calls backend poll (select/epoll/kqueue)
* 4. Optionally calls aftersleep
* 5. Dispatches all ready file event callbacks
* 6. Processes all expired time events
*
* @return 本轮处理的事件总数 / total events processed this iteration
*/
int aeProcessEvents(aeEventLoop *el, int flags) {
int processed = 0, numevents;
/* 若既不处理文件事件也不处理时间事件,直接返回
* If neither file nor time events are requested, return immediately. */
if (!(flags & AE_FILE_EVENTS) && !(flags & AE_TIME_EVENTS)) return 0;
/* ----------------------------------------------------------------
* 计算 poll 超时 / Compute poll timeout
* ---------------------------------------------------------------- */
struct timeval tv, *tvp = NULL;
if (flags & AE_DONT_WAIT) {
/* 非阻塞模式:超时立即设为 0 / Non-blocking: set timeout to 0 immediately */
tv.tv_sec = 0;
tv.tv_usec = 0;
tvp = &tv;
} else if (el->maxfd != -1 || (flags & AE_TIME_EVENTS)) {
aeTimeEvent *shortest = aeSearchNearestTimer(el);
if (shortest) {
/* 计算距最近时间事件还有多久 / Compute time until nearest event */
long long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tv.tv_sec = shortest->when_sec - now_sec;
tv.tv_usec = (shortest->when_ms - now_ms) * 1000;
/* 已过期:超时设为 0(立即返回)/ Already expired → timeout 0 */
if (tv.tv_sec < 0) tv.tv_sec = 0;
if (tv.tv_usec < 0) tv.tv_usec = 0;
tvp = &tv;
}
/* 否则 tvp 保持 NULL → select 永久阻塞(无任何事件)
* Otherwise tvp stays NULL → select blocks forever (no events at all). */
}
/* ----------------------------------------------------------------
* beforesleep 回调 / beforesleep callback
* ---------------------------------------------------------------- */
if (el->beforesleep != NULL && (flags & AE_CALL_BEFORE_SLEEP))
el->beforesleep(el);
/* ----------------------------------------------------------------
* 调用后端 poll / Call backend poll
* ---------------------------------------------------------------- */
numevents = aeApiPoll(el, tvp);
/* ----------------------------------------------------------------
* aftersleep 回调 / aftersleep callback
* ---------------------------------------------------------------- */
if (el->aftersleep != NULL && (flags & AE_CALL_AFTER_SLEEP))
el->aftersleep(el);
/* ----------------------------------------------------------------
* 分发文件事件 / Dispatch file events
*
* AE_BARRIER:若同一 fd 同时可读可写,先处理写事件再处理读事件。
* 用于在回复客户端之前确保数据已持久化(AOF fsync 场景)。
*
* AE_BARRIER: if an fd is both readable and writable in the same
* iteration, fire the write callback BEFORE the read callback.
* Used to guarantee persistence (AOF fsync) before replying.
* ---------------------------------------------------------------- */
for (int j = 0; j < numevents; j++) {
aeFileEvent *fe = &el->events[el->fired[j].fd];
int mask = el->fired[j].mask;
int fd = el->fired[j].fd;
int fired = 0; /* 本 fd 已触发的事件数 / events fired for this fd */
/* BARRIER:写优先于读 / With BARRIER: write fires before read */
int invert = fe->mask & AE_BARRIER;
if (!invert && (mask & AE_READABLE) && fe->rfileProc) {
fe->rfileProc(el, fd, fe->clientData, mask);
fired++;
}
if ((mask & AE_WRITABLE) && fe->wfileProc &&
/* 避免同一回调函数因读写均就绪而被调用两次
* Avoid calling the same proc twice if both events are ready. */
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->wfileProc(el, fd, fe->clientData, mask);
fired++;
}
/* BARRIER 模式下,读在写之后触发 / With BARRIER, read fires after write */
if (invert && (mask & AE_READABLE) && fe->rfileProc) {
if (!fired || fe->rfileProc != fe->wfileProc)
fe->rfileProc(el, fd, fe->clientData, mask);
}
processed++;
}
/* ----------------------------------------------------------------
* 处理时间事件 / Process time events
* ---------------------------------------------------------------- */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(el);
return processed;
}
/* =========================================================================
* 主循环 / Main Loop
* ========================================================================= */
/**
* aeMain — 启动事件循环,直到 aeStop() 被调用
* aeMain — start the event loop; runs until aeStop() is called.
*
* 这是 Redis server.c 中 main() 最后调用的函数。
* This is the last function called in Redis's server.c main().
*/
void aeMain(aeEventLoop *el) {
el->stop = 0;
while (!el->stop) {
/* 每轮同时处理文件事件与时间事件,允许睡眠前/后回调
* Process both file and time events each iteration,
* enabling before/after-sleep hooks. */
aeProcessEvents(el,
AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP);
}
}
/* =========================================================================
* 演示 / Demo
* ========================================================================= */
/* 模拟 Redis serverCron 周期性任务回调
* Simulates Redis's serverCron periodic task callback. */
static int serverCron(aeEventLoop *el, long long id, void *clientData) {
(void)el; (void)id; (void)clientData;
static int tick = 0;
printf("[serverCron] tick #%d — 周期性任务 / periodic task running\n", ++tick);
/* 返回 100 表示 100ms 后再次触发 / Return 100 → reschedule after 100 ms */
if (tick >= 5) return AE_NOMORE; /* 演示:5 次后停止 / stop after 5 ticks */
return 100;
}
/* beforesleep 回调示意 / beforesleep callback example */
static void beforeSleep(aeEventLoop *el) {
(void)el;
/* 真实 Redis 在此处理快速 IO、AOF flush、惰性释放等
* Real Redis handles fast-path IO, AOF flush, lazy-free, etc. here. */
}
int main(void) {
printf("========================================\n");
printf(" Redis AE 事件循环演示 / AE Event Loop Demo\n");
printf("========================================\n\n");
/* 创建事件循环 / Create event loop */
aeEventLoop *el = aeCreateEventLoop(AE_SETSIZE);
if (!el) {
fprintf(stderr, "aeCreateEventLoop 失败 / failed\n");
return 1;
}
/* 注册 beforesleep 钩子 / Register beforesleep hook */
el->beforesleep = beforeSleep;
/* 注册 serverCron:首次 100ms 后触发 / Register serverCron: first fire in 100 ms */
long long cronId = aeCreateTimeEvent(el, 100, serverCron, NULL, NULL);
if (cronId == AE_DELETED_EVENT_ID) {
fprintf(stderr, "aeCreateTimeEvent 失败 / failed\n");
aeDeleteEventLoop(el);
return 1;
}
printf("[main] serverCron 已注册,id=%lld / registered, id=%lld\n\n",
cronId, cronId);
/*
* 启动主循环。
* serverCron 触发 5 次后返回 AE_NOMORE,时间事件链表为空;
* 此时 poll 将永久阻塞(无 fd 监听、无时间事件)。
* 为演示方便,我们手动在 serverCron 返回 AE_NOMORE 前停止循环。
*
* Start main loop.
* After serverCron fires 5 times it returns AE_NOMORE; the time event
* list becomes empty and poll would block forever (no fds, no timers).
* For this demo we stop the loop manually when serverCron removes itself.
*
* 在真实 Redis 中,serverCron 每 1000/hz ms 重新调度自身,永不结束。
* In real Redis, serverCron reschedules itself every 1000/hz ms forever.
*/
/* 手动执行几轮,直到 serverCron 完成为止 / Manually pump until serverCron is done */
for (int i = 0; i < 20; i++) {
int n = aeProcessEvents(el, AE_ALL_EVENTS | AE_DONT_WAIT);
if (n == 0 && el->timeEventHead == NULL) {
printf("\n[main] 无更多事件,退出循环 / No more events, exiting loop\n");
break;
}
/* 小延迟模拟真实时间流逝 / Small delay to simulate real elapsed time */
struct timespec ts = {0, 110 * 1000 * 1000}; /* 110 ms */
nanosleep(&ts, NULL);
}
aeDeleteEventLoop(el);
printf("[main] 事件循环已销毁 / Event loop destroyed\n");
return 0;
}
4. 核心数据结构设计
4.1 文件事件 aeFileEvent
typedef struct aeFileEvent {
int mask; // 已注册的事件类型:AE_READABLE | AE_WRITABLE | AE_BARRIER
aeFileProc *rfileProc; // 可读回调函数
aeFileProc *wfileProc; // 可写回调函数
void *clientData; // 用户私有数据(通常是 client 指针)
} aeFileEvent;
关键设计:Redis 用一个 aeFileEvent 数组,下标就是 fd 编号。这样通过 fd 查找事件是 O(1),不需要任何哈希表。
events[0] → fd=0 的事件(通常是 stdin)
events[1] → fd=1 的事件(通常是 stdout)
events[3] → fd=3 的事件(通常是 server socket)
events[5] → fd=5 的事件(某个客户端连接)
...
代价是内存预分配:aeCreateEventLoop(setsize) 时一次性分配 setsize 个槽位。Redis 默认是 10240,在内存充裕的服务器上完全可以接受。
4.2 时间事件 aeTimeEvent
typedef struct aeTimeEvent {
long long id; // 唯一 ID,由 timeEventNextId 自增
long long when_sec; // 触发时间(秒)
long long when_ms; // 触发时间(毫秒,0-999)
aeTimeProc *timeProc; // 到期回调
aeEventFinalizerProc *finalizerProc; // 删除时的清理回调(可为 NULL)
void *clientData; // 用户私有数据
struct aeTimeEvent *prev; // 双向链表前驱
struct aeTimeEvent *next; // 双向链表后继
int refcount; // 引用计数(防止回调中删除自身导致崩溃)
} aeTimeEvent;
时间事件回调的约定:
| 返回值 | 含义 |
|---|---|
> 0 |
重新调度,N 毫秒后再次触发 |
0 |
立刻再次触发(下一轮就触发) |
-1(AE_NOMORE) |
只触发一次,触发后自动删除 |
4.3 就绪事件缓冲 aeFiredEvent
typedef struct aeFiredEvent {
int fd; // 就绪的 fd
int mask; // 就绪的事件类型(AE_READABLE / AE_WRITABLE)
} aeFiredEvent;
这是 aeApiPoll 写入结果的缓冲区,大小同样是 setsize。
4.4 事件循环主体 aeEventLoop
struct aeEventLoop {
int maxfd; // 当前已注册的最大 fd(select 需要)
int setsize; // events[] 和 fired[] 的容量
long long timeEventNextId; // 下一个时间事件的 ID(单调递增)
aeFileEvent *events; // 文件事件数组(按 fd 索引)
aeFiredEvent *fired; // poll 就绪事件缓冲
aeTimeEvent *timeEventHead; // 时间事件链表头
int stop; // 非 0 时退出主循环
void *apidata; // I/O 后端私有数据(epoll_fd / kqueue_fd 等)
aeBeforeSleepProc *beforesleep; // poll 前钩子
aeBeforeSleepProc *aftersleep; // poll 后钩子
int flags; // 运行时标志
};
5. I/O 多路复用后端抽象
5.1 为什么要做后端抽象?
不同操作系统提供的 I/O 多路复用 API 不同:
- Linux:
epoll(最高效,O(1) 事件通知,支持边缘触发) - macOS / BSD:
kqueue(功能更丰富,同样高效) - Solaris:
/dev/poll或event ports - 通用:
select(最古老,fd 数量上限 1024,每次 O(n) 扫描)
Redis 希望同一套业务代码在所有平台上运行,因此把这些差异封装成统一接口,通过 编译时 #include 选择后端:
// ae.c 末尾
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
5.2 统一接口:四个函数
每个后端只需实现以下四个静态函数:
static int aeApiCreate(aeEventLoop *el); // 初始化后端(创建 epoll_fd 等)
static void aeApiFree(aeEventLoop *el); // 释放后端资源
static int aeApiAddEvent(aeEventLoop *el, int fd, int mask); // 注册/修改 fd 监听
static void aeApiDelEvent(aeEventLoop *el, int fd, int delmask); // 移除 fd 监听
static int aeApiPoll(aeEventLoop *el, struct timeval *tvp); // 等待就绪事件
所有后端实现都放在 el->apidata 里,上层代码完全感知不到差异。
5.3 select 实现详解
typedef struct aeApiState {
fd_set rfds; // 关注可读的 fd 集合(永久保存)
fd_set wfds; // 关注可写的 fd 集合(永久保存)
fd_set _rfds; // 每次 poll 前复制的工作副本
fd_set _wfds; // 每次 poll 前复制的工作副本
} aeApiState;
⚠️ 关键细节:
select()会原地修改fd_set参数,调用返回后fd_set中只保留就绪的 fd。如果不使用副本,下一轮 poll 就会丢失之前注册的所有监听!
static int aeApiPoll(aeEventLoop *el, struct timeval *tvp) {
aeApiState *state = el->apidata;
// 每次都要先复制,因为 select 会修改入参
state->_rfds = state->rfds;
state->_wfds = state->wfds;
int retval = select(el->maxfd + 1,
&state->_rfds, &state->_wfds, NULL, tvp);
// ...
}
5.4 epoll vs kqueue vs select 对比
| 特性 | select | epoll | kqueue |
|---|---|---|---|
| 平台 | 全平台 | Linux | macOS/BSD |
| fd 上限 | 1024(FD_SETSIZE) |
无(受系统限制) | 无 |
| 时间复杂度 | O(n)(每次扫描全部 fd) | O(1)(内核直接通知) | O(1) |
| 触发模式 | 水平触发 | 水平/边缘触发 | 水平/边缘触发 |
| 内核拷贝 | 每次都需要从用户空间拷贝 fd_set | 只在注册时拷贝一次 | 只在注册时拷贝一次 |
| Redis 优先级 | 最低(回退方案) | 最高(Linux) | 最高(macOS/BSD) |
6. 文件事件注册与注销
6.1 Redis文件事件
以下内容来自 :《Redis 设计与实现》https://redisbook.readthedocs.io/en/latest/internal/ae.html
Redis 服务器通过在多个客户端之间进行多路复用, 从而实现高效的命令请求处理: 多个客户端通过套接字连接到 Redis 服务器中, 但只有在套接字可以无阻塞地进行读或者写时, 服务器才会和这些客户端进行交互。
Redis 将这类因为对套接字进行多路复用而产生的事件称为文件事件(file event), 文件事件可以分为读事件和写事件两类。
(1) 读事件
读事件标志着客户端命令请求的发送状态。
当一个新的客户端连接到服务器时, 服务器会给为该客户端绑定读事件, 直到客户端断开连接之后, 这个读事件才会被移除。
读事件在整个网络连接的生命期内, 都会在等待和就绪两种状态之间切换:
- 当客户端只是连接到服务器,但并没有向服务器发送命令时,该客户端的读事件就处于等待状态。
- 当客户端给服务器发送命令请求,并且请求已到达时(相应的套接字可以无阻塞地执行读操作),该客户端的读事件处于就绪状态。 作为例子, 下图展示了三个已连接到服务器、但并没有发送命令的客户端:
这三个客户端的状态如下表: | 客户端 | 读事件状态 | 命令发送状态 | |——|——–|——–| | 客户端 X | 等待 | 未发送 | | 客户端 Y | 等待 | 未发送 | | 客户端 Z | 等待 | 未发送 | 之后, 当客户端 X 向服务器发送命令请求, 并且命令请求已到达时, 客户端 X 的读事件状态变为就绪:
这时, 三个客户端的状态如下表(只有客户端 X 的状态被更新了): |——|——–|——–| | 客户端 | 读事件状态 | 命令发送状态 | | 客户端 X | 就绪 | 已发送,并且已到达 | | 客户端 Y | 等待 | 未发送 | | 客户端 Z | 等待 | 未发送 | 当事件处理器被执行时, 就绪的文件事件会被识别到, 相应的命令请求会被发送到命令执行器, 并对命令进行求值。
(2) 写事件
写事件标志着客户端对命令结果的接收状态。
和客户端自始至终都关联着读事件不同, 服务器只会在有命令结果要传回给客户端时, 才会为客户端关联写事件, 并且在命令结果传送完毕之后, 客户端和写事件的关联就会被移除。
一个写事件会在两种状态之间切换:
- 当服务器有命令结果需要返回给客户端,但客户端还未能执行无阻塞写,那么写事件处于等待状态。
- 当服务器有命令结果需要返回给客户端,并且客户端可以进行无阻塞写,那么写事件处于就绪状态。 当客户端向服务器发送命令请求, 并且请求被接受并执行之后, 服务器就需要将保存在缓存内的命令执行结果返回给客户端, 这时服务器就会为客户端关联写事件。
作为例子, 下图展示了三个连接到服务器的客户端, 其中服务器正等待客户端 X 变得可写, 从而将命令的执行结果返回给它:
此时三个客户端的事件状态分别如下表:
客户端 读事件状态 写事件状态 客户端 X 等待 等待 客户端 Y 等待 无 客户端 Z 等待 无 当客户端 X 的套接字可以进行无阻塞写操作时, 写事件就绪, 服务器将保存在缓存内的命令执行结果返回给客户端:
此时三个客户端的事件状态分别如下表(只有客户端 X 的状态被更新了):
客户端 读事件状态 写事件状态 客户端 X 等待 已就绪 客户端 Y 等待 无 客户端 Z 等待 无 当命令执行结果被传送回客户端之后, 客户端和写事件之间的关联会被解除(只剩下读事件), 至此, 返回命令执行结果的动作执行完毕:
读事件只有在客户端断开和服务器的连接时,才会被移除。 这也就是说,当客户端关联写事件的时候,实际上它在同时关联读/写两种事件。 因为在同一次文件事件处理器的调用中, 单个客户端只能执行其中一种事件(要么读,要么写,但不能又读又写),当出现读事件和写事件同时就绪的情况时,事件处理器优先处理读事件。 这也就是说,当服务器有命令结果要返回客户端,而客户端又有新命令请求进入时,服务器先处理新命令请求。
注册:aeCreateFileEvent
int aeCreateFileEvent(aeEventLoop *el, int fd, int mask,
aeFileProc *proc, void *clientData) {
if (fd >= el->setsize) return -1; // fd 超出容量
aeFileEvent *fe = &el->events[fd];
// 1. 通知后端(epoll_ctl ADD/MOD)
if (aeApiAddEvent(el, fd, mask) == -1) return -1;
// 2. 合并掩码位(同一 fd 可分别注册读/写)
fe->mask |= mask;
// 3. 绑定回调
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
// 4. 更新 maxfd(select 需要知道最大 fd)
if (fd > el->maxfd) el->maxfd = fd;
return 0;
}
注意:读和写可以绑定不同的回调函数,也可以绑定同一个(Redis 中客户端读写通常是不同的函数)。
注销:aeDeleteFileEvent
void aeDeleteFileEvent(aeEventLoop *el, int fd, int mask) {
aeFileEvent *fe = &el->events[fd];
aeApiDelEvent(el, fd, mask); // 通知后端
fe->mask &= ~mask; // 清除对应掩码位
// 如果 maxfd 恰好是被清空的 fd,需要向下扫描重新定位
if (fd == el->maxfd && fe->mask == AE_NONE) {
for (int j = el->maxfd - 1; j >= 0; j--) {
if (el->events[j].mask != AE_NONE) {
el->maxfd = j;
break;
}
}
}
}
6.1 读事件状态切换(等待 <-> 就绪)
结合 Redis 设计与实现中的图示,可以把客户端读事件理解成下面两种状态循环:
- 等待:客户端已连接,但还没有命令到达;
- 就绪:客户端命令已经到达,套接字可无阻塞读取。
客户端连接建立
↓
绑定 AE_READABLE
↓
[等待] --(命令到达)--> [就绪] --(读取并处理)--> [等待]
↑ |
└--------------(连接关闭)------------------┘ 解绑读事件
这也是为什么 Redis 客户端在整个生命周期中几乎一直带着读事件,直到断开连接才移除。
6.2 写事件状态切换(按需注册、发送后移除)
写事件和读事件最大的区别是:写事件不是常驻的。
- 有回复要发,但暂时不可写:写事件处于等待;
- 套接字可写:写事件就绪,发送输出缓冲区;
- 缓冲区清空:立即移除写事件。
命令执行完成,产生回复
↓
绑定 AE_WRITABLE
↓
[等待可写] --(fd 可写)--> [就绪并发送] --(发送完毕)--> 解绑 AE_WRITABLE
这套机制避免了长期监听 “总是可写” 的无意义事件,减少事件循环负担。
7. 时间事件注册与调度
注册:aeCreateTimeEvent
long long aeCreateTimeEvent(aeEventLoop *el, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc) {
long long id = el->timeEventNextId++; // 分配唯一 ID
aeTimeEvent *te = malloc(sizeof(aeTimeEvent));
te->id = id;
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->refcount = 0;
// 计算首次触发的绝对时间戳
aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms);
// 插入链表头(O(1))
te->prev = NULL;
te->next = el->timeEventHead;
if (el->timeEventHead)
el->timeEventHead->prev = te;
el->timeEventHead = te;
return id;
}
7.1 为什么用链表而不是堆?
从算法角度看,最小堆(优先队列)查找最近到期事件是 O(log n),而无序链表是 O(n)。堆似乎更优?
Redis 的选择是无序链表,原因很实际:
- Redis 的时间事件极少。在正式版本中,整个 Redis 进程通常只有 1 个时间事件(
serverCron)。O(n) 遍历的开销几乎为零。 - 链表插入是 O(1),而堆插入是 O(log n)。对于极少量元素,链表完全够用。
- 代码更简单,维护成本低。
如果未来 Redis 时间事件数量大幅增加,可以轻松换成最小堆,上层接口无需改变。
7.2 惰性删除与引用计数
调用 aeDeleteTimeEvent 时,Redis 不立即释放内存,只是将 id 标记为 AE_DELETED_EVENT_ID(-1)。真正的释放发生在下一次 processTimeEvents 遍历时:
// 在 processTimeEvents 中
if (te->id == AE_DELETED_EVENT_ID) {
if (te->refcount == 0) { // 确认没有在执行中
// 从链表中摘除并 free
}
continue;
}
为什么需要 refcount?
考虑这个场景:时间事件的回调函数在执行过程中调用了 aeDeleteTimeEvent(el, id)(删除自身)。如果立即 free,processTimeEvents 在回调返回后继续访问 te->next 就是野指针,程序崩溃。
refcount 解决了这个问题:
te->refcount++; // 进入回调前加一
retval = te->timeProc(...);
te->refcount--; // 回调返回后减一
// 此时如果 te->id 已被标记删除,下次扫描到时 refcount 为 0,才真正 free
8. 核心调度:aeProcessEvents
这是整个 AE 的心脏,每次调用完成一轮完整的事件处理。
int aeProcessEvents(aeEventLoop *el, int flags) {
// ① 计算 poll 超时
// ② beforesleep 钩子
// ③ aeApiPoll(阻塞等待)
// ④ aftersleep 钩子
// ⑤ 分发文件事件
// ⑥ 处理时间事件
}
8.1 超时时间的计算
poll 的超时时间决定了 Redis 最多等待多久才去检查时间事件:
超时 = min(所有时间事件中最近的到期时间 - 当前时间, ∞)
具体逻辑:
if (flags & AE_DONT_WAIT) {
// 非阻塞模式:立即返回
tv = {0, 0};
tvp = &tv;
} else {
aeTimeEvent *shortest = aeSearchNearestTimer(el);
if (shortest) {
// 计算距到期还有多久
tv.tv_sec = shortest->when_sec - now_sec;
tv.tv_usec = (shortest->when_ms - now_ms) * 1000;
if (tv.tv_sec < 0 || tv.tv_usec < 0) tv = {0, 0}; // 已过期
tvp = &tv;
}
// 否则 tvp = NULL → 永久阻塞(没有任何时间事件)
}
这个设计保证了:时间事件的精度取决于 poll 阻塞的时长,最大误差约等于 poll 超时时间。Redis serverCron 默认每 1000/hz ms 执行一次,hz 默认为 10,即每 100ms 一次,精度完全够用。
8.2 AE_BARRIER:写优先于读
通常情况下,AE 按照”先处理读、再处理写”的顺序分发事件。但 Redis 引入了 AE_BARRIER 标志,强制写在读之前触发:
正常模式: READ → WRITE
BARRIER 模式:WRITE → READ
使用场景:AOF(Append Only File)持久化。
Redis 在回复客户端之前,必须确保数据已经写入磁盘(fsync)。如果先处理读(接受新命令),再处理写(发送回复),可能在回复还没发出去时就崩溃,客户端误以为命令没有执行。
设置了 AE_BARRIER 的 fd,AE 会先调用 wfileProc(发送回复 / AOF fsync),再调用 rfileProc(读取新命令)。
int invert = fe->mask & AE_BARRIER;
if (!invert && (mask & AE_READABLE)) {
fe->rfileProc(el, fd, fe->clientData, mask); // 先读
}
if (mask & AE_WRITABLE) {
fe->wfileProc(el, fd, fe->clientData, mask); // 后写
}
if (invert && (mask & AE_READABLE)) {
fe->rfileProc(el, fd, fe->clientData, mask); // BARRIER:读在写之后
}
8.3 beforesleep / aftersleep 钩子
Redis 在 server.c 中注册了这两个钩子:
aeSetBeforeSleepProc(server.el, beforeSleep);
aeSetAfterSleepProc(server.el, afterSleep);
beforeSleep(poll 前) 做的事情:
- 处理 pending 的命令(pipeline 场景)
- AOF 缓冲区刷盘
- 惰性释放(
lazyfree) - 向 replica 发送积压的复制数据
afterSleep(poll 后) 做的事情:
- 处理 module 事件
- 更新服务器时钟缓存(
server.unixtime)
8.4 同时可读可写时如何选择?
在 Redis 的常规语义里,同一个 fd 既可读又可写时,默认优先处理读事件。可以理解为:
- 先尽快把新命令读进来,保持请求吞吐;
- 再处理发送回复。
这与 Redis 设计与实现中的描述一致:同一次分发里不会对同一连接无限制地交错抢占,而是按既定顺序执行。你在 8.2 看到的 AE_BARRIER 则是特殊场景下的顺序改写机制。
8.5 调度语义:合作执行,不保证精确定时
文件事件和时间事件是合作关系,不是抢占关系,核心语义有三条:
- 一次循环里,先处理文件事件,再处理时间事件;
- poll 最大阻塞时间由最近到期时间事件决定;
- 时间事件可能晚于计划时间执行(因为前面可能在处理文件事件)。
对循环时间事件(例如 serverCron)来说,Redis 保证的是”至少执行”,而不是”严格每隔 t 毫秒精确执行”。更准确地说:
- 如果两次处理之间已经过去
>= t,那么该事件至少会被处理一次; - 但不承诺每个精确时间片都准点触发。
这也是单线程、非抢占事件循环在高吞吐场景下的正常行为。
9. 主循环 aeMain
void aeMain(aeEventLoop *el) {
el->stop = 0;
while (!el->stop) {
aeProcessEvents(el,
AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP);
}
}
就这么简单。Redis 的 server.c 中 main() 函数的最后两行是:
aeSetBeforeSleepProc(server.el, beforeSleep);
aeMain(server.el); // 进入事件循环,永不返回(直到 SHUTDOWN)
aeStop(el) 被调用后(收到 SHUTDOWN 命令时),el->stop 置为 1,下一轮迭代结束后 aeMain 返回,Redis 进程优雅退出。
10. 与 Libevent / libuv 的对比
| 维度 | Redis AE | Libevent | libuv |
|---|---|---|---|
| 代码量 | ~800 行 | ~30,000 行 | ~30,000 行 |
| 依赖 | 无(纯 C 标准库) | 无 | 无 |
| 事件类型 | 文件事件 + 时间事件 | 文件/信号/定时/自定义 | I/O/定时/信号/进程/线程池 |
| 线程安全 | 否(单线程设计) | 部分 | 是 |
| 平台后端 | epoll/kqueue/select | epoll/kqueue/IOCP/select | 同 Libevent + IOCP |
| 适用场景 | Redis 内嵌,专用 | 通用网络库 | Node.js 底层,跨平台 |
| 性能 | 极高(零额外开销) | 高 | 高 |
AE 的核心优势是极简:没有线程安全锁、没有复杂的事件优先级,为单线程高性能网络服务量身打造。
11. 总结
通过手写 Redis AE 事件循环器,我们深入理解了以下几个核心设计:
-
fd 数组直接索引:O(1) 查找文件事件,用内存换时间。
-
I/O 多路复用后端抽象:通过编译时
#include选择最优后端,上层代码零感知。四个统一接口函数是整个抽象的精髓。 -
时间事件用无序链表:在事件数量极少时,简单的链表比复杂的堆更实用,符合 KISS 原则。
-
惰性删除 + 引用计数:解决了”在回调中删除自身”这个经典的链表并发安全问题(单线程版本的安全问题)。
-
AE_BARRIER 写屏障:确保 AOF 数据持久化之后再回复客户端,是 Redis 数据安全的重要保障。
-
超时时间的精确计算:poll 超时与时间事件的协同,是 AE 同时驱动文件事件和时间事件的关键机制。
AE 的设计哲学体现了 Unix 的传统美学:做一件事,做到极致。它不是一个通用的事件驱动库,而是专门为 Redis 这种单线程、高并发、网络密集型服务设计的,在极简的代码中蕴含了深刻的工程智慧。