HDPCD-Java Review Notes (4)

川长思鸟来 2022-06-07 04:46

**Map Aggregation**

**Aggregation**

The term refers to a Mapper combining its <key, value> pairs, with the goal of reducing the amount of network traffic between the Mapper and the Reducer.

There are **two ways** to **perform Map Aggregation** in Hadoop:

**Combiners** --- The MapReduce framework has the concept of a Combiner: you write a class that defines the aggregation, and the framework decides when to perform it.

**In-map Aggregation** --- The Mapper itself contains the aggregation logic, typically by buffering records in memory before writing them out.

**Overview of Combiners**

![Center][]

The <key, value> records output by the Mapper are serialized, so the Combiner has to deserialize them. A Combiner only aggregates data on one node; it does not combine the output of multiple Mappers.

**Reduce-side Combining**

![Center 1][]

The Combiner is also used in the reduce phase if the intermediate <key, value> pairs from the Mappers are spilled to disk. The Reducer uses the Combiner behind the scenes to reduce file I/O.

**Counters**

The pre-defined counters include useful information, like the number of map input records or the number of bytes written to HDFS. Hadoop counters are global: they are a summation of events that occur across the entire cluster.

**User-defined Counters**

There are two ways to define your own counter in Hadoop:

1. Use an enum to define a group; the elements in the enum become the counter names.
2. Use strings for the group name and counter name.
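The payoff of map-side aggregation can be illustrated without a cluster: collapsing the Mapper's per-token (word, 1) pairs into one (word, n) pair per key shrinks what must cross the network. A minimal plain-Java sketch of this idea (the `CombineSketch` class is an illustrative stand-in, not part of the Hadoop API):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: shows what a Combiner buys you. The mapper emits one
// ("word", 1) pair per token; combining collapses them into one ("word", n)
// pair per distinct key before the data crosses the network.
public class CombineSketch {

    public static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapperOutput) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : mapperOutput) {
            // Sum the values for each key, just like WordCountCombiner.reduce()
            combined.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return combined;
    }
}
```

With input pairs ("a",1), ("b",1), ("a",1), three records shrink to two: ("a",2) and ("b",1).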
**Combiner Example**

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable outputValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        outputValue.set(sum);
        context.write(key, outputValue);
    }
}
```

**In-Map Aggregation**

**In-Map Aggregation Example**

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.PriorityQueue;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

public class TopResultsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Buffer of word counts, accumulated across all calls to map()
    private ArrayList<Word> words = new ArrayList<Word>();
    private PriorityQueue<Word> queue;
    private int maxResults;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        maxResults = Integer.parseInt(context.getConfiguration().get("maxResults"));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] input = StringUtils.split(value.toString(), '\\', ' ');
        for (String word : input) {
            Word currentWord = new Word(word, 1);
            if (words.contains(currentWord)) {
                // increment the existing Word's frequency
                for (Word w : words) {
                    if (w.equals(currentWord)) {
                        w.frequency++;
                        break;
                    }
                }
            } else {
                words.add(currentWord);
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        Text outputKey = new Text();
        IntWritable outputValue = new IntWritable();
        // Initial capacity must be at least 1, even if no words were buffered
        queue = new PriorityQueue<Word>(Math.max(words.size(), 1));
        queue.addAll(words);
        for (int i = 1; i <= maxResults; i++) {
            Word tail = queue.poll();
            if (tail != null) {
                outputKey.set(tail.value);
                outputValue.set(tail.frequency);
                context.write(outputKey, outputValue);
            }
        }
    }
}

public class Word implements Comparable<Word> {

    public String value;
    public int frequency;

    public Word(String value, int frequency) {
        this.value = value;
        this.frequency = frequency;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof Word) {
            return value.equalsIgnoreCase(((Word) obj).value);
        } else {
            return false;
        }
    }

    @Override
    public int compareTo(Word w) {
        // Descending order: higher frequency comes out of the queue first
        return w.frequency - this.frequency;
    }
}
```

**Counter Example**

```java
public enum MyCounters {
    GOOD_RECORDS,
    BAD_RECORDS
}
```

Incrementing the counter from within a Mapper or Reducer:

```java
context.getCounter(MyCounters.GOOD_RECORDS).increment(1);
```

[Center]: /images/20220607/8e553c8dcffa4925a28bdae9997c1ac4.png
[Center 1]: /images/20220607/a1ad46bee1a14b2eb188d529a145fdb9.png
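The cleanup() method above relies on Word.compareTo() ordering the PriorityQueue so the highest-frequency word is polled first. The same top-N pattern can be sketched in plain Java, runnable without a cluster (the `TopWords` class and its sample input are illustrative, not from the notes):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Illustrative only: the top-N pattern from TopResultsMapper.cleanup(),
// with a HashMap standing in for the buffered Word list.
public class TopWords {

    public static List<String> top(List<String> words, int maxResults) {
        // Buffer the counts in memory, as the Mapper does across map() calls
        Map<String, Integer> freq = new HashMap<>();
        for (String w : words) {
            freq.merge(w.toLowerCase(), 1, Integer::sum);
        }
        // Comparator mirrors Word.compareTo(): higher frequency polled first
        PriorityQueue<Map.Entry<String, Integer>> queue =
                new PriorityQueue<>((a, b) -> b.getValue() - a.getValue());
        queue.addAll(freq.entrySet());
        // Emit only the top maxResults keys, as cleanup() does
        List<String> result = new ArrayList<>();
        for (int i = 0; i < maxResults && !queue.isEmpty(); i++) {
            result.add(queue.poll().getKey());
        }
        return result;
    }
}
```

For the tokens a, b, a, a, b, c with maxResults = 2, this yields ["a", "b"], since "a" occurs three times and "b" twice.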