Mapreduce中value集合的二次排序 梦里梦外; 2022-09-19 01:29 122阅读 0赞 Hadoop的MapReduce模型支持基于key的排序,即在一次MapReduce之后,结果都是按照key的大小排序的。但是在很多应用情况下,我们需要对映射在一个key下的value集合进行排序,即“secondary sort”。 在《hadoop the definate guide》的P227的“secondary sort”章节中,以<year,temperature>为例,在map阶段按照year来分发temperature,在reduce阶段按照同一year对应的temperature大小排序。 本文以<String, Int>格式为例,先介绍已知类型的二次排序,再介绍泛型<key,value>集合的二次排序。 设输入为如下的序列对: str1 3 str2 2 str1 1 str3 9 str2 10 我们期望的输出结果为: str1 1,3 str2 2,10 str3 9 1 value集合的二次排序 (1)先定义一个TextInt类,将String及int对象封装为一个整体。由于TextInt在mapreduce中要作为key进行比较,必须实现WritableComparable接口。如下: public class TextInt implements WritableComparable<TextInt>\{ private String text ; private int value; public TextInt()\{\} public TextInt(String text , int value)\{ this.text = text; this.value = value; \} public String getFirst()\{ return this.text; \} public int getSecond()\{ return this.value; \} @Override public void readFields(DataInput in) throws IOException \{ text = in.readUTF(); value = in.readInt(); \} @Override public void write(DataOutput out) throws IOException \{ out.writeUTF(text); out.writeInt(value); \} @Override public int compareTo(TextInt that) \{ return this.text.compareTo(that.text); \} \} (2)我们用KeyValueTextInputFormat的方式来读取输入文件,以tab等分割符切分输入的<key,value>对,key,value都是Text类型。所以在mapper阶段需要把将value还原到int数据,同时封装String及int为TextInt; @Override public void map(Text key, Text value, OutputCollector<TextInt,IntWritable> output, Reporter reporter) throws IOException \{ int intValue = Integer.parseInt(value.toString()); TextInt ti = new TextInt(key.toString(), intValue); output.collect(ti, new IntWritable(intValue)); \} (3)在reduce阶段,为了方便查看输出数据,我们把同一个string对应的int数据封装在一起,如下: @Override public void reduce(TextInt key, Iterator<IntWritable> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException \{ StringBuffer combineValue = new StringBuffer(); while( values.hasNext())\{ int value = values.next().get(); combineValue.append(value + ","); \} output.collect(new Text(key.getFirst()),newText(combineValue.toString())); \} (4)上面的map及reduce跟普通的mapreduce没什么区别,很容易理解,但真正实现二次排序的是以下两个comparator及一个partitioner。 1)TextIntComparator:先比较TextInt的String,再比较value; public static class TextIntComparator extends WritableComparator\{ public TextIntComparator()\{ super(TextInt.class, true); //注册comparator \} @Override public int compare(WritableComparable a, WritableComparable b) \{ TextInt o1 = (TextInt)a; TextInt o2 = (TextInt)b; if ( !o1.getFirst().equals(o2.getFirst()))\{ return o1.getFirst().compareTo(o2.getFirst()); \} else\{ return o1.getSecond() - o2.getSecond(); \} \}\} 2)TextComparator: 只比较TextInt中的String。 public static class TextComparator extends WritableComparator\{ public TextComparator()\{ super(TextInt.class, true); \} @Override public int compare(WritableComparable a, WritableComparable b) \{ TextInt o1 = (TextInt)a; TextInt o2 = (TextInt)b; return o1.getFirst().compareTo(o2.getFirst()); \} \} 3) PartitionByText:根据TextInt中的String来分割TextInt对象: public static class PartitionByText implementsPartitioner<TextInt, IntWritable>\{ @Override public int getPartition(TextInt key, IntWritable value, intnumPartitions) \{ return (key.getFirst().hashCode() & Integer.MAX\_VALUE) %numPartitions; \} @Override public void configure(JobConf job) \{\} \} (5)OK,基础工作都完成了,现在看实际的job调用: //……define input & output //定义Job JobConf conf = new JobConf(Join.class); conf.setJobName("sort by value"); //add inputpath: FileInputFormat.addInputPath(conf, new Path(input)); conf.setInputFormat(KeyValueTextInputFormat.class); conf.setMapperClass(Mapper.class); conf.setMapOutputKeyClass(TextInt.class); conf.setMapOutputValueClass(IntWritable.class); conf.setOutputKeyComparatorClass(TextIntComparator.class); conf.setOutputValueGroupingComparator(TextComparator.class); conf.setPartitionerClass(PartitionByText.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(Text.class); conf.setReducerClass(Reduce.class); conf.setOutputFormat(TextOutputFormat.class); //output FileOutputFormat.setOutputPath(conf, new Path(output)); JobClient.runJob(conf); 通过上面的job执行,在output的文件中,就能看到类似的输出结果了: str1 1,3 str2 2,10 str3 9 上面介绍了如何实现,在“知其然”的基础上,我们进一步“知其所以然”。首先,我们需要把相同string的int数据分发到同一个reducer,这就是PartitionByText的作用;其次,用TextComparator控制reduce阶段int数据集合的group,即把相同的string对应的int数据组装在一起;最后,用TextIntComparator实现在reduce阶段的排列方式。 2 泛型value的二次排序 在上一个章节中,将已知类型的key和value封装成TextInt对象,然后做二次排序。在很多情况下,我们需要对不同类型的key或value做二次排序,或者value是一个二元组/多元组,实现多级的排序,这时可以使用泛型的key或value。 定义泛型的key/value组合对象,key和value均要实现WritableComparable接口。 public static class CombinedObject implements WritableComparable\{private static Configuration conf = new Configuration(); private Class<? extends WritableComparable> firstClass; private Class<? extends WritableComparable> secondClass; private WritableComparable first ; private WritableComparable second; public CombinedObject()\{ \} public CombinedObject(Class<? extends WritableComparable>keyClass, Class<? extends WritableComparable> valueClass)\{ if (keyClass == null || valueClass == null) \{ throw new IllegalArgumentException("nullvalueClass"); \} this.firstClass = keyClass; this.secondClass = valueClass; first = ReflectionUtils.newInstance(firstClass, conf); second = ReflectionUtils.newInstance(secondClass, conf); \} public CombinedObject(WritableComparable f, WritableComparables)\{ this(f.getClass(), s.getClass()); setFirst(f); setSecond(s); \} public void setFirst(WritableComparable key)\{ this.first =key; \} public void setSecond(WritableComparable value)\{ this.second = value; \} public WritableComparable getFirst()\{ return this.first; \} @Override public void readFields(DataInput in) throws IOException \{ String firstClassName = in.readUTF(); String secondClassName = in.readUTF(); firstClass = (Class<? extends WritableComparable>)WritableName.getClass(firstClassName, conf); secondClass = (Class<? extends WritableComparable>)WritableName.getClass(secondClassName, conf); first = ReflectionUtils.newInstance(firstClass, conf); second = ReflectionUtils.newInstance(secondClass, conf); first.readFields(in); second.readFields(in); \} @Override public void write(DataOutput out) throws IOException \{ out.writeUTF(firstClass.getName()); out.writeUTF(secondClass.getName()); first.write(out); second.write(out); \} @Override public boolean equals(Object o) \{ if( o instanceof CombinedObject)\{ CombinedObject that = (CombinedObject)o; return that.first.equals(this.first); \} return false; \} @Override public int hashCode() \{ return first.hashCode(); \} @Override public int compareTo(Object o) \{ CombinedObject that = (CombinedObject)o; return that.first.compareTo(this.first); \} \} 注意: 在Writable接口的write(DataOutputout)及readFields(DataInput in)函数中,需要考虑firstClass及secondClass的序列化及反序列化。 在comparator接口的equal,hashCode,compareTo三个具体实现中,只考虑first的比较即可。 KeyComparator:只比较first public static class KeyComparator extends WritableComparator\{ public KeyComparator()\{ super(CombinedObject.class, true); \} @Override public int compare(WritableComparable a, WritableComparable b) \{ CombinedObject o1 = (CombinedObject)a; CombinedObject o2 = (CombinedObject)b; return o1.first.compareTo(o2.first); \} \} KeyValueComparator:比较first之后,再比较second。 public static class KeyValueComparator extends WritableComparator\{ public KeyValueComparator()\{ super(CombinedObject.class, true); \} @Override public int compare(WritableComparable a, WritableComparable b) \{ CombinedObject o1 = (CombinedObject)a; CombinedObject o2 = (CombinedObject)b; if ( !o1.first.equals(o2.first))\{ return o1.first.compareTo(o2.first); \} else\{ return o2.second.compareTo(o1.second); //descend \} \} \} KeyPartitioner:根据first来分发CombinedObject。 public static class KeyPartitioner implementsPartitioner<CombinedObject, Writable>\{ @Override public int getPartition(CombinedObject key, Writable value, intnumPartitions) \{ return (key.getFirst().hashCode() & Integer.MAX\_VALUE) %numPartitions; \} @Override public void configure(JobConf job) \{ \} \} 具体的调用方式: job.setMapperClass(ContentMapper.class); job.setMapOutputKeyClass(CombinedObject.class); // sort value list job.setOutputKeyComparatorClass(KeyValueComparator.class); job.setOutputValueGroupingComparator(KeyComparator.class); job.setPartitionerClass(KeyPartitioner.class); job.setReducerClass(SameInfoReducer.class); 在这个章节介绍了对泛型的key和value的二次排序,可以设计更多组合形式的<key,value>的排序,也可以自定义多元组构成value,实现更多灵活的排序方式。 本文转自《hadoop开发者》第二期。
还没有评论,来说两句吧...