Hadoop个人心得笔记之MapReduce 小灰灰 2021-12-18 06:43 412阅读 0赞 # MapReduce # **目录** MapReduce 一、概述 二、序列化机制 三、分区 四、排序 五、合并 六、数据本地化策略 七、job的执行流程 八、Shuffle过程 1.Map端的Shuffle 注意: 2.Reduce端的Shuffle 注意: 3.Shuffle调优 九、小文件处理 -------------------- -------------------- # 一、概述 # 1. 是Hadoop中的一套分布式的计算框架 2. 将整个计算过程拆分为2个阶段:Map阶段和Reduce阶段 3. Map阶段负责数据的整理,Reduce阶段负责数据的汇总 4. Reducer中的迭代器采用的是地址复用机制 5. Reducer中的迭代器只能遍历一次 6. 如果Mapper和Reducer的结果类型一致,可以只设置一个 7. 如果输入路径是一个文件,则MapReduce只处理这个文件;如果输入的是一个目录,则处理这个目录下的所有的文件 --- 注意:如果文件以\_开头,则该文件会被跳过。在MapReduce中,\_开头的文件被认为是隐藏文件不需要处理 8. 在MapReduce中,如果不指定,默认键值之间用\\t分隔 9. 如果Reducer没有业务逻辑,可以省略Reducer # 二、序列化机制 # 1. 在MapReduce中,要求被传输的数据必须能够被序列化 2. Hadoop中,序列化机制默认使用AVRO,但是Hadoop对AVRO的序列化机制进行了进一步的封装,提供了更简单的序列化机制 3. 在Hadoop要想实现序列化,需要实现Writable,重写其中的方法 4. 在Hadoop中,序列化的时候,要求属性不能为null 5. 案例:按人统计每一个的总流量 - cn.tedu.flow 6. 练习:score.txt - 计算每一个人的总成绩 - cn.tedu.serialscore # 三、分区 # 1. 分区在MapReduce中用于进行数据的分类 2. 在MapReduce中,如果不指定,则默认只有1个分区 3. 每一个分区都必须对应一个ReduceTask,每一个ReduceTask都会产生一个结果文件 4. 在MapReduce对分区进行了编号,编号默认从0开始递增 5. 分区的顶级父类是Partitioner 6. 在MapReduce中,默认使用的HashPartitioner 7. 案例:分地区统计不同的人花费的总流量 - cn.tedu.partflow # 四、排序 # 1. 在MapReduce中,会对键做自动的排序 - 自然排序 2. 如果自定义一个类产生的对象要想作为键,那么这个对象必须要允许被排序 - WritableComparable 3. 多属性排序的场景称之为二次排序 4. 练习:profit2.txt - 先按照月份进行升序,如果月份一样,则按照业绩降序 - cn.tedu.sortprofit # 五、合并 # 1. 在MapReduce中,因为默认只有1个Reduce,所以所有的计算压力都会落在Reduce上,导致Reduce的计算性能成了整个MapReduce的瓶颈,所以为了提高计算效率,可以在数据传输到Reduce之前先对数据进行一次整合,这个过程称之为合并 - Combine 2. 合并不会影响最终的计算结果 3. 一般而言,Combine和Reduce的逻辑是一致的,只需要在Driver中添加job.setCombinerClass(class); # 六、数据本地化策略 # 1. 当JobTracker接收到应用之后,会去访问NameNode获取要处理的文件信息 2. NameNode将文件信息返回给JobTracker 3. JobTracker收到文件信息之后会将文件进行切片(只包含切片信息不包含实际数据),一般习惯上,会将切片和切块设置的一样大。每一个切片会分给一个MapTask 4. JobTracker会MapTask分发到TaskTracker来执行。在分发的时候,哪一个DataNode上有对应的Block,那么MapTask就会分发这个节点上 - 数据本地化 5. 数据本地化的目的:减少对网络资源的消耗 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpdVk1MjE_size_16_color_FFFFFF_t_70][] # 七、job的执行流程 # 1. 客户端提交一个job任务到JobTracker: hadoop jar xxx.jar 2. JobTracker收集环境信息: 1. 检测类型是否匹配 2. 检测输入/输出路径是否合法 3. JobTracker给job分配一个全局递增的jobid,然后将jobid返回给客户端 4. 客户端收到jobid之后,将jar包提交到HDFS上 5. 准备执行job任务 6. JobTracker会将job任务进行划分,划分为MapTask和ReduceTask,其中MapTask的数量由切片数量决定,ReduceTask的数量由Partitioner/numReduceTask决定 7. JobTracker等待TaskTracker的心跳。一般TaskTracker和DataNode会设置为同一个节点。当TaskTracker发送心跳信息的时候,这个时候JobTracker就会给TaskTracker分配任务。注意:在分配的时候,MapTask符合数据本地化策略(当TaskTracker上有这个数据的时候才会将MapTask分给它),ReduceTask分配到相对空闲的节点上 8. TaskTracker领取到任务之后,到对应的节点下载jar包 - 体现的思想:逻辑移动而数据固定 9. TaskTracker会在本节点内启动JVM子进程执行MapTask或者是ReduceTask - 注意:每一个MapTask或者ReduceTask都会启动一次JVM子进程 # 八、Shuffle过程 # ## 1.Map端的Shuffle ## 1. 每一个Split会分给一个MapTask来处理 2. MapTask默认是对数据进行按行读取,每读取一行调用一次map方法 3. map方法在处理完一行数据的时候会将数据写出到缓冲区中 4. 每一个MapTask都自带了一个缓冲区,缓冲区维系在内存中 5. 缓冲区默认大小是100M 6. 当缓冲区达到条件的时候,将缓冲区中的数据写到本地磁盘上,这个过程称之为溢写(Spill),产生的文件称之为溢写文件 7. 溢写之后,后续产生的数据会继续写到缓冲区中 8. 溢写过程可能发生不止一次 9. 当map将结果放到缓冲区中之后,结果在缓冲区中会进行分区和排序过程;如果指定了Combiner,那么数据在缓冲区中还会进行合并 10. 如果所有数据处理完成,但是有一部分数据在缓冲区中,那么将缓冲区中的数据冲刷到上一次的溢写文件中 11. 因为可能会产生多次溢写,那么会产生多个溢写文件,在结果发往Reduce之前,会将多个溢写文件进行合并(merge),将多个溢写文件合并成一个结果文件 ### 注意: ### 1. 溢写过程不一定会发生。如果没有发生溢写过程,则将缓冲区中的数据直接冲刷到最后的结果文件中;如果只有一次溢写,那么这个溢写文件就是最后的结果文件 2. merge过程也不一定发生 3. 如果溢写文件个数>=3个,那么在merge过程中,自动进行一次combine(指定了Combiner) 4. 在merge过程中,结果会进行整体的分区和排序 5. 初始数据量并不能决定溢写次数 6. 阈值默认是0.8 - 即缓冲区的使用达到这个大小的80%的时候,就开始溢写 7. 缓冲区本质上是一个字节数组,而且是一个环形的缓冲区,目的是为了重复利用缓冲区 ## 2.Reduce端的Shuffle ## 1. ReduceTask启动fetch线程去MapTask上抓取数据,只抓取当前要处理的分区的数据,将抓取的数据放到文件中,每从一个MapTask中抓取到数据就会产生一个数据文件 2. 抓取完数据之后,将这个数据文件再次进行merge 3. 在merge过程中,对数据再次进行排序 4. merge完成之后,将相同的键所对应的值放入一个List集合,然后利用List集合去产生迭代器,这个过程称之为分组(group) 5. 每一组会调用一次reduce方法 6. 每一个ReduceTask会产生一个结果文件 ### 注意: ### 1. 默认fetch线程的数量为5,表示每一个ReduceTask会启动5个线程去抓取数据 2. fetch线程通过http请求去抓数据 3. merge因子默认为10,表示每10个数据文件进行一次合并,最后合成1个数据文件 4. reduce阈值默认为0.05,表示当有5%的MapTask结束,那么启动ReduceTask开始抓取数据 ## 3.Shuffle调优 ## 1. 调大缓冲区,实际开发中,一般会将缓冲区设置为250~400M左右 2. 尽量添加Combiner,不是所有场景下都可以添加Combiner 3. 调大缓冲区的阈值 - 这种方式不推荐 4. 将MapTask的结果进行压缩 - 这种方式能够有效的减少网络的传输时间 - 这种方式不算优化,只是在节点资源和网络资源之间进行取舍 5. 增加fetch线程的数量 - 考虑服务器的线程承载量 6. 增大merge因子 - 不推荐 7. 降低Reduce的阈值 - 不推荐 # 九、小文件处理 # 1. 危害: 1. 存储:每一个小文件在存储的时候都会产生一条元数据,如果存储大量的小文件,会产生大量的元数据,导致NameNode的效率变低;如果小文件数量过多,可能会导致NameNode的内存崩溃 1. 计算:每一个小文件都会作为一个切片来处理,每一个切片要对应一个MapTask,意味着如果产生了大量的小文件,会对应大量的切片,则需要大量的MapTask来处理。导致整个集群的并发量要增加,如果超过集群的承载能力导致服务器宕机 1. 处理手段:合并(将多个小文件合并成一个大文件、利用一个任务处理多个小文件)、压缩 2. 合并: 1. 手动合并:手动编码、合并工具 2. har包 - Hadoop Archive 3. CompositeInputFormat - 多源输入 3. har:将多个小文件合并成一个文件,并且达成一个har包 [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0xpdVk1MjE_size_16_color_FFFFFF_t_70]: /images/20211218/bdb80c30088c44ebb6af6f609f847fd7.png
相关 Hadoop个人心得笔记之上传文件错误 Hadoop上传文件错误 目录 Hadoop上传文件错误org.apache.hadoop.ipc.Rem 深藏阁楼爱情的钟/ 2021年12月20日 05:23/ 0 赞/ 600 阅读
相关 Hadoop个人心得笔记(一) Hadoop个人心得笔记(一) Volume:数据量大,包括采集、存储和计算的量都非常大。大数据的起始计 小鱼儿/ 2021年12月20日 04:31/ 1 赞/ 383 阅读
相关 Flume个人心得笔记 一、概述 1. Flume是Apache提供的开源的、分布式的、可靠的日志收集系统 2. 能够有效的收集、聚合、传输大量的日志数据 3. Flume有2个版本:Fl 布满荆棘的人生/ 2021年12月16日 12:25/ 0 赞/ 408 阅读
还没有评论,来说两句吧...