首先欢迎各位来到我的博客很高兴能够在这里和您见面希望您在这里不仅可以有所收获同时也能感受到一份轻松欢乐的氛围祝你生活愉快如有需要请大家订阅我的专栏【大数据系列】哟我会定期更新相关系列的文章关注关注请关注请大家关注下博主您的支持是我不断创作的最大动力文章目录引言一、分而治之古老智慧遇上大数据二、Map Reduce从单词计数看懂一切2.1 直面问题单机好不好做2.2 Map 阶段各管一摊先出局部结果2.3 Shuffle洗牌Map 到 Reduce 的桥梁2.4 Reduce 阶段汇总全局结果三、不只是思想MapReduce 的完整运行机制3.1 从代码到执行3.2 输入分片如何决定启动多少个 Map3.3 中间结果如何处理四、代码实战从 WordCount 开始4.1 Maven 依赖4.2 Mapper 类4.3 Reducer 类4.4 Driver 类作业配置与提交4.5 打包与运行五、MapReduce 的“分”与“合”再深入5.1 Map 端还能做更多5.2 Partitioner决定哪个 key 去哪个 Reduce5.3 排序是 MapReduce 的内置特性六、MapReduce 的局限与演进6.1 为什么有人说 MapReduce“慢”6.2 Spark 的改进七、总结一张图回顾 MapReduce 的精髓引言从Google发布MapReduce论文至今已有二十年它仍然是理解大数据处理绕不开的第一课。很多人觉得MapReduce已经“过时”了——毕竟Spark、Flink跑得更快。但MapReduce带来的思想转变影响了此后所有的分布式计算框架把一个大问题拆成许多小问题分别解决再汇总结果。这八个字就是“分而治之”。一、分而治之古老智慧遇上大数据分而治之Divide and Conquer是一个极其古老的算法思想。维基百科上说分治法就是把一个复杂的问题分成两个或多个相同或相似的子问题再把子问题分成更小的子问题……直到最后子问题可以简单地直接求解然后将子问题的解合并成原问题的解。人类历史上最早的“分治法”例子大概是这样的要数清一大盒绿豆有多少粒一个人一粒粒数太慢。叫来100个人每人分一小把绿豆各自数清自己手里的粒数最后把所有人的结果加起来。这就是最原始的MapReduce。Google 在2004年发表的MapReduce论文中正是把这个古老思想迁移到了分布式计算场景。论文中给出的定义非常数学化MapReduce 是一种编程模型用于处理和生成大规模数据集。用户指定一个map函数通过它把输入键值对处理成一组中间键值对再指定一个reduce函数用于将具有相同中间键的所有中间值合并起来。简单说Map 负责分拆和初步处理Reduce 负责汇总和最终计算。大数据集Map 分而治之Map Task 1Map Task 2Map Task N...Shuffle 洗牌Reduce 聚合Reduce Task 1Reduce Task 2最终结果二、Map Reduce从单词计数看懂一切几乎所有的MapReduce教程都会用WordCount词频统计作为入门案例。因为它足够简单又能完整展现“分而治之”的全部步骤。问题给你几百个GB的文本文件统计每个单词出现的总次数。2.1 直面问题单机好不好做一台普通服务器单机处理几百GB的文本需要把整个文件读一遍内存装不下只能流式读用哈希表统计每个单词的次数。理论上可行但处理时间可能是几个小时甚至几天。如果文件是TB级别甚至是PB级别单机就不行了——要么内存装不下哈希表要么处理时间长得无法接受。MapReduce的思路是把这堆文件切成很多块每台机器处理一小块的计算结果是部分统计结果Map阶段然后把所有机器的部分统计结果合并成最终结果Reduce阶段。在这个过程中我们不需要关心文件有多大——只要集群的规模足够大总能把处理时间压缩到可接受的范围。2.2 Map 阶段各管一摊先出局部结果假设文本内容是这样的三行hello world hello hadoop hello mapreduce把这三行文本切分成三个独立的输入分片InputSplit三个Map任务同时处理。每个Map任务的逻辑完全相同输入的 key行号不重要输入的 value一行文本输出若干个 (word, 1) 键值对Map 1 处理第一行hello world输出(hello, 1) (world, 1)Map 2 处理第二行hello hadoop输出(hello, 1) (hadoop, 1)Map 3 处理第三行hello mapreduce输出(hello, 1) (mapreduce, 1)至此Map 阶段完成了“分”的动作——把原始数据拆成小块各自算出了一份局部结果。2.3 Shuffle洗牌Map 到 Reduce 的桥梁Map 阶段产出的中间结果不能直接送给 Reduce。MapReduce 框架会自动做一件事——把相同 key 的所有 value 归集到一起再送给 Reduce。这个过程叫 Shuffle洗牌是Hadoop里最复杂、最耗时的环节。对于上面的例子shuffle 之后会形成这样的数据结构hello - [1, 1, 1] world - [1] hadoop - [1] mapreduce - [1]只有hello这个 key 需要进入 Reduce 计算因为它有多个值。一个 key 只给一个 Reduce 任务但一个 Reduce 任务可以处理多个 key。2.4 Reduce 阶段汇总全局结果Reduce 任务的输入是(hello, [1,1,1])。Reduce 的逻辑就是把列表里所有的 1 加起来sum 0 for each v in values: sum v output (hello, sum)最终三个 Reduce 任务会输出(hello, 3) (world, 1) (hadoop, 1) (mapreduce, 1)所有输出合并在一起就是最终结果。大功告成。三、不只是思想MapReduce 的完整运行机制分而治之是灵魂但要让它在分布式环境下稳定运行还需要一整套精心设计的机制。3.1 从代码到执行我们写的 MapReduce 程序在Hadoop中会经历四个层级的变化层级作用Job用户提交的完整计算任务TaskJob 拆解出的 Map Task 或 Reduce TaskAttempt每个 Task 可以重试多次每次重试就是一个 AttemptRecordTask 处理的最小数据单元通常是文件中的一行3.2 输入分片如何决定启动多少个 MapHDFS 上存储的大文件被切分成 Block默认128MB。MapReduce 在处理时会按照逻辑上的分片InputSplit来决定启动多少个 Map 任务。通常一个分片对应一个 Block所以一个128MB的 Block 就会启动一个 Map 任务去处理它。但也不一定。分片大小是可调的比如设置分片大小为64MB那么128MB的文件会被切成两个分片启动两个 Map 任务。分片数量的多少决定了 Map 任务的并行度。3.3 中间结果如何处理Map 的输出不是直接写到 HDFS——那样太慢了。Map 的输出先写到本地磁盘的缓冲区满了之后溢写到本地文件系统的临时文件。Reduce 再从每个 Map 所在的节点上拉取对应分区的数据。Shuffle 过程是所有分布式计算框架的难点因为它涉及大量的网络传输和排序操作。Hadoop 在 1.x 和 2.x 中花了极大的精力优化这个过程。四、代码实战从 WordCount 开始下面是一个完整的 WordCount 示例使用 Hadoop 新版 APImapreduce包非旧的mapred包。4.1 Maven 依赖dependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.3.6/version/dependency4.2 Mapper 类packagecom.example.wordcount;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;importjava.util.StringTokenizer;publicclassWordCountMapperextendsMapperLongWritable,Text,Text,IntWritable{privatefinalstaticIntWritableonenewIntWritable(1);privateTextwordnewText();Overrideprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{// 获取一行文本Stringlinevalue.toString();// 按空格切分单词StringTokenizertokenizernewStringTokenizer(line);while(tokenizer.hasMoreTokens()){word.set(tokenizer.nextToken());// 输出 (word, 1)context.write(word,one);}}}4.3 Reducer 类packagecom.example.wordcount;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassWordCountReducerextendsReducerText,IntWritable,Text,IntWritable{privateIntWritableresultnewIntWritable();Overrideprotectedvoidreduce(Textkey,IterableIntWritablevalues,Contextcontext)throwsIOException,InterruptedException{intsum0;for(IntWritableval:values){sumval.get();}result.set(sum);context.write(key,result);}}4.4 Driver 类作业配置与提交packagecom.example.wordcount;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassWordCountDriver{publicstaticvoidmain(String[]args)throwsException{if(args.length!2){System.err.println(Usage: WordCountDriver input path output path);System.exit(-1);}ConfigurationconfnewConfiguration();JobjobJob.getInstance(conf,word count);// 设置主类job.setJarByClass(WordCountDriver.class);// 设置 Mapper 和 Reducer 类job.setMapperClass(WordCountMapper.class);job.setCombinerClass(WordCountReducer.class);// 可选用于本地聚合优化job.setReducerClass(WordCountReducer.class);// 设置输出 key/value 类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 设置输入输出路径FileInputFormat.addInputPath(job,newPath(args[0]));FileOutputFormat.setOutputPath(job,newPath(args[1]));// 提交作业并等待完成System.exit(job.waitForCompletion(true)?0:1);}}4.5 打包与运行# 打包成 jarmvn clean package# 上传测试文件到 HDFShdfs dfs-mkdir-p/user/hadoop/input hdfs dfs-put/path/to/your/textfile.txt /user/hadoop/input/# 运行作业hadoop jar target/wordcount-1.0-SNAPSHOT.jar com.example.wordcount.WordCountDriver\/user/hadoop/input /user/hadoop/output# 查看结果hdfs dfs-cat/user/hadoop/output/part-r-00000五、MapReduce 的“分”与“合”再深入5.1 Map 端还能做更多MapReduce 允许在 Map 之后、Reduce 之前加一个Combiner本地归约器本质上就是在 Map 端先对自己的局部结果做一次 Reduce。上例中的 Combiner 直接用了 Reducer 类——因为 WordCount 的合并操作满足交换律和结合律。用了 Combiner 之后Map 端输出(hello, 1), (hello, 1), (hello, 1)会被先合并成(hello, 3)才发给 Reduce网络传输的数据量大大减少。5.2 Partitioner决定哪个 key 去哪个 ReduceReduce 默认只有一个但生产环境不会这样——只有一个 Reduce 能力太差。当 Reduce 数量 1 时就需要一个规则来分配 key 给不同的 Reduce。默认的分配规则是哈希取模partition hash(key) % numReduceTasks这样同一个 key 总是进入同一个 Reduce 分区才能保证后续计算正确。5.3 排序是 MapReduce 的内置特性在数据交给 Reduce 之前MapReduce 框架会按照 key 对中间结果进行排序。这个排序是自动发生的不需要用户编写任何排序代码。它带来了两个好处Reduce 阶段处理时相同 key 的 value 天然地连续排列在一起Reduce 可以按顺序处理 key方便实现某些需要排序输出的业务这也解释了为什么 MapReduce 在排序类的任务中特别擅长比如全局排序、Top N。六、MapReduce 的局限与演进6.1 为什么有人说 MapReduce“慢”MapReduce 的设计目标不是快而是稳——它能处理极大规模的数据而且容错性极强适合一次写、多次读的批处理场景。但它的短板也很明显问题原因中间结果写磁盘Map 的输出写到本地磁盘Reduce 再从磁盘读取IO 开销大启动开销大每个任务都是一个独立的 JVM 进程启停耗时长不适合迭代计算机器学习算法往往需要多轮迭代每轮都要重启作业不适合实时计算从作业提交到结果产出分钟级延迟是常态6.2 Spark 的改进Spark 把中间结果尽量放在内存中大幅提升了迭代计算的速度。但在很多场景下如超大数据的 ETLHDFS 的稳定性和成本优势依然让 MapReduce 有一席之地。今天的大数据平台往往是混合架构HDFS 做存储YARN 或 Kubernetes 做资源调度Spark / Flink / Hive 做计算。MapReduce 虽然不再像十年前那样耀眼但它开创的思想已经成为这片土地的基础。七、总结一张图回顾 MapReduce 的精髓Reduce 阶段Shuffle 阶段Map 阶段输入数据分片Map Task 1Map Task 2Map Task N分区 排序 归并Reduce Task 1Reduce Task 2输出文件 1输出文件 2核心要点分而治之把大任务拆小并行处理计算向数据移动把代码分发到数据所在的节点而不是把数据搬运到代码所在的位置容错性优先任务失败自动重试整个作业不会因为单点故障而失败抽象隐藏复杂性用户只关心 Map 和 Reduce 的业务逻辑分布式执行的细节由框架处理MapReduce 不是最快的计算引擎但它一定是最好的入门教材。理解了它你就能理解所有分布式计算框架最底层的逻辑。你第一次接触 MapReduce 时理解“分而治之”花了多长时间有没有什么让你恍然大悟的瞬间欢迎评论区分享你的故事❤️❤️❤️觉得有用的话点个赞 呗。❤️❤️❤️本人水平有限如有纰漏欢迎各位大佬评论批评指正如果觉得这篇文对你有帮助的话也请给个点赞、收藏下吧非常感谢! Stay Hungry Stay Foolish 道阻且长,行则将至,让我们一起加油吧