MapReduce自定义分区Partition

傷城~ 2023-10-04 16:34 149阅读 0赞

Partition分区

默认Partitioner 分区

  1. public class HashPartitioner<K, V> extends Partitioner<K, V> {
  2. public int getPartition(K key, V value, int numReduceTasks) {
  3. return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  4. }
  5. }

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

自定义Partitioner步骤

可以根据自己的需要自定义分区,将不同的key放置到响应的分区内

  1. 定义类继承Partitioner,重写getPartition()方法

    1. public class CustomPartitioner extends Partitioner<Text, FlowBean> {
    2. @Override
    3. public int getPartition(Text key, FlowBean value, int numPartitions) {
    4. // 控制分区代码逻辑
    5. return partition;
    6. }
    7. }
  2. Job驱动中,设置自定义Partitioner

    1. job.setPartitionerClass(CustomPartitioner.class);
  3. 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

    1. job.setNumReduceTasks(5);

分区总结:

  1. 如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
  2. 如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
  3. 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个
    ReduceTask,最终也就只会产生一个结果文件 part-r-00000;
  4. 分区号必须从零开始,逐一累加。

案例:按照id不同将csv文件分到不同的分区

自定义类继承Partitioner:CsvPartition:

  1. package com.gis507.test.partitionDemo;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Partitioner;
  4. public class CsvPartition extends Partitioner<Text,CsvBean> {
  5. @Override
  6. public int getPartition(Text text, CsvBean csvBean, int numPartitions) {
  7. int partition;
  8. String text1 = text.toString();
  9. if (text1.equals("1")){
  10. return 0;
  11. }else if (text1.equals("2")){
  12. return 1;
  13. }else if (text1.equals("3")){
  14. return 2;
  15. }else if (text1.equals("4")){
  16. return 3;
  17. }else{
  18. return 4;
  19. }
  20. }
  21. }

然后在Driver中设置分区

  1. package com.gis507.test.AISDataSort;
  2. import com.gis507.test.AISTest4.AISDataPartition;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.IOException;
  10. public class AISDataDriver {
  11. public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  12. //1、新建Job
  13. Configuration conf = new Configuration();
  14. //10、key和value之间使用","
  15. conf.set("mapred.textoutputformat.separator", ",");
  16. //11、mapreduce的内存设置
  17. conf.set("mapreduce.map.memory.mb","512");
  18. conf.set("mapreduce.reduce.memory.mb","512");
  19. Job job = Job.getInstance(conf);
  20. //2、设置jar包
  21. job.setJarByClass(AISDataDriver.class);
  22. //3、关联map和reduce关系
  23. job.setMapperClass(AISDataMapper.class);
  24. job.setReducerClass(AISDataReducer.class);
  25. //4、设置map的输出类型
  26. job.setMapOutputKeyClass(Text.class);
  27. job.setMapOutputValueClass(AISDataBean.class);
  28. //5、设置最终的输出类型
  29. job.setOutputKeyClass(Text.class);
  30. job.setOutputValueClass(AISDataBean.class);
  31. //8、设置分区
  32. job.setPartitionerClass(AISDataPartition.class);
  33. //9、设置分区个数
  34. job.setNumReduceTasks(5);
  35. //6、设置输入输出路径
  36. FileInputFormat.setInputPaths(job,new Path("D:\\AAUser\\dic\\Files\\testFile\\tests.csv"));
  37. FileOutputFormat.setOutputPath(job,new Path("D:\\AAUser\\dic\\Files\\testFile1"));
  38. //7、提交job
  39. boolean result = job.waitForCompletion(true);
  40. System.exit(result ? 0:1);
  41. }
  42. }

注意:

这里的key为Text类行,如果需要和String比较,需要转成String类型再进行比较

发表评论

表情:
评论列表 (有 0 条评论,149人围观)

还没有评论,来说两句吧...

相关阅读