Hadoop case study: partition

淡淡的烟草味﹌ 2022-11-21 15:14

Contents

    • Input data
    • Expected result
    • Requirement analysis
      • Custom PhoneFlowBean
    • Custom MyPartitioner
    • Mapper class
    • Reducer class
    • Driver class
    • Execution results

Input data

    1 13736230513 192.196.100.1 www.hadoop.com 2481 24681 200
    2 13846544121 192.196.100.2 264 0 200
    3 13956435636 192.196.100.3 132 1512 200
    4 13966251146 192.168.100.1 240 0 404
    5 18271575951 192.168.100.2 www.hadoop.com 1527 2106 200
    6 84188413 192.168.100.3 www.hadoop.com 4116 1432 200
    7 13590439668 192.168.100.4 1116 954 200
    8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200
    9 13729199489 192.168.100.6 240 0 200
    10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200
    11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200
    12 15959002129 192.168.100.9 www.hadoop.com 1938 180 500
    13 13560439638 192.168.100.10 918 4938 200
    14 13470253144 192.168.100.11 180 180 200
    15 13682846555 192.168.100.12 www.qq.com 1938 2910 200
    16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200
    17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404
    18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200
    19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200
    20 13768778790 192.168.100.17 120 120 200
    21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200
    22 13568436656 192.168.100.19 1116 954 200

Expected result

Expected output: phone numbers starting with 136, 137, 138, and 139 each go into their own file (four files in total), and numbers with any other prefix go into a fifth file.

Requirement analysis

(The original post includes a requirement-analysis diagram here.) In short: write a custom Partitioner that looks at the first three digits of the phone-number key, returning partitions 0 to 3 for the prefixes 136 to 139 and partition 4 for everything else, and set the number of reduce tasks to 5 so that each partition is written to its own output file.

Custom PhoneFlowBean

    package com.mr.mypartition;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class PhoneFlowBean implements Writable {

        // upstream (upload) traffic
        private long upFlow;
        // downstream (download) traffic
        private long downFlow;
        // total traffic = upstream + downstream
        private long sumFlow;

        public PhoneFlowBean() {
        }

        public PhoneFlowBean(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }

        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }

        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }

        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }

        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }

        public long getUpFlow() {
            return upFlow;
        }

        public long getDownFlow() {
            return downFlow;
        }

        public long getSumFlow() {
            return sumFlow;
        }
    }
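
As a quick sanity check on the Writable implementation, here is a minimal sketch (not part of the original post; the class name PhoneFlowBeanRoundTrip is made up) that serializes a bean with write() and reads it back with readFields(), which is roughly what Hadoop does when it moves map output to the reducers:

    package com.mr.mypartition;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class PhoneFlowBeanRoundTrip {
        public static void main(String[] args) throws IOException {
            PhoneFlowBean original = new PhoneFlowBean(2481, 24681);

            // Serialize the bean to bytes, as Hadoop does with map output
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // Deserialize into a fresh bean
            PhoneFlowBean copy = new PhoneFlowBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(copy); // expected: 2481  24681  27162 (tab-separated)
        }
    }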

Custom MyPartitioner

    package com.mr.mypartition;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class MyPartitioner extends Partitioner<Text, PhoneFlowBean> {

        // Return a different partition number based on the first three digits of the phone number
        @Override
        public int getPartition(Text text, PhoneFlowBean phoneFlowBean, int numPartitions) {
            String prefix = text.toString().substring(0, 3);
            if ("136".equals(prefix)) {
                return 0;
            } else if ("137".equals(prefix)) {
                return 1;
            } else if ("138".equals(prefix)) {
                return 2;
            } else if ("139".equals(prefix)) {
                return 3;
            } else {
                return 4;
            }
        }
    }
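
To see the mapping concretely, the following minimal local check (hypothetical, not from the original post) calls getPartition() directly on a few phone numbers taken from the sample input; a record assigned partition i ends up in output file part-r-0000i:

    package com.mr.mypartition;

    import org.apache.hadoop.io.Text;

    public class MyPartitionerCheck {
        public static void main(String[] args) {
            MyPartitioner partitioner = new MyPartitioner();
            PhoneFlowBean bean = new PhoneFlowBean(100, 200);

            // 5 is the number of reduce tasks configured in the driver
            System.out.println(partitioner.getPartition(new Text("13630577991"), bean, 5)); // 0 (prefix 136)
            System.out.println(partitioner.getPartition(new Text("13736230513"), bean, 5)); // 1 (prefix 137)
            System.out.println(partitioner.getPartition(new Text("13956435636"), bean, 5)); // 3 (prefix 139)
            System.out.println(partitioner.getPartition(new Text("18271575951"), bean, 5)); // 4 (everything else)
        }
    }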

Mapper class

    package com.mr.mypartition;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class PhoneFlowMapper extends Mapper<LongWritable, Text, Text, PhoneFlowBean> {

        // 4. Create the output objects once so they are not re-created on every map() call
        Text phoneNumberText = new Text();
        PhoneFlowBean phoneFlowBean = new PhoneFlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 1. Read the line of text and split it on the delimiter
            String string = value.toString();
            String[] split = string.split("\t");
            // 2. Extract the phone number and the upstream/downstream flow
            String phoneNumber = split[1];
            String upFlow = split[split.length - 3];
            String downFlow = split[split.length - 2];
            // 3. Populate the flowBean object
            phoneFlowBean.setUpFlow(Long.parseLong(upFlow));
            phoneFlowBean.setDownFlow(Long.parseLong(downFlow));
            // flowBean.setSumFlow(Long.parseLong(upFlow) + Long.parseLong(downFlow));
            phoneNumberText.set(phoneNumber);
            // FlowBean flowBean = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow));
            // 5. Emit the phone number and the flowBean to the reducer
            context.write(phoneNumberText, phoneFlowBean);
        }
    }
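
Note that the mapper reads the flow columns from the end of the line (split.length - 3 and split.length - 2) rather than by fixed index, because some records in the input have no URL column. A small standalone illustration (hypothetical, assuming the input file is tab-delimited as the mapper expects):

    package com.mr.mypartition;

    public class SplitDemo {
        public static void main(String[] args) {
            // One record with a URL column and one without, taken from the sample input
            String withUrl    = "1\t13736230513\t192.196.100.1\twww.hadoop.com\t2481\t24681\t200";
            String withoutUrl = "2\t13846544121\t192.196.100.2\t264\t0\t200";

            for (String line : new String[]{ withUrl, withoutUrl }) {
                String[] split = line.split("\t");
                String phone = split[1];                 // the phone number is always the second column
                String up    = split[split.length - 3];  // third column from the end
                String down  = split[split.length - 2];  // second column from the end
                System.out.println(phone + " up=" + up + " down=" + down);
            }
        }
    }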

Reducer class

    package com.mr.mypartition;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class PhoneFlowReducer extends Reducer<Text, PhoneFlowBean, Text, PhoneFlowBean> {

        // 4. Create the output FlowBean object once, outside reduce()
        PhoneFlowBean phoneFlowBean = new PhoneFlowBean();

        @Override
        protected void reduce(Text key, Iterable<PhoneFlowBean> values, Context context) throws IOException, InterruptedException {
            // 1. Totals for upstream and downstream flow
            long upTotal = 0;
            long downTotal = 0;
            // 2. Iterate over the values (the FlowBean objects grouped by phone number)
            for (PhoneFlowBean value : values) {
                upTotal += value.getUpFlow();
                downTotal += value.getDownFlow();
            }
            // 3. Populate the output FlowBean object
            phoneFlowBean.setUpFlow(upTotal);
            phoneFlowBean.setDownFlow(downTotal);
            phoneFlowBean.setSumFlow(upTotal + downTotal);
            // 5. Write the result
            context.write(key, phoneFlowBean);
        }
    }
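
For example, 13568436656 appears twice in the sample input (upstream/downstream 2481/24681 and 1116/954), so the reducer receives both beans under that one key and writes a single line for it: upstream 2481 + 1116 = 3597, downstream 24681 + 954 = 25635, total 29232.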

Driver class

    package com.mr.mypartition;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class PhoneFlowDriver {

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // 0 Specify the input and output paths (hard-coded here for a local run)
            args = new String[]{ "E:\\Hadoop\\src\\main\\resources\\input\\iphone", "E:\\Hadoop\\src\\main\\resources\\mypart"};
            // 1 Get the configuration and create the job
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            // 2 Set the driver class (setJarByClass)
            job.setJarByClass(PhoneFlowDriver.class);
            // 3 Set the Mapper and Reducer classes
            job.setMapperClass(PhoneFlowMapper.class);
            job.setReducerClass(PhoneFlowReducer.class);
            // 4 Set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(PhoneFlowBean.class);
            // 5 Set the final output key/value types (the reducer's output types)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(PhoneFlowBean.class);
            // Set the custom partitioner
            job.setPartitionerClass(MyPartitioner.class);
            // Set the number of reduce tasks; this determines the number of output files
            job.setNumReduceTasks(5);
            // 6 Set the local input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // 7 Submit the job and wait for completion
            boolean completion = job.waitForCompletion(true);
            System.exit(completion ? 0 : 1);
        }
    }
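
Note that the input and output paths are hard-coded into args above for a local run; when submitting to a cluster you would remove that line, package the classes into a jar, and run something like hadoop jar phoneflow.jar com.mr.mypartition.PhoneFlowDriver <input path> <output path> (the jar name and paths here are placeholders). Also note how the reduce-task count interacts with the partitioner: with fewer than 5 reduce tasks (but more than 1) the job fails with an "Illegal partition" error because getPartition() can return 4, with exactly 1 the partitioner is bypassed and everything goes to a single file, and with more than 5 the extra output files are simply empty.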

Execution results

(The original post shows a screenshot of the output directory here.) With five reduce tasks, the output directory contains five result files, part-r-00000 through part-r-00004, one per partition.
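
For instance, assuming the input file is tab-delimited as the mapper expects, the 136 partition (part-r-00000) should contain the two 136-prefixed numbers from the sample input, each with its upstream, downstream and total flow:

    13630577991    6960    690    7650
    13682846555    1938    2910    4848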
