MapReduce 详细解读

MapReduce 是 Google 提出的分布式计算模型,也是 Hadoop 的核心组件。它将复杂的、运行在大型集群上的并行计算过程,高度抽象为两个阶段:Map(映射)Reduce(归约)


一、设计思想与核心概念

1.1 分而治之

  • Map:将输入数据切分成多个独立的小块,并行处理,输出中间键值对(<key, value>)。
  • Shuffle:自动将 Map 的输出按 key 进行排序、分组,并传输给对应的 Reduce。
  • Reduce:对每个 key 对应的 value 列表进行聚合计算,输出最终结果。

1.2 数据本地化

  • 移动计算比移动数据更高效。MapReduce 会将任务(Map/Reduce)调度到数据所在的节点上执行,减少网络传输。

1.3 容错性与简单性

  • 系统自动处理节点故障(任务重试)。开发者只需实现 map()reduce() 函数,无需关心分布式细节。

二、MapReduce 执行流程

整个作业的执行分为 Job 提交 → Map 阶段 → Shuffle 阶段 → Reduce 阶段 → 结果输出

2.1 总体流程图

输入分片 (splits) → Map任务 → 环形缓冲区(内存) → 分区/排序/溢写 → 归并(combine) → 
   → 多个Map输出文件 → 拷贝到Reducer → 归并排序 → 分组 → Reduce任务 → 输出文件

2.2 详细步骤

步骤 1:输入分片(Input Splits)
  • 客户端将输入数据(如 HDFS 中的文件)逻辑切分为多个 split(默认等于 HDFS 块大小,128MB)。
  • 每个 split 对应一个 Map 任务。split 不移动数据,只记录偏移量。
步骤 2:Map 阶段
  • 每个 Map 任务读取自己的 split,逐条调用 map(key, value, context)
  • Map 输出是一个键值对列表,写入环形内存缓冲区(默认 100MB)。
  • 缓冲区达到阈值(80%)时,后台线程开始溢写(spill) 到磁盘。
步骤 3:Shuffle(洗牌)—— Map 端
  • 分区(Partition):对每个 Map 输出的 key 计算分区号(默认 hash(key) % reduce个数),决定该键值对去哪个 Reduce。
  • 排序(Sort):在每个分区内,按键进行快速排序(内存中先排,溢写时归并)。
  • 合并(Combine,可选):类似本地 reduce,减少网络传输。例如 map 输出 <word,1>,combine 后变为 <word, count>
  • 溢写:多次溢写产生多个小文件,最终归并(merge) 成一个大文件(已分区且区内有序)。
步骤 4:Shuffle —— Reduce 端
  • Reduce 任务启动后,向各个 Map 任务的完成节点拉取(fetch) 属于自己的分区数据(HTTP 协议)。
  • 拉取的数据先放入内存缓冲区,不足则溢写到磁盘。
  • 对所有拉取到的文件(可能来自多个 Map)进行归并排序,形成一个大文件(按 key 有序)。
步骤 5:Reduce 阶段
  • 对归并后的数据,按键分组(相同 key 的 value 形成一个迭代器)。
  • 调用 reduce(key, values, context),处理该 key 的所有 values。
  • 输出最终结果到 HDFS(通常一个 Reduce 生成一个文件)。

三、关键组件与数据结构

组件 作用
JobTracker(旧版)/ ResourceManager 作业调度、资源管理、监控任务进度。
TaskTracker(旧版)/ NodeManager 执行具体的 Map/Reduce 任务。
InputFormat 定义如何切分输入文件,并创建 RecordReader 读取键值对。
OutputFormat 定义如何输出结果到 HDFS。
Partitioner 决定 Map 输出分配给哪个 Reduce。默认是哈希。
Combiner 本地聚合,继承自 Reducer,在 Map 端运行。
环形缓冲区 每个 Map 任务的内存缓冲,存储序列化的键值对及元数据。

四、Shuffle 深度解析

Shuffle 是 MapReduce 最核心、最复杂的部分,性能瓶颈通常在此。

4.1 Map 端 Shuffle 优化

  • 调整缓冲区大小io.sort.mb(默认 100MB),增大可减少溢写次数。
  • 溢写阈值map.sort.spill.percent(默认 0.8),适当调高可减少溢写。
  • Combine 使用:显著减少 Map 输出数据量,但需满足结合律(如求和、计数)。

4.2 Reduce 端 Shuffle 优化

  • 并行拉取线程mapred.reduce.parallel.copies(默认 5),提高拉取速度。
  • 内存缓冲比例mapred.job.shuffle.input.buffer.percent(默认 0.7),用于存储拉取的数据。
  • 归并因子io.sort.factor(默认 10),控制一次归并的文件数。

五、容错机制

5.1 任务重试

  • 如果某个 Map 或 Reduce 任务失败,JobTracker 会将其重新调度到其他节点,最多重试 mapred.map.max.attempts 次(默认 4)。

5.2 推测执行(Speculative Execution)

  • 当大部分任务已完成,个别任务运行缓慢(可能因硬件或负载),系统会在其他节点启动一个备份任务,取先完成的那个结果。可通过 mapred.map.tasks.speculative.execution 控制。

5.3 节点故障

  • 如果 TaskTracker 挂掉,该节点上所有已完成/未完成的任务都需要重新执行。JobTracker 将其上的任务调度到其他节点。

六、数据本地化

MapReduce 尽力让 Map 任务读取本地 HDFS 数据(块副本所在节点)。优先级:

  1. 本地节点(同一 DN)
  2. 同机架节点(网络延迟低)
  3. 其他机架

七、性能调优常用参数

参数 默认值 说明
mapreduce.map.memory.mb 1024 Map 任务容器内存
mapreduce.reduce.memory.mb 1024 Reduce 任务容器内存
mapreduce.map.java.opts -Xmx800m Map JVM 堆内存
mapreduce.task.io.sort.mb 100 Map 环形缓冲区大小
mapreduce.map.sort.spill.percent 0.8 溢写阈值
mapreduce.reduce.shuffle.parallelcopies 5 Reduce 并行拉取线程数
mapreduce.job.reduces 1 默认 Reduce 个数
mapreduce.input.fileinputformat.split.maxsize 256MB 最大 split 大小

八、MapReduce 编程模型示例(WordCount)

// Mapper 类
public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

// Reducer 类
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

九、MapReduce 的局限性

  1. 不适合实时/交互式查询:任务启动、调度、磁盘溢写开销大,延迟高(分钟级)。
  2. 迭代计算效率低:如机器学习算法,每轮迭代需读写 HDFS,产生大量 I/O。
  3. 不适合复杂 DAG 计算:多个 MapReduce 链式依赖,中间结果持久化,效率低。
  4. 开发复杂度:需要编写 Java 代码,相比 SQL 或高级 API 不够友好。

这些局限性催生了 Spark、Flink 等新一代计算引擎。


十、MapReduce vs Spark 对比

特性 MapReduce Spark
计算模型 两阶段(Map+Reduce),磁盘交互 DAG,内存计算
中间结果 写入 HDFS 优先内存,不足溢写
启动开销 每个任务启动 JVM 线程级复用
容错方式 任务级重算 血统(lineage)重算
适用场景 批处理、ETL 批处理、流处理、机器学习

十一、总结

MapReduce 是分布式计算的经典模型,它通过“分而治之”和“计算向数据移动”的理念,极大地简化了大规模数据处理的难度。尽管在新一代引擎面前显得有些“笨重”,但它奠定了分布式计算的基础,理解 MapReduce 的细节对学习 Spark、Flink 等框架仍有很大帮助。

核心要点回顾

  • 输入分片 → Map → Shuffle(分区、排序、拷贝) → Reduce → 输出
  • 数据本地化、容错、推测执行
  • Shuffle 是性能关键,需调优
  • 适合离线批量处理,不适合实时和迭代计算
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐