MapReduce 详细解读
MapReduce 是分布式计算的经典模型,它通过“分而治之”和“计算向数据移动”的理念,极大地简化了大规模数据处理的难度。尽管在新一代引擎面前显得有些“笨重”,但它奠定了分布式计算的基础,理解 MapReduce 的细节对学习 Spark、Flink 等框架仍有很大帮助。核心要点回顾输入分片 → Map → Shuffle(分区、排序、拷贝) → Reduce → 输出数据本地化、容错、推测执行
·
MapReduce 详细解读
MapReduce 是 Google 提出的分布式计算模型,也是 Hadoop 的核心组件。它将复杂的、运行在大型集群上的并行计算过程,高度抽象为两个阶段:Map(映射) 和 Reduce(归约)。
一、设计思想与核心概念
1.1 分而治之
- Map:将输入数据切分成多个独立的小块,并行处理,输出中间键值对(
<key, value>)。 - Shuffle:自动将 Map 的输出按 key 进行排序、分组,并传输给对应的 Reduce。
- Reduce:对每个 key 对应的 value 列表进行聚合计算,输出最终结果。
1.2 数据本地化
- 移动计算比移动数据更高效。MapReduce 会将任务(Map/Reduce)调度到数据所在的节点上执行,减少网络传输。
1.3 容错性与简单性
- 系统自动处理节点故障(任务重试)。开发者只需实现
map()和reduce()函数,无需关心分布式细节。
二、MapReduce 执行流程
整个作业的执行分为 Job 提交 → Map 阶段 → Shuffle 阶段 → Reduce 阶段 → 结果输出。
2.1 总体流程图
输入分片 (splits) → Map任务 → 环形缓冲区(内存) → 分区/排序/溢写 → 归并(combine) →
→ 多个Map输出文件 → 拷贝到Reducer → 归并排序 → 分组 → Reduce任务 → 输出文件
2.2 详细步骤
步骤 1:输入分片(Input Splits)
- 客户端将输入数据(如 HDFS 中的文件)逻辑切分为多个 split(默认等于 HDFS 块大小,128MB)。
- 每个 split 对应一个 Map 任务。split 不移动数据,只记录偏移量。
步骤 2:Map 阶段
- 每个 Map 任务读取自己的 split,逐条调用
map(key, value, context)。 - Map 输出是一个键值对列表,写入环形内存缓冲区(默认 100MB)。
- 缓冲区达到阈值(80%)时,后台线程开始溢写(spill) 到磁盘。
步骤 3:Shuffle(洗牌)—— Map 端
- 分区(Partition):对每个 Map 输出的 key 计算分区号(默认
hash(key) % reduce个数),决定该键值对去哪个 Reduce。 - 排序(Sort):在每个分区内,按键进行快速排序(内存中先排,溢写时归并)。
- 合并(Combine,可选):类似本地 reduce,减少网络传输。例如
map输出<word,1>,combine 后变为<word, count>。 - 溢写:多次溢写产生多个小文件,最终归并(merge) 成一个大文件(已分区且区内有序)。
步骤 4:Shuffle —— Reduce 端
- Reduce 任务启动后,向各个 Map 任务的完成节点拉取(fetch) 属于自己的分区数据(HTTP 协议)。
- 拉取的数据先放入内存缓冲区,不足则溢写到磁盘。
- 对所有拉取到的文件(可能来自多个 Map)进行归并排序,形成一个大文件(按 key 有序)。
步骤 5:Reduce 阶段
- 对归并后的数据,按键分组(相同 key 的 value 形成一个迭代器)。
- 调用
reduce(key, values, context),处理该 key 的所有 values。 - 输出最终结果到 HDFS(通常一个 Reduce 生成一个文件)。
三、关键组件与数据结构
| 组件 | 作用 |
|---|---|
| JobTracker(旧版)/ ResourceManager | 作业调度、资源管理、监控任务进度。 |
| TaskTracker(旧版)/ NodeManager | 执行具体的 Map/Reduce 任务。 |
| InputFormat | 定义如何切分输入文件,并创建 RecordReader 读取键值对。 |
| OutputFormat | 定义如何输出结果到 HDFS。 |
| Partitioner | 决定 Map 输出分配给哪个 Reduce。默认是哈希。 |
| Combiner | 本地聚合,继承自 Reducer,在 Map 端运行。 |
| 环形缓冲区 | 每个 Map 任务的内存缓冲,存储序列化的键值对及元数据。 |
四、Shuffle 深度解析
Shuffle 是 MapReduce 最核心、最复杂的部分,性能瓶颈通常在此。
4.1 Map 端 Shuffle 优化
- 调整缓冲区大小:
io.sort.mb(默认 100MB),增大可减少溢写次数。 - 溢写阈值:
map.sort.spill.percent(默认 0.8),适当调高可减少溢写。 - Combine 使用:显著减少 Map 输出数据量,但需满足结合律(如求和、计数)。
4.2 Reduce 端 Shuffle 优化
- 并行拉取线程:
mapred.reduce.parallel.copies(默认 5),提高拉取速度。 - 内存缓冲比例:
mapred.job.shuffle.input.buffer.percent(默认 0.7),用于存储拉取的数据。 - 归并因子:
io.sort.factor(默认 10),控制一次归并的文件数。
五、容错机制
5.1 任务重试
- 如果某个 Map 或 Reduce 任务失败,JobTracker 会将其重新调度到其他节点,最多重试
mapred.map.max.attempts次(默认 4)。
5.2 推测执行(Speculative Execution)
- 当大部分任务已完成,个别任务运行缓慢(可能因硬件或负载),系统会在其他节点启动一个备份任务,取先完成的那个结果。可通过
mapred.map.tasks.speculative.execution控制。
5.3 节点故障
- 如果 TaskTracker 挂掉,该节点上所有已完成/未完成的任务都需要重新执行。JobTracker 将其上的任务调度到其他节点。
六、数据本地化
MapReduce 尽力让 Map 任务读取本地 HDFS 数据(块副本所在节点)。优先级:
- 本地节点(同一 DN)
- 同机架节点(网络延迟低)
- 其他机架
七、性能调优常用参数
| 参数 | 默认值 | 说明 |
|---|---|---|
mapreduce.map.memory.mb |
1024 | Map 任务容器内存 |
mapreduce.reduce.memory.mb |
1024 | Reduce 任务容器内存 |
mapreduce.map.java.opts |
-Xmx800m | Map JVM 堆内存 |
mapreduce.task.io.sort.mb |
100 | Map 环形缓冲区大小 |
mapreduce.map.sort.spill.percent |
0.8 | 溢写阈值 |
mapreduce.reduce.shuffle.parallelcopies |
5 | Reduce 并行拉取线程数 |
mapreduce.job.reduces |
1 | 默认 Reduce 个数 |
mapreduce.input.fileinputformat.split.maxsize |
256MB | 最大 split 大小 |
八、MapReduce 编程模型示例(WordCount)
// Mapper 类
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
// Reducer 类
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
九、MapReduce 的局限性
- 不适合实时/交互式查询:任务启动、调度、磁盘溢写开销大,延迟高(分钟级)。
- 迭代计算效率低:如机器学习算法,每轮迭代需读写 HDFS,产生大量 I/O。
- 不适合复杂 DAG 计算:多个 MapReduce 链式依赖,中间结果持久化,效率低。
- 开发复杂度:需要编写 Java 代码,相比 SQL 或高级 API 不够友好。
这些局限性催生了 Spark、Flink 等新一代计算引擎。
十、MapReduce vs Spark 对比
| 特性 | MapReduce | Spark |
|---|---|---|
| 计算模型 | 两阶段(Map+Reduce),磁盘交互 | DAG,内存计算 |
| 中间结果 | 写入 HDFS | 优先内存,不足溢写 |
| 启动开销 | 每个任务启动 JVM | 线程级复用 |
| 容错方式 | 任务级重算 | 血统(lineage)重算 |
| 适用场景 | 批处理、ETL | 批处理、流处理、机器学习 |
十一、总结
MapReduce 是分布式计算的经典模型,它通过“分而治之”和“计算向数据移动”的理念,极大地简化了大规模数据处理的难度。尽管在新一代引擎面前显得有些“笨重”,但它奠定了分布式计算的基础,理解 MapReduce 的细节对学习 Spark、Flink 等框架仍有很大帮助。
核心要点回顾:
- 输入分片 → Map → Shuffle(分区、排序、拷贝) → Reduce → 输出
- 数据本地化、容错、推测执行
- Shuffle 是性能关键,需调优
- 适合离线批量处理,不适合实时和迭代计算
更多推荐
所有评论(0)