Redis 单线程网络模型及 I/O 多路复用
Redis 是时下流行的一个高性能键值内存数据库。与 memcached 的多线程网络模型相比,它采用了基于事件循环的单线程网络模型,并且提供了比 memcached 更丰富的数据类型和相关功能。纵然如此,它拥有和 memcached 近乎相同的执行效率^1。在 benchmark 中,它能做到单机每秒数万甚至超过十万级别的并发量^2。
在这篇文章中,我们将探究支撑 redis 实现这种效率的网络模型的构成和原理。
Reactor 模式
一般而言,服务器可以通过并行或 I/O 多路复用两种方式来实现它的并发特性。
并行架构的典型是多线程服务器。主线程在一个循环里面监听套接字,当一个新的请求到来时,就创建一个新线程专门处理这个请求。这种架构最符合直觉,但存在以下问题: 1. 不论是操作系统调度线程,或者是创建线程本身,都需要消耗一定资源。当并发数量增大时,这种消耗可能会变得不可忍受。 2. 多线程之间可能会并发访问数据,产生竞态条件。需要花费额外的精力去处理锁,使得编程容易出错并且趋于复杂。
I/O 多路复用则指利用操作系统提供的系统调用,如 epoll 或 select,使得多个套接字能够在一个线程中被并发地、交替的处理。也就是说,多个套接字复用同一个线程。具体来说,epoll、select 等系统调用能够监听多个套接字,当这些套接字发生某些值得关注的事件(如可读、可写等),就通知其调用者。调用者得到通知后,配合非阻塞 I/O^3,就能立即对套接字进行读写而不用陷入阻塞等待 I/O 可用。
一般来说,大多数 web 服务都将时间消耗在等待 I/O 上,属于 I/O 密集型而非计算密集型程序。因此,利用 I/O 多路复用能够得到良好的并发性。
Reactor 模式是一种事件处理模式,用于处理一个或多个并发传递给服务器的请求^4。它利用 I/O 多路复用将到来的请求分发给相应的处理程序。 redis 使用 reactor 设计模式来是实现其事件循环框架。
具体来说,reactor 设计模式定义了以下四种结构:
- Resources:向系统提供输入或消耗输出的任何资源。
- Synchronous Event Demultiplexer:同步事件分离器。监听资源信息,当有资源可用时,通知分派器。
- Dispatcher:负责注册或注销事件处理程序,以及将事件分离器传递过来的资源交付给相应的处理程序。
- Request Handler:最终处理资源的程序。
在 redis 中,Resources 就是客户端的套接字,Synchronous Event Demultiplexer 由 epoll、select 等 I/O 多路复用相关的系统调用实现,Dispatcher 是文件事件分派器,Request Handler 是注册的命令。如图:
以上各个部件在接下来详细解释。
I/O 多路复用
redis 使用 I/O 多路复用实现了 reactor 设计模式中的 Synchronous Event Demultiplexer。相关的代码分布在 ae_epoll.c、ae_select.c 中,指分别使用 epoll、select 等系统调用实现的多路复用。这些文件虽然使用了不同的系统调用和编码方式,但为上层提供了一套统一的 api。在不同的系统中,将会根据可用性和性能包含其中的一个文件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
此处以 ae_epoll.c 为例,分析它的实现。epoll 的用法见 https://en.wikipedia.org/wiki/Epoll。
首先关注创建 epoll 对象的函数:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 为事件分配内存
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// 创建一个 epoll 对象,保存它的文件描述符
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
调用 aeApiCreate 之后,就在系统中注册了一个 epoll 对象,并且将相应的文件描述符保存在 aeApiState.epfd
中。之后就能通过它注册相应事件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// fd:套接字的描述符,mask:注册的事件类型
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
// 如果事件已经被创建,那么需要 EPOLL_CTL_MOD 修改操作,否则需要 EPOLL_CTL_ADD 添加操作
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// 使用 epoll_ctl 为响应的描述符注册事件
// state->epfd 由 aeApiCreate 保存
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
aeApiAddEvent 内部使用了 epoll_ctl 为相应描述符注册事件。使用相应的套接字描述符 fd,以及想要关联的事件类型 mask 调用 aeApiAddEvent 之后,当套接字的状态变化为 mask 指定的类型时,就能通过 epoll_wait 得到通知。
之后可以使用 aeApiPoll 等待事件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
// epoll_wait 返回激活的事件数量,并且将相应信息保存在 state->events 中,它会等待由 tvp 指定的时间,如果过期还没有事件,就立即返回
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
// 遍历 state->events,查找激活的事件
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
// 将激活的套接字描述符及事件类型,保存在 eventLoop->fired 中
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
除此之外,还有aeApiDelEvent
,aeApiResize
,aeApiFree
,aeApiName
等函数,此处不详细叙述。
这些函数虽然在不同系统中有不同实现,但接口都是相同的。调用它们,就能使用操作系统提供的 I/O 多路复用功能得知相应套接字状态的变化信息。redis 基于这些函数实现事件循环。
事件
redis 自己实现了一个事件循环框架。它的主要运行过程,实际上就是一直在框架中循环等待事件,然后交替执行的过程。具体来说,redis 中存在文件事件和时间事件两种类型的事件。
时间事件类型的定义如下:1
2
3
4
5
6
7
8
9
10
11/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;
可以看到,里面包含了事件应该发生的时间 when_sec、when_ms,以及事件处理函数 timeProc。并且可以组织为一个双向链表,以在每一次事件循环中尝试查找到来的事件。
文件事件类型的定义如下:1
2
3
4
5
6
7/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
mask 指示了事件的类型,rfileProc 和 wfileProc 是相应读写事件发生时应该调用的处理函数。
redis 中,时间事件是指应该周期检查、执行的任务,如 rdb 持久化、清理过期键值对等。目前 redis 中只有一个时间事件,即 serverCron,所有的周期任务都放在里面完成。
redis 的网络事件处理器由文件事件构成。它使用 I/O 多路复用监听多个套接字,当套接字的状态发生相应变化时,如创建、可读、可写、关闭,就激活相应的文件事件。
事件循环
事件循环相关的信息保存在一个结构体里:1
2
3
4
5
6
7
8
9
10
11
12
13
14/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
events 保存了相关的文件事件,fired 保存了激活的文件事件。
在进行事件循环前,要先创建这个结构体:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
...
if (aeApiCreate(eventLoop) == -1) goto err;
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
}
aeCreateEventLoop 为事件分配了内存,调用了上面提到的 aeApiCreate 初始化 I/O 多路复用结构,然后将事件的状态 mask 设为 AE_NONE,表明还没有设置这个事件。
创建了结构体之后,就可以注册事件:1
2
3
4
5
6
7
8
9
10
11
12int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
return AE_OK;
}
aeCreateFileEvent 将指定的套接字描述符、事件类型和处理程序注册到事件循环中。之后开始事件循环后,如果相应套接字的事件到来,就会调用给定的处理程序。
之后就可以调用 aeMain 开始事件循环。实际上,在 redis 的整个生命周期中,实际上就是一直在进行如下的事件循环:1
2
3
4
5
6
7
8void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
只要 eventLoop->stop 没有被置为 1,就一直进行循环。如果有 beforesleep,在处理本轮循环前先调用它。在 aeProcessEvents 中正式处理循环:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// 搜索时间事件链表,找到最近到来的时间事件
shortest = aeSearchNearestTimer(eventLoop);
...
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
// I/O 多路复用,等待至 tvp,也就是最近的时间事件激活时间
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 处理到来的文件事件
for (j = 0; j < numevents; j++) {
// 找到相应的文件事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
// 如果是可读事件,就调用 rfileProc
if (fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
// 处理时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
aeProcessEvents 调用 aeApiPoll 等待发生文件事件,直至最近的时间事件到来。如果有文件事件,就先处理文件事件,再处理事件时间。否则就一直沉睡至时间事件到来。
当 aeApiPoll 返回后,激活的文件事件信息就保存在 eventLoop->fired 中。之后根据 eventLoop->fired 调用 rfileProc 或 wfileProc 等处理程序。
处理流程
启动服务器程序时,redis 会初始化服务器:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20void initServer(void) {
...
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
...
}
在其中调用了 aeCreateEventLoop 创建事件循环,然后根据指定的套接字注册了文件事件,类型设置为 AE_READABLE,意为当这些套接字变得可读就激活事件。
在 server.c main 函数中开启了事件循环:1
2
3
4
5
6
7int main(int argc, char **argv) {
...
aeMain(server.el);
...
}
可见,redis 一直在它生命周期的事件循环中监听指定套接字上的可读事件。一旦请求到来,就会调用 acceptTcpHandler:1
2
3
4
5
6
7
8
9
10
11
12
13void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
while(max--) {
// cfd: client fd
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
...
acceptCommonHandler(cfd,0,cip);
}
}
使用 anetTcpAccept 创建链接得到客户端的套接字 cfd 之后,调用 acceptCommonHandler 处理这个请求:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
c = createClient(fd);
...
}
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
...
}
...
}
这里又注册了一个文件事件 readQueryFromClient,当套接字中有可读的命令时,就会调用它。
最终,redis 会将读取的命令保存在 c->cmd
结构体中,并调用 lookupCommand 找到对应的处理程序,如果条件合适,就调用它:1
2
3
4
5
6
7void call(client *c, int flags) {
...
c->cmd->proc(c);
...
}
c->cmd->proc
会将输出保存在 c 中的缓冲区中。
何时发送保存在缓冲区中的结果呢?
在每一轮事件循环开始前,也就是beforesleep
中,都会尝试发送数据,并且注册发送数据的文件事件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36void beforeSleep(struct aeEventLoop *eventLoop) {
...
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
...
}
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
...
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
其中,server.clients_pending_write
为一个链表,包含还未进行写入操作的客户端。