Redis 单线程网络模型及 I/O 多路复用

Redis 是时下流行的一个高性能键值内存数据库。与 memcached 的多线程网络模型相比,它采用了基于事件循环的单线程网络模型,并且提供了比 memcached 更丰富的数据类型和相关功能。纵然如此,它拥有和 memcached 近乎相同的执行效率^1。在 benchmark 中,它能做到单机每秒数万甚至超过十万级别的并发量^2

在这篇文章中,我们将探究支撑 redis 实现这种效率的网络模型的构成和原理。

Reactor 模式

一般而言,服务器可以通过并行或 I/O 多路复用两种方式来实现它的并发特性。

并行架构的典型是多线程服务器。主线程在一个循环里面监听套接字,当一个新的请求到来时,就创建一个新线程专门处理这个请求。这种架构最符合直觉,但存在以下问题: 1. 不论是操作系统调度线程,或者是创建线程本身,都需要消耗一定资源。当并发数量增大时,这种消耗可能会变得不可忍受。 2. 多线程之间可能会并发访问数据,产生竞态条件。需要花费额外的精力去处理锁,使得编程容易出错并且趋于复杂。

I/O 多路复用则指利用操作系统提供的系统调用,如 epoll 或 select,使得多个套接字能够在一个线程中被并发地、交替的处理。也就是说,多个套接字复用同一个线程。具体来说,epoll、select 等系统调用能够监听多个套接字,当这些套接字发生某些值得关注的事件(如可读、可写等),就通知其调用者。调用者得到通知后,配合非阻塞 I/O^3,就能立即对套接字进行读写而不用陷入阻塞等待 I/O 可用。

一般来说,大多数 web 服务都将时间消耗在等待 I/O 上,属于 I/O 密集型而非计算密集型程序。因此,利用 I/O 多路复用能够得到良好的并发性。

Reactor 模式是一种事件处理模式,用于处理一个或多个并发传递给服务器的请求^4。它利用 I/O 多路复用将到来的请求分发给相应的处理程序。 redis 使用 reactor 设计模式来是实现其事件循环框架。

具体来说,reactor 设计模式定义了以下四种结构:

  • Resources:向系统提供输入或消耗输出的任何资源。
  • Synchronous Event Demultiplexer:同步事件分离器。监听资源信息,当有资源可用时,通知分派器。
  • Dispatcher:负责注册或注销事件处理程序,以及将事件分离器传递过来的资源交付给相应的处理程序。
  • Request Handler:最终处理资源的程序。

在 redis 中,Resources 就是客户端的套接字,Synchronous Event Demultiplexer 由 epoll、select 等 I/O 多路复用相关的系统调用实现,Dispatcher 是文件事件分派器,Request Handler 是注册的命令。如图:

以上各个部件在接下来详细解释。

I/O 多路复用

redis 使用 I/O 多路复用实现了 reactor 设计模式中的 Synchronous Event Demultiplexer。相关的代码分布在 ae_epoll.c、ae_select.c 中,指分别使用 epoll、select 等系统调用实现的多路复用。这些文件虽然使用了不同的系统调用和编码方式,但为上层提供了一套统一的 api。在不同的系统中,将会根据可用性和性能包含其中的一个文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

此处以 ae_epoll.c 为例,分析它的实现。epoll 的用法见 https://en.wikipedia.org/wiki/Epoll。

首先关注创建 epoll 对象的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;

// 为事件分配内存
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// 创建一个 epoll 对象,保存它的文件描述符
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}

调用 aeApiCreate 之后,就在系统中注册了一个 epoll 对象,并且将相应的文件描述符保存在 aeApiState.epfd 中。之后就能通过它注册相应事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// fd:套接字的描述符,mask:注册的事件类型
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
// 如果事件已经被创建,那么需要 EPOLL_CTL_MOD 修改操作,否则需要 EPOLL_CTL_ADD 添加操作
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;

ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// 使用 epoll_ctl 为响应的描述符注册事件
// state->epfd 由 aeApiCreate 保存
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}

aeApiAddEvent 内部使用了 epoll_ctl 为相应描述符注册事件。使用相应的套接字描述符 fd,以及想要关联的事件类型 mask 调用 aeApiAddEvent 之后,当套接字的状态变化为 mask 指定的类型时,就能通过 epoll_wait 得到通知。

之后可以使用 aeApiPoll 等待事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

// epoll_wait 返回激活的事件数量,并且将相应信息保存在 state->events 中,它会等待由 tvp 指定的时间,如果过期还没有事件,就立即返回
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;

numevents = retval;
// 遍历 state->events,查找激活的事件
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
// 将激活的套接字描述符及事件类型,保存在 eventLoop->fired 中
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}

除此之外,还有aeApiDelEventaeApiResizeaeApiFreeaeApiName 等函数,此处不详细叙述。

这些函数虽然在不同系统中有不同实现,但接口都是相同的。调用它们,就能使用操作系统提供的 I/O 多路复用功能得知相应套接字状态的变化信息。redis 基于这些函数实现事件循环。

事件

redis 自己实现了一个事件循环框架。它的主要运行过程,实际上就是一直在框架中循环等待事件,然后交替执行的过程。具体来说,redis 中存在文件事件和时间事件两种类型的事件。

时间事件类型的定义如下:

1
2
3
4
5
6
7
8
9
10
11
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
} aeTimeEvent;

可以看到,里面包含了事件应该发生的时间 when_sec、when_ms,以及事件处理函数 timeProc。并且可以组织为一个双向链表,以在每一次事件循环中尝试查找到来的事件。

文件事件类型的定义如下:

1
2
3
4
5
6
7
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

mask 指示了事件的类型,rfileProc 和 wfileProc 是相应读写事件发生时应该调用的处理函数。

redis 中,时间事件是指应该周期检查、执行的任务,如 rdb 持久化、清理过期键值对等。目前 redis 中只有一个时间事件,即 serverCron,所有的周期任务都放在里面完成。

redis 的网络事件处理器由文件事件构成。它使用 I/O 多路复用监听多个套接字,当套接字的状态发生相应变化时,如创建、可读、可写、关闭,就激活相应的文件事件。

事件循环

事件循环相关的信息保存在一个结构体里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
} aeEventLoop;

events 保存了相关的文件事件,fired 保存了激活的文件事件。

在进行事件循环前,要先创建这个结构体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

...

if (aeApiCreate(eventLoop) == -1) goto err;
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
}

aeCreateEventLoop 为事件分配了内存,调用了上面提到的 aeApiCreate 初始化 I/O 多路复用结构,然后将事件的状态 mask 设为 AE_NONE,表明还没有设置这个事件。

创建了结构体之后,就可以注册事件:

1
2
3
4
5
6
7
8
9
10
11
12
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
return AE_OK;
}

aeCreateFileEvent 将指定的套接字描述符、事件类型和处理程序注册到事件循环中。之后开始事件循环后,如果相应套接字的事件到来,就会调用给定的处理程序。

之后就可以调用 aeMain 开始事件循环。实际上,在 redis 的整个生命周期中,实际上就是一直在进行如下的事件循环:

1
2
3
4
5
6
7
8
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}

只要 eventLoop->stop 没有被置为 1,就一直进行循环。如果有 beforesleep,在处理本轮循环前先调用它。在 aeProcessEvents 中正式处理循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;

// 搜索时间事件链表,找到最近到来的时间事件
shortest = aeSearchNearestTimer(eventLoop);
...
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;

// I/O 多路复用,等待至 tvp,也就是最近的时间事件激活时间
numevents = aeApiPoll(eventLoop, tvp);

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);

// 处理到来的文件事件
for (j = 0; j < numevents; j++) {
// 找到相应的文件事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */

// 如果是可读事件,就调用 rfileProc
if (fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}

/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}

processed++;
}

// 处理时间事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

aeProcessEvents 调用 aeApiPoll 等待发生文件事件,直至最近的时间事件到来。如果有文件事件,就先处理文件事件,再处理事件时间。否则就一直沉睡至时间事件到来。

当 aeApiPoll 返回后,激活的文件事件信息就保存在 eventLoop->fired 中。之后根据 eventLoop->fired 调用 rfileProc 或 wfileProc 等处理程序。

处理流程

启动服务器程序时,redis 会初始化服务器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void initServer(void) {
...

server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

...

/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}

...
}

在其中调用了 aeCreateEventLoop 创建事件循环,然后根据指定的套接字注册了文件事件,类型设置为 AE_READABLE,意为当这些套接字变得可读就激活事件。

在 server.c main 函数中开启了事件循环:

1
2
3
4
5
6
7
int main(int argc, char **argv) {
...

aeMain(server.el);

...
}

可见,redis 一直在它生命周期的事件循环中监听指定套接字上的可读事件。一旦请求到来,就会调用 acceptTcpHandler:

1
2
3
4
5
6
7
8
9
10
11
12
13
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];

while(max--) {
// cfd: client fd
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

...

acceptCommonHandler(cfd,0,cip);
}
}

使用 anetTcpAccept 创建链接得到客户端的套接字 cfd 之后,调用 acceptCommonHandler 处理这个请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
c = createClient(fd);

...
}

client *createClient(int fd) {
client *c = zmalloc(sizeof(client));

if (fd != -1) {
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}

...
}

...
}

这里又注册了一个文件事件 readQueryFromClient,当套接字中有可读的命令时,就会调用它。

最终,redis 会将读取的命令保存在 c->cmd 结构体中,并调用 lookupCommand 找到对应的处理程序,如果条件合适,就调用它:

1
2
3
4
5
6
7
void call(client *c, int flags) {
...

c->cmd->proc(c);

...
}

c->cmd->proc 会将输出保存在 c 中的缓冲区中。

何时发送保存在缓冲区中的结果呢?

在每一轮事件循环开始前,也就是beforesleep中,都会尝试发送数据,并且注册发送数据的文件事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
void beforeSleep(struct aeEventLoop *eventLoop) {
...

/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();

...
}

int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);

listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);

/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;

/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {

...

if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}

其中,server.clients_pending_write为一个链表,包含还未进行写入操作的客户端。

GFS 面面观

paper: https://static.googleusercontent.com/media/research.google.com/en//archive/gfs-sosp2003.pdf

GFS 是 Google 设计的一个用于大规模数据密集型应用的、可扩展的分布式文件系统,部署在普通硬件设备上,并且能够为用户提供出色的性能和灾难冗余能力。虽然 Google 并没有将 GFS 开源,但是却在 2004 年发布了一篇论文公布了其一些技术细节,影响了以后的诸多分布式文件系统。

本文是对原论文要点的记录。

设计假设

GFS 与传统文件系统共享某些相同的设计目标,如性能、可伸缩性、可靠性以及可用性。然而,GFS 的设计人员审视了他们的应用环境以及使用场景,并基于此给为 GFS 提供了某些不同的设计假设:

  1. 组件失效是常态事件,而不是意外事件。
    • GFS 部署在几百甚至几千台普通的廉价商业设备上。因此在任何时刻都有可能有某些硬件无法访问或工作异常。因此 GFS 从设计初期考虑到这一点,以将持续监控、错误检测、灾难冗余和自动恢复集成到 GFS 中为目标。
  2. 巨大的文件。
    • 存储在 GFS 中文件通常是巨大的,数 GB 的文件很普遍。因此,需要重新审视分布式文件系统中的某些设计决策,如 I/O 操作和 block 的大小等。
  3. 大部分修改的文件操作以追加写入而不是随机写入的方式。
    • 用户通常对文件进行追加写入,然后大量的顺序读。而不是频繁的随机读写。因此,GFS 特别为追加写入而优化,并保证其是原子性的。
  4. 应用程序和 GFS 协同设计。
    • 通过放宽对一致性的要求,采用弱一致性模型,大大简化了 GFS 的设计。引入了原子性的追加写入操作,不需要额外的同步设计。

设计概述

接口

虽然 GFS 提供了一套与传统文件系统类似的 API 接口,但并非完全符合 POSIX 标准。GFS 支持如创建新文件、删除文件、打开文件、关闭文件、读写文件、快照等操作。

架构

GFS 包含一个单独的 master 节点,以及多个 Chunk 服务器节点。整体架构如图:

figure

具体上来说:

  • GFS 储存的文件都被分成固定大小的 Chunk,并且被复制在多个 Chunk 服务器上。在一个 Chunk 被创建的时候,master 会分给 chunk 一个全局唯一的标识。默认情况下,一个 Chunk 拥有三个储存复制节点。
  • master 节点拥有所有的元数据,如:名字空间、访问控制信息、文件和 Chunk 的映射,Chunk 和位置的映射等。master 还管理系统范围内的活动,如 Chunk lease 管理、垃圾回收、Chunk 迁移等。master 会发送心跳包定期和 Chunk 服务器交互以知晓其状态。
  • 客户端只和 master 交互获得元数据,然后再根据元数据和 Chunk 服务器直接交互。

单一 master

单一 master 简化了整个系统的设计,但在另一种意义上,也很容易成为整个系统的瓶颈。因此必须尽量减少对 master 节点的读写。因此,客户端不从 master 节点获取数据。而是告诉 master 要读取的文件和区域,然后 master 返回这个区域所在的 chunk 服务器的位置。由客户端自己联系 Chunk 服务器获取数据。

Chunk 尺寸

Chunk 的大小是设计要素之一。 GFS 选择一个 Chunk 为 64 MB,远大于常规文件系统的 block size。选取一个大的 Chunk 大小有如下优点:

  1. 减少了客户端和 master 的通信需求。从而降低 master 的工作负载。
  2. 采用较大的尺寸,可以对一个块进行更多的操作。通过与 Chunk 服务器保持长时间 tcp 链接,从而降低网络负载。
  3. 减少了 master 需要保存在内存中的元数据信息。

元数据

master 储存三种类型元数据:

  1. 文件和 Chunk 的命名空间。
  2. 文件和 Chunk 的映射关系。
  3. 每个 Chunk 副本的存放地点。

前两种数据会定期被写入日志文件中,并被同步到远程服务器。当 master 发生崩溃时,就可以借助这些备份以恢复服务。第三种数据由 master 启动或运行时,向每个 Chunk 服务器轮询获取信息。

一致性模型

定义:

  • 一致:如果所有客户端无论从哪个副本读,读到的数据都是一样的,那么就认为这个数据是一致的。
  • 已定义:修改文件之后,相关数据是一致的,并且客户端能够读取到它刚才修改的内容,那么相关数据是已定义的。已定义暗含了一致。

GFS 提供一个弱一致性模型。对于并发随机写入操作来说,数据是一致的,但是未定义的。对于追加写入操作来说,客户端总是可以读取到一致并且已定义的数据,但是实际储存在各个 Chunk 服务器上的数据可能有部分是不一致的。

GFS 通过以下几点保证数据一致性:

  • 对 Chunk 的所有副本修改操作顺序一致。
  • 使用 Chunk 版本号来检测副本因为服务器宕机而失效。
  • 与服务器定期握手来找到失效的 Chunk 服务器。
  • 使用 Checksum 来校验数据是否损坏。

系统交互

lease

GFS 使用 lease 来保证写入写入操作的一致性。master 会选出 chunk 的一个副本来发放 lease,称为副本的主 Chunk 服务器。这个 lease 有效期为 60s。当过了有效期之后,master 可能会选出一个新副本发放 lease。当有写入请求时,master 返回给客户端所有副本的位置以及哪一个副本持有 lease。客户端发送数据给最近的副本,然后数据沿拓扑向流向所有副本。当所有的副本都受到了数据后,客户端发送消息给主 Chunk 服务器。主 Chunk 服务器为收到的所有请求分配连续的序号,这些请求可能来自不同的客户端。它按照这个顺序将改变应用于自己的本地磁盘中,然后将顺序推送给所有从服务器。

figure

所有从 Chunk 服务器返回成功的消息给主 Chunk 服务器之后,主 Chunk 服务器才向客户端返回成功的消息。

如果有任何从 Chunk 服务器执行失败,主 Chunk 服务器会将这个错误报告给客户端,由客户端来重新发起请求。

数据流

数据流与写入操作的控制流程是分离的。在客户端收到 master 发送的位置信息后,它从这些位置中选出一个最近的 Chunk 服务器发送数据。然后数据沿着 Chunk 服务器顺序链发送给所有需要的 Chunk 服务器。

原子追加记录

GFS 保证追加记录的原子性。当存在多个并发的追加记录时,GFS 对于每个追加记录至少有一次是写入成功的。GFS 指定写入的偏移量并且在之后返回。

追加记录在之前描述的控制流程之上加了一些额外的步骤。对于一次指定的追加记录来说,主 Chunk 服务器会检查给定 Chunk 的大小。如果追加记录使得这个 Chunk 的大小超过限制,服务器会填充这个 Chunk 到最大大小然后指示客户端发起请求写入下一个 Chunk。

如果追加记录在任意一个 Chunk 服务器失败了,客户端需要进行重新操作。重新操作的结果是,同一个 Chunk 的副本可能拥有不一致的记录。GFS 不保证所有副本在字节序上是完全一致的,但至少保证有一次成功的写入。

因此,追加记录将使数据是一致的,但可能包含部分不一致的局部数据,如填充和错误。

快照

写时复制。master 维持一个对所有 chunk 服务器的引用计数。当客户端发起一次快照请求时,并不实际进行复制,而是将相应服务器的引用计数加一。当对相应服务器发起写入请求时,master 注意到它的引用计数大于一,然后才复制一个新的 chunk,并指示客户端写入这个新 chunk。引用计数减一。

master 节点操作

名称空间管理和锁

GFS 的名称空间是一个全局路径和元数据映射关系的查找表。这个表使用前缀压缩储存在内存中。在名称空间的属性结构上,每个节点都是都有一个关联的读写锁。

每个 master 节点的操作开始之前,都需要获得其路径上所有节点的读锁,以及最终节点的读写锁。

重新复制、负载均衡

当 master 节点创建一个新的 chunk 时,它会选择放置初始空间的位置,会考虑几个因素:

  • 在低于平均硬盘使用率的 Chunk 服务器上创建副本以平衡硬盘使用率。
  • 限制每个 Chunk 服务器最近的 Chunk 创建次数。因为创建操作通常意味着之后又大量的写入操作。
  • 为了容错性,使 Chunk 分布在多个机架。

当 Chunk 的有效副本数少于用户指定的复制数量时,master 节点会重新复制它。
同时,master 节点会周期性地进行负载均衡,检查当前副本的分布情况,然后移动副本以获得更好的空间利用率。

垃圾回收

GFS 在文件删除之后并不会立刻回收可用的物理空间。而是采用惰性回收的策略。
当删除一个文件时,只是把文件重命名为一个特殊的文件名以使其不可见,在实际删除之前,仍然可以恢复。

失效检测

master 节点维持有所有 Chunk 的版本号。客户机或 Chunk 服务器在执行操作时都会验证版本号。
master 在例行的垃圾回收工作中移除所有失效的副本。

容错

master 节点的复制

为了保证 master 节点的可靠性,master 服务器的状态也要复制。master 服务器的所有操作日志和 checkpoint 会被复制到多台远程机器上。
此外,GFS 中还有”影子” master 服务器,这些影子服务器在 master 宕机时提供只读访问。它们的数据通常比 master 更新得慢一点。

数据完整性

每个 Chunk 服务器使用 Checksum 来检查保存的数据是否损坏。对于读操作来说,在把数据返回给客户端或者其它的 Chunk 服务器之前,Chunk 服务器会校验读取操作涉及的范围内的块的 Checksum,因此 Chunk 服务器不会把错误数据传递到其它机器上。如果某个 Checksum 不正确,Chunk 服务器返回给请求者错误信息并通知 master,master 会重新复制副本以替代错误版本。

MapReduce 阅读纪要

paper: https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf

MapReduce 指一个由 Google 提出的,用于在集群上使用并行、分布式算法处理大规模数据的编程模型,以及一系列相关实现。本文是我阅读 Google 原论文之后的一些纪要。

MapReduce 模型

在 lisp 以及许多其它函数式语言中,常常存在用于处理顺序表数据结构的 mapreduce 等原语。在本质上,它们是能够以一个顺序表及处理函数为参数的高阶函数

map 将一一以顺序表中的元素为参数调用这个处理函数,并以这些调用返回的结果作为元素形成一个新的顺序表。也就是说,这个处理函数将作用于原顺序表中的每一个元素。以 python 为例:

1
2
3
4
# square
>>> result = map(lambda x: x * x, [1, 2, 3, 4, 5])
>>> list(result)
[1, 4, 9, 16, 25]

reduce 同样将遍历顺序表,但它以前一次处理函数返回的结果,以及当前元素为参数递归的调用处理函数:

1
2
3
# sum
>>> reduce(lambda a, b: a + b, [1, 2, 3, 4, 5])
15

Google 受这些原语所启发,发现大多数现实世界的分布式作业拥有与其相似的特征。例如,我们可以以这样的方式来统计一大堆文档集合里的词频:

1
2
3
4
5
6
def map(key: string, value: string):
for word in value:
emit(word, "1")

def reduce(key: string, values: Iterator):
emit(string(len(values)))

在这里,map 常常接受文件名和内容作为参数,返回一系列键值对。reduce 以键名和其相关的一系列值为参数,返回最终的结果。在上面的例子中,map 将读取文档内容,然后返回形位 word: "1" 形式的键值对。MapReduce 实现会将键名相同的值集合在一起,传递给 reduce 函数。而 reduce 函数则简单统计了键对应值的数量。

在单处理器环境中以这种模型运行,不一定会获得比通常方式更高的效率。然而,正如文章开始所说的那样,MapReduce 应用于分布式计算。在实践中,这种受限的模型,能够让框架方便地将任务分布式化,并自动处理诸如数据划分、容错、负载均衡等任务。

MapReduce 模型中的 Map,是由用户指定的函数。Map 接收一个 key/value pair 值,并且产生一系列中间 key/value pair 值。MapReduce 库将拥有相同 key 值的 pair 聚合在一起并传给 Reduce 函数。

Reduce 同样由用户定义。它接受一个 key 以及一系列相关的 value,形成更小的 value 集合。典型地一个 Reduce 调用产生零个或一个 value 值。

通过将输入数据自动划分为 M 个片段,Map 在多台机器上被分布式地调用。通过将 Map 产生的结果划分为 R 份,Reduce 也能在多台机器上调用。R 的数量和划分函数通常是由用户指定的。

MapReduce 模型的典型执行流程如下:

  1. 一开始,用户程序中的 MapReduce 库将输入文件划分为 M 段。段的大小可配置,通常为 64M。然后,用户程序在集群机器中创建程序的副本。
  2. 其中一个副本被指定为 master。其余的都是由 master 分配任务的 workers。这里有 M 个 Map 任务,以及 R 个 Reduce 任务可以分配。 master 将选择一个空闲的 worker 分配任务。
  3. 被指定了 map 任务的 worker 将读取响应的输入的划分。它从输入中解析出 key/value pair,然后传递给用户定义的 Map 函数,将产生的中间 key/value pair 缓存在内存中。
  4. 缓存的数据周期性地写入本地磁盘,由划分函数划分为 R 个区域。这些区域地位置将发送给 master,以被 master 转发给 reduce work。
  5. 所有的 map 任务完成后,master 开始分配 reduce 任务。master 将对应的地址传送给 reduce worker。reduce worker 通过远程过程调用读取这些区域的内容。读取完之后,reduce work 通过键将这些内容排序。然后将拥有相同的 key 的 values 传递给用户定义的 Reduce 函数。Reduce 函数的输出将会被追加到分区对应的文件中。
  6. reduce 任务完成后,返回用户调用 MapReduce 过程的地方,继续执行用户接下来的程序。

容错处理

Worker 错误

master 会周期性地 ping 每一个 worker,如果在一段时间之内,一个 worker 都没有响应,那么 master 会认为这个 worker 发生了错误。如果这个 worker 执行的是 map 任务,那么这个 worker 执行过的所有任务都将被 master 安排在其它 worker 上重新执行。如果执行的是 reduce 任务,那么当前执行的任务会被安排在其它 worker 上重新执行。

之所以要重新执行失败 map worker 上的所有任务,是因为 map worker 产生的输出是保存在本地磁盘上的,一旦失败则无法读取。而 reduce worder 的输出保存在全局文件系统上,失败只需要重新执行当前任务即可。

Master 错误

周期地将 master 保存地数据结构保存为 checkpoint 存入磁盘。一旦 master 发生错误,就从最近保存的 checkpoint 中恢复。

失效处理机制

当用户定义的 Map 和 Reduce 是确定性的时,MapReduce 的输入输出和在程序串行、没有出现错误的情况下执行的输入输出是一致的。MapReduce 通过自身文件系统的原子性操作保证这一点成立。
值得注意的是,MapReduce 的错误处理都假设在 fail-stop 上,由于硬件或软件原因的结果错误无法处理。

优化

储存位置

网络容易成为整个系统的限制。因此,MapReduce 可以构建于一个分布式文件系统(例如: gfs)之上。在分配任务时,MapReduce 从文件系统之中读取各个文件储存的位置,然后让 map worker 读取最近的输入,从而降低对网络的要求。

任务细度

为了更好的负载均衡和容错策略,通常使 M 和 R 为 worker 数量的几倍。

备份任务

实践表明,大部分 worker 都执行完毕的情况下,仍然在执行的几个 worker 可能由于机器的软硬件因素仍然要执行很长时间。因此,MapReduce 会自动将最后几个任务备份到多个 worker 上执行。以先完成的为结果。

技巧

划分函数

默认情况下,通常通过哈希来划分输入。然而,如果用户有某些特别需求可以自定义划分函数。

保证排序

传递给 Reduce 函数的 values 是经过排序的。

合并函数

可以给 Map 提供一个合并函数。这个合并函数通常和 Reduce 函数一样,在 Map 完成之后本地调用,以减少最终需要网络传输的数据数量。

跳过坏任务

可能由于用户代码或某些第三方库的原因,一些任务的执行将始终失败。worker 在失败时会发送一条消息给 master。master 会记住任务失败的次数。如果次数太多,会跳过这个任务的执行。