MapReduce [Q&A]

date
Jul 12, 2022
slug
mapreduce-qa
status
Published
tags
MapReduce
Q&A
Distributed
summary
MapReduce
type
Post

Q1. MapReduce 是什么?

MapReduce 是处理和生成大数据集的编程模型和对应的实现。用户可以指定一个 Map 函数和 Reduce 函数。Map 函数处理输入的文件内容,然后生成一组临时的 <key, value>。Reduce 函数将所有相关联 key 的临时的 <key, value> 合并起来。现实中许多任务都可以表示为这个模型。
MapReduce 已经将所有实现细节隐藏起来了。不需要程序员有相关并发编程或分布式系统经验,他们编写的 Map 和 Reduce 函数会并行地运行在大量商业机器组成的集群中。

Q2. MapReduce 可以用来做什么?

反向索引,将一个单词与文章进行映射,可以快速地通过单词找到该单词出现在哪些文章中。
分布式排序,如果数据量太大无法读取到机器的内存中进行排序,可以用 MapReduce 生成多个局部排序的结果,最后再进行多路归并。

Q3. MapReduce 具体执行流程是怎样的?

notion image
  1. MapReduce 库首先将输入的文件分成 M 个片,每一个片相当于一个 Map 任务。每一片的大小通常是 16MB 到 64MB。接着在集群上启动 Master 和一些 Worker。
  1. Master 需要将 M 个 Map 任务和 R 个 Reduce 任务分配给这些 Worker。
  1. 一个 Worker 被分配了 Map 任务,它将从对应的输入分片中读取内容,生成一组临时的 <key, value> 存储在内存中。Worker 定期地将内存中的 <key, value> 写入本地磁盘,接着它把写入位置信息传递给 Master。(写入时使用分区函数将输出的文件分成 R 个区域,每一个区域供给一个 Reduce 任务读取)
  1. 当所有 Map 任务执行完后,开始执行 Reduce 任务。Master 会将需要读取的位置信息传递给 Reduce,Reduce RPC 读取 Worker 上本地磁盘存储的临时 <key, value>。它将相同 key 的组合在一起,通过 key 排序,最后传递给用户指定的 Reduce 函数。Reduce 函数的结果会输入到一个结果文件里。(一个 Reduce 任务一个输出文件)

Q4. MapReduce Reduce 任务的输出文件需要合并吗?

都可以,看具体的需求。如果想把这些输出文件传递给下一个 MapReduce 任务或者其他的能读取许多文件的处理应用,就不需要合并。

Q5. Master 的数据结构是怎样的?

存储每一个 Map 任务和 Reduce 任务的状态,比如空闲、进行中、完成。还需要存储每一个 Worker 机器的标识,以及存储在 Worker 上的 Map 任务输出的 R 个临时文件的大小和位置。

Q6. MapReduce 如何容错?

Master Ping 每一个 Worker 以确保它在线。如果没有响应则认为 Worker 出错。
  • Worker 出错
对于 Map 任务,所有完成与正在执行的都需要重置为空闲状态,让它们得以被调度在新 Worker 上。完成与未完成 Map 任务,他们的临时文件存储在了 Worker 自己的本地磁盘上,Worker 无响应了证明现在也无法读取这些临时文件了,必须在其他 Worker 上重新执行。
对于 Reduce 任务,正在执行的需要重置为空闲状态重新执行。已完成的 Reduce 任务不需要重新执行,因为它的输出已经存储在分布式文件系统中。特别的,如果一个 Map 任务被重新执行了,正在执行的 Reduce 任务也必须重新执行,因为 Worker 失效导致的 Map 任务重新执行,也会导致该 Map 任务输出的临时文件位置发生变化。
  • Master 出错
虽然使用检查点机制重启一个 Master 很简单,但还是建议重新执行整个 MapReduce 任务。一般 Map 任务和 Reduce 任务都是确定的、幂等的。
 
总之就是,如果用户提供的 Map 函数和 Reduce 函数都是确定的,则 MapReduce 库输出的结果与顺序执行产生的结果一致。如果 Map 函数和 Reduce 函数是不定的,MapReduce 只能提供较弱的语义性。

Q7. MapReduce 如何对网络优化?(Locality)

Master 在尝试分配 Map 任务或 Reduce 任务时,优先分配读取的数据分片是存储在本地磁盘上的(分布式文件系统的副本储存在自己本地磁盘中),这样可以减少部分读取的网络带宽。

Q8. Map 任务和 Reduce 任务的数量怎么指定比较好?(Task Granularity)

Master 至少调度 O(M + R) 次,且储存 O(M * R) 个状态(临时文件的信息),虽然每一个状态占用也不太大。
实际上,选择的 M 最好能让输入内容的分片大小与上分布式文件系统的文件块大小相匹配。(Hadoop 分成 64MB 一片,则 M 尽量让输入内容的分片大小尽量接近 64MB)
R 应该是机器数量的小倍数,比如现在拥有 2000 个 Worker 机器,则 R 可以选择 5000。

Q9. 如果有几个 Map 任务和 Reduce 任务特别慢,拖累了整个 MapReduce 任务怎么办?(Backup Task)

这种情况可能是因为机器性能导致的,MapReduce 的最后几个 Map 任务或 Task 任务特别慢。此时只有几个机器在执行,其他机器都是空闲状态。MapReduce 库可以重新调度这些非常慢的任务,在其他空闲的机器上执行它们的副本。只要最初的任务或复制任务其中一个完成,则可以视该任务完成。

Q10. 分区函数的巧用?

分区函数用来界定,一个 <key, value>,应该传递给哪个 Reduce 任务,或者说,哪些 <key, value> 应该输出到同一个输出文件里。论文中有一个例子,可以将拥有相同前缀域名的地址输出到同一个文件中。hash(Hostname(urlkey)) mod R。

Q11. Combiner 函数?

MapReduce 允许用户指定一个 Combiner 函数,它的输入输出与 Reduce 函数一样,它可以给 Map 任务输出的临时 <key, value> 做一些数据整理。它在每一个执行 Map 任务的机器上被执行。它和 Reduce 的最大区别是:Reduce 输出的到分布式文件系统里,Combiner 输出到即将要发送给 Reduce 任务的临时文件里。

Q12. 输入和输出形式?

怎样都可以,默认提供的是文本处理。如果有需要,可以实现自己的 reader 从数据库或是别的地方读取数据。

Q13. 如果执行过程中遇到段错误或第三方库错误怎么办?(Skipping Bad Records)

这些错误一般会导致执行的 Worker 停止,Worker 会用尽最后一口气发送一个 UDP 告知 Master。如果 Master 收到这些 UDP 不止一次,Master 会考虑跳过这个坏记录。

Q14. MapReduce 有哪些监控信息?(Status Information、Counters)

Master 会启动一个内部 HTTP 服务器,运行一个页面。里面涵盖了一些执行过程的重要信息,比如某个任务什么时候完成的,输入内容的大小,临时文件的大小,处理的速度等等。
而且 MapReduce 库也会自动维护一些计数器,比如被处理的键值对,输出的键值对等。用户也可以在 Map 函数里或 Reduce 函数里调用计数器,以便监控更详细的信息。

Q15. MapReduce 性能表现怎么样?

论文中测试了两类比较具有代表性的程序:将数据从一种表现形式转换为另一种表现形式(Sort);从大量数据中提取少量有用的信息(Grep)。不管是在正常执行、取消备份任务或是大量任务损坏时,MapReduce 都有一个较好的性能表现。
 

© shallrise 2023