paper: https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf

MapReduce 指一个由 Google 提出的,用于在集群上使用并行、分布式算法处理大规模数据的编程模型,以及一系列相关实现。本文是我阅读 Google 原论文之后的一些纪要。

MapReduce 模型

在 lisp 以及许多其它函数式语言中,常常存在用于处理顺序表数据结构的 mapreduce 等原语。在本质上,它们是能够以一个顺序表及处理函数为参数的高阶函数

map 将一一以顺序表中的元素为参数调用这个处理函数,并以这些调用返回的结果作为元素形成一个新的顺序表。也就是说,这个处理函数将作用于原顺序表中的每一个元素。以 python 为例:

1
2
3
4
# square
>>> result = map(lambda x: x * x, [1, 2, 3, 4, 5])
>>> list(result)
[1, 4, 9, 16, 25]

reduce 同样将遍历顺序表,但它以前一次处理函数返回的结果,以及当前元素为参数递归的调用处理函数:

1
2
3
# sum
>>> reduce(lambda a, b: a + b, [1, 2, 3, 4, 5])
15

Google 受这些原语所启发,发现大多数现实世界的分布式作业拥有与其相似的特征。例如,我们可以以这样的方式来统计一大堆文档集合里的词频:

1
2
3
4
5
6
def map(key: string, value: string):
for word in value:
emit(word, "1")

def reduce(key: string, values: Iterator):
emit(string(len(values)))

在这里,map 常常接受文件名和内容作为参数,返回一系列键值对。reduce 以键名和其相关的一系列值为参数,返回最终的结果。在上面的例子中,map 将读取文档内容,然后返回形位 word: "1" 形式的键值对。MapReduce 实现会将键名相同的值集合在一起,传递给 reduce 函数。而 reduce 函数则简单统计了键对应值的数量。

在单处理器环境中以这种模型运行,不一定会获得比通常方式更高的效率。然而,正如文章开始所说的那样,MapReduce 应用于分布式计算。在实践中,这种受限的模型,能够让框架方便地将任务分布式化,并自动处理诸如数据划分、容错、负载均衡等任务。

MapReduce 模型中的 Map,是由用户指定的函数。Map 接收一个 key/value pair 值,并且产生一系列中间 key/value pair 值。MapReduce 库将拥有相同 key 值的 pair 聚合在一起并传给 Reduce 函数。

Reduce 同样由用户定义。它接受一个 key 以及一系列相关的 value,形成更小的 value 集合。典型地一个 Reduce 调用产生零个或一个 value 值。

通过将输入数据自动划分为 M 个片段,Map 在多台机器上被分布式地调用。通过将 Map 产生的结果划分为 R 份,Reduce 也能在多台机器上调用。R 的数量和划分函数通常是由用户指定的。

MapReduce 模型的典型执行流程如下:

  1. 一开始,用户程序中的 MapReduce 库将输入文件划分为 M 段。段的大小可配置,通常为 64M。然后,用户程序在集群机器中创建程序的副本。
  2. 其中一个副本被指定为 master。其余的都是由 master 分配任务的 workers。这里有 M 个 Map 任务,以及 R 个 Reduce 任务可以分配。 master 将选择一个空闲的 worker 分配任务。
  3. 被指定了 map 任务的 worker 将读取响应的输入的划分。它从输入中解析出 key/value pair,然后传递给用户定义的 Map 函数,将产生的中间 key/value pair 缓存在内存中。
  4. 缓存的数据周期性地写入本地磁盘,由划分函数划分为 R 个区域。这些区域地位置将发送给 master,以被 master 转发给 reduce work。
  5. 所有的 map 任务完成后,master 开始分配 reduce 任务。master 将对应的地址传送给 reduce worker。reduce worker 通过远程过程调用读取这些区域的内容。读取完之后,reduce work 通过键将这些内容排序。然后将拥有相同的 key 的 values 传递给用户定义的 Reduce 函数。Reduce 函数的输出将会被追加到分区对应的文件中。
  6. reduce 任务完成后,返回用户调用 MapReduce 过程的地方,继续执行用户接下来的程序。

容错处理

Worker 错误

master 会周期性地 ping 每一个 worker,如果在一段时间之内,一个 worker 都没有响应,那么 master 会认为这个 worker 发生了错误。如果这个 worker 执行的是 map 任务,那么这个 worker 执行过的所有任务都将被 master 安排在其它 worker 上重新执行。如果执行的是 reduce 任务,那么当前执行的任务会被安排在其它 worker 上重新执行。

之所以要重新执行失败 map worker 上的所有任务,是因为 map worker 产生的输出是保存在本地磁盘上的,一旦失败则无法读取。而 reduce worder 的输出保存在全局文件系统上,失败只需要重新执行当前任务即可。

Master 错误

周期地将 master 保存地数据结构保存为 checkpoint 存入磁盘。一旦 master 发生错误,就从最近保存的 checkpoint 中恢复。

失效处理机制

当用户定义的 Map 和 Reduce 是确定性的时,MapReduce 的输入输出和在程序串行、没有出现错误的情况下执行的输入输出是一致的。MapReduce 通过自身文件系统的原子性操作保证这一点成立。
值得注意的是,MapReduce 的错误处理都假设在 fail-stop 上,由于硬件或软件原因的结果错误无法处理。

优化

储存位置

网络容易成为整个系统的限制。因此,MapReduce 可以构建于一个分布式文件系统(例如: gfs)之上。在分配任务时,MapReduce 从文件系统之中读取各个文件储存的位置,然后让 map worker 读取最近的输入,从而降低对网络的要求。

任务细度

为了更好的负载均衡和容错策略,通常使 M 和 R 为 worker 数量的几倍。

备份任务

实践表明,大部分 worker 都执行完毕的情况下,仍然在执行的几个 worker 可能由于机器的软硬件因素仍然要执行很长时间。因此,MapReduce 会自动将最后几个任务备份到多个 worker 上执行。以先完成的为结果。

技巧

划分函数

默认情况下,通常通过哈希来划分输入。然而,如果用户有某些特别需求可以自定义划分函数。

保证排序

传递给 Reduce 函数的 values 是经过排序的。

合并函数

可以给 Map 提供一个合并函数。这个合并函数通常和 Reduce 函数一样,在 Map 完成之后本地调用,以减少最终需要网络传输的数据数量。

跳过坏任务

可能由于用户代码或某些第三方库的原因,一些任务的执行将始终失败。worker 在失败时会发送一条消息给 master。master 会记住任务失败的次数。如果次数太多,会跳过这个任务的执行。