HDPCD-Java Review Notes (5)

拼搏现实的明天。 2022-06-07 05:26

**Partition and Sorting**

**Partitioners**

![Center][]

**All values with the same key must be sent to the same Reducer.**

If no Partitioner is configured, a MapReduce job uses the HashPartitioner by default. The HashPartitioner uses the key's hashCode method (together with the modulus operator) to determine how records are partitioned.

A group of records from the intermediate key space is assigned to each reduce node. These groups of records are called **partitions**, and **a partition** represents **the input of a Reducer**.

**How a record gets assigned to a Reducer**:

1. The Mapper outputs <key, value> pairs (records). Each record is partitioned as it is written to the map output.
2. The Partitioner is an object that defines a **getPartition** method. **Each <key, value> pair** is passed into the getPartition method, along with **the number of Reducers**.
3. The **getPartition** method **returns an int** that determines which Reducer the <key, value> pair is sent to.

The **Default Partitioner**

    public class HashPartitioner<K, V> extends Partitioner<K, V> {

        /** Use {@link Object#hashCode()} to partition. */
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

Writing a **Custom Partitioner**

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class WordCountPartitioner extends Partitioner<Text, IntWritable> {

        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            if (numReduceTasks == 1) {
                return 0;
            }
            // Mask the sign bit so a negative count or an int overflow
            // can never yield a negative partition number.
            int product = key.toString().length() * value.get();
            return (product & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

The **TotalOrderPartitioner** produces globally sorted output: every key in one partition sorts before every key in the next. It accomplishes this by using an external file that defines how the keys are split across partitions. Using it **involves two main steps:**

1. Create a partition file.
2. Share the partition file amongst all Mappers.
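The idea of splitting keys across partitions by split points can be sketched without Hadoop. The following is only an illustration, not the TotalOrderPartitioner implementation: the class name `RangePartitionSketch`, the method signature, and the sample split points are all assumptions made for this example. With k sorted split points there are k + 1 partitions, and a binary search finds the partition for a key.

```java
import java.util.Arrays;

// Hypothetical, Hadoop-free illustration of range partitioning by split points.
final class RangePartitionSketch {

    // Returns the partition index for a key, given split points in sorted order.
    // With k split points there are k + 1 partitions, numbered 0..k.
    static int getPartition(String key, String[] sortedSplitPoints) {
        int pos = Arrays.binarySearch(sortedSplitPoints, key);
        // binarySearch returns -(insertionPoint) - 1 when the key is absent.
        // A key equal to a split point goes to the partition on its right.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }

    public static void main(String[] args) {
        String[] splits = {"g", "p"}; // assumed sample: 2 split points, 3 partitions
        for (String key : new String[] {"apple", "g", "kiwi", "zebra"}) {
            System.out.println(key + " -> partition " + getPartition(key, splits));
        }
    }
}
```

With these sample split points, "apple" lands in partition 0, "g" and "kiwi" in partition 1, and "zebra" in partition 2, so concatenating the partitions in order yields fully sorted keys.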
**Create a partition file**

The partition file can be created in code:

    job.setPartitionerClass(TotalOrderPartitioner.class);
    InputSampler.Sampler<Text, Text> sampler =
        new InputSampler.RandomSampler<Text, Text>(0.1, 200, 3);
    InputSampler.writePartitionFile(job, sampler);

or from the command line:

    hadoop jar hadoop-core.jar org.apache.hadoop.mapreduce.lib.partition.InputSampler \
        -inFormat org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat \
        -keyClass org.apache.hadoop.io.Text \
        -r 3 \
        -splitInterval 0.1 3 \
        population_data.txt _partition.lst

**Distributing the Partition File**

    String partitionFile = TotalOrderPartitioner.getPartitionFile(conf);
    URI partitionUri = new URI(partitionFile + "#"
        + TotalOrderPartitioner.DEFAULT_PATH);
    job.addCacheFile(partitionUri);

The **getPartitionFile** method returns the path to the partition file. The **partitionUri** is a URI that represents the path that the **TotalOrderPartitioner** looks up when retrieving the partition file. The **addCacheFile** method registers the partition file as a **LocalResource** so that each Container can access it.

**Overview of Sorting**

![Center 1][]

The **shuffle/sort** phase **performs two key tasks**:

1. Keys are sorted in their natural order.
2. Keys that are equal are grouped together.

Recall that the key class has to be of type **WritableComparable**, which **forces a compareTo method to be defined**. The **compareTo** method creates what is called **a natural order for the keys.**

The **Grouping Comparator** decides **which keys are equal to each other**. If two keys are equal, their values get grouped together.

**Secondary Sort**

![Center 2][]

The easiest way to **implement a secondary sort** is to **move part of the value into the key** to form **a composite key.** Here are the **three main steps** to follow:

1. **Write a custom key class that contains the secondary key.** While it may be possible to use a built-in Hadoop key class, typically you have to define your own.
2. **Write a custom Grouping Comparator** to determine how keys are grouped.
3. **Write a custom Partitioner** that ensures grouped keys are sent to the same reducer.

**Writing Custom Keys**

A custom key class needs to implement the **WritableComparable** interface.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.WritableComparable;

    public class CustomerKey implements WritableComparable<CustomerKey> {
        private int customerId;
        private String zipCode;

        // Natural order: zip code first, then customer id.
        @Override
        public int compareTo(CustomerKey arg0) {
            // Strings cannot be subtracted; compare them lexicographically.
            int result = this.zipCode.compareTo(arg0.zipCode);
            // Integer.compare avoids the overflow that subtraction can cause.
            return (result != 0) ? result
                    : Integer.compare(this.customerId, arg0.customerId);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.customerId = in.readInt();
            this.zipCode = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(customerId);
            out.writeUTF(zipCode);
        }

        //setters and getters...
    }

**Writing a Group Comparator**

A custom Group Comparator needs to extend the **WritableComparator** class.

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;

    public class CustomerGroupComparator extends WritableComparator {

        protected CustomerGroupComparator() {
            // true: create key instances so compare() receives deserialized objects.
            super(CustomerKey.class, true);
        }

        // Keys with the same zip code are treated as equal for grouping.
        @SuppressWarnings("rawtypes")
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            CustomerKey lhs = (CustomerKey) a;
            CustomerKey rhs = (CustomerKey) b;
            return lhs.getZipCode().compareTo(rhs.getZipCode());
        }
    }

[Center]: /images/20220607/d88e172290f241f582612b6cd46d149a.png
[Center 1]: /images/20220607/f0a0fd2bf66a402a8ae72e0a1d70cc77.png
[Center 2]: /images/20220607/d2528d3231c3499b95f07dd1a0797e19.png
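To see how the sort order and the Grouping Comparator interact in a secondary sort, here is a small Hadoop-free simulation. It is a sketch under stated assumptions: the class `SecondarySortSketch`, its nested `Key` class, and the sample data are hypothetical and only mimic the roles of CustomerKey and CustomerGroupComparator. Keys are sorted by the full composite key, but a new group starts only when the zip code changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of secondary sort; no Hadoop classes are used.
final class SecondarySortSketch {

    // Composite key: zipCode is the grouping key, customerId the secondary key.
    static final class Key {
        final String zipCode;
        final int customerId;

        Key(String zipCode, int customerId) {
            this.zipCode = zipCode;
            this.customerId = customerId;
        }
    }

    // Plays the role of compareTo: zip code first, then customer id.
    static final Comparator<Key> SORT_ORDER =
            Comparator.comparing((Key k) -> k.zipCode)
                      .thenComparingInt(k -> k.customerId);

    // Plays the role of the Grouping Comparator: only the zip code matters.
    static final Comparator<Key> GROUPING = Comparator.comparing((Key k) -> k.zipCode);

    // Sorts by the full key, then starts a new group whenever GROUPING sees a change.
    static Map<String, List<Integer>> sortAndGroup(List<Key> keys) {
        List<Key> sorted = new ArrayList<>(keys);
        sorted.sort(SORT_ORDER);
        Map<String, List<Integer>> groups = new LinkedHashMap<>();
        Key previous = null;
        for (Key k : sorted) {
            if (previous == null || GROUPING.compare(previous, k) != 0) {
                groups.put(k.zipCode, new ArrayList<>());
            }
            groups.get(k.zipCode).add(k.customerId);
            previous = k;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Key> keys = Arrays.asList(
                new Key("94105", 7), new Key("10001", 3),
                new Key("94105", 2), new Key("10001", 1));
        System.out.println(sortAndGroup(keys)); // {10001=[1, 3], 94105=[2, 7]}
    }
}
```

Each group corresponds to one reduce call, and within a group the customer ids arrive already sorted, which is the effect a secondary sort is after.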