MapReduce自定义分区Partition
Partition分区
默认Partitioner 分区
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
自定义Partitioner步骤
可以根据自己的需要自定义分区,将不同的key放置到响应的分区内
定义类继承Partitioner,重写getPartition()方法
public class CustomPartitioner extends Partitioner<Text, FlowBean> {
@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
// 控制分区代码逻辑
… …
return partition;
}
}
Job驱动中,设置自定义Partitioner
job.setPartitionerClass(CustomPartitioner.class);
自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
分区总结:
- 如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
- 如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;
- 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个
ReduceTask,最终也就只会产生一个结果文件 part-r-00000; - 分区号必须从零开始,逐一累加。
案例:按照id不同将csv文件分到不同的分区
自定义类继承Partitioner:CsvPartition:
package com.gis507.test.partitionDemo;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class CsvPartition extends Partitioner<Text,CsvBean> {
@Override
public int getPartition(Text text, CsvBean csvBean, int numPartitions) {
int partition;
String text1 = text.toString();
if (text1.equals("1")){
return 0;
}else if (text1.equals("2")){
return 1;
}else if (text1.equals("3")){
return 2;
}else if (text1.equals("4")){
return 3;
}else{
return 4;
}
}
}
然后在Driver中设置分区
package com.gis507.test.AISDataSort;
import com.gis507.test.AISTest4.AISDataPartition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class AISDataDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1、新建Job
Configuration conf = new Configuration();
//10、key和value之间使用","
conf.set("mapred.textoutputformat.separator", ",");
//11、mapreduce的内存设置
conf.set("mapreduce.map.memory.mb","512");
conf.set("mapreduce.reduce.memory.mb","512");
Job job = Job.getInstance(conf);
//2、设置jar包
job.setJarByClass(AISDataDriver.class);
//3、关联map和reduce关系
job.setMapperClass(AISDataMapper.class);
job.setReducerClass(AISDataReducer.class);
//4、设置map的输出类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(AISDataBean.class);
//5、设置最终的输出类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(AISDataBean.class);
//8、设置分区
job.setPartitionerClass(AISDataPartition.class);
//9、设置分区个数
job.setNumReduceTasks(5);
//6、设置输入输出路径
FileInputFormat.setInputPaths(job,new Path("D:\\AAUser\\dic\\Files\\testFile\\tests.csv"));
FileOutputFormat.setOutputPath(job,new Path("D:\\AAUser\\dic\\Files\\testFile1"));
//7、提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0:1);
}
}
注意:
这里的key为Text类行,如果需要和String比较,需要转成String类型再进行比较
还没有评论,来说两句吧...