Finding the Top-K Maximum Value with MapReduce

素颜马尾好姑娘i · 2022-08-26 12:15

The class below finds the largest value in an HDFS file that holds one number per line: each map task tracks the local maximum of its input split and emits it in cleanup(), and a single reduce task then keeps the global maximum.
package suanfa;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class TopK {

    public static final String INPUT_PATH = "hdfs://192.168.0.9:9000/seq100w.txt";
    public static final String OUTPUT_PATH = "hdfs://192.168.0.9:9000/maxseq";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, TopK.class.getSimpleName());

        // Input path and input format
        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        job.setInputFormatClass(TextInputFormat.class);

        // Custom mapper and its output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Partitioning; a single reduce task collects every local maximum
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // Sorting, grouping and the combiner are left at their defaults

        // Custom reducer and its output types
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        Path path = new Path(OUTPUT_PATH);
        if (fileSystem.exists(path)) {
            fileSystem.delete(path, true);
        }

        // Output path and output format
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        private long max = Long.MIN_VALUE;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line holds one number; remember the largest seen by this map task
            long temp = Long.parseLong(value.toString().trim());
            if (temp > max) {
                max = temp;
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Emit the local maximum once this map task has read all of its records
            context.write(new LongWritable(max), NullWritable.get());
        }
    }

    static class MyReduce extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        private long max = Long.MIN_VALUE;

        @Override
        protected void reduce(LongWritable k2, Iterable<NullWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // Compare the local maxima emitted by the map tasks
            long temp = k2.get();
            if (temp > max) {
                max = temp;
            }
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {
            // Write the single global maximum
            context.write(new LongWritable(max), NullWritable.get());
        }
    }
}
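The class above keeps only a single running maximum, so it really computes Top-1 rather than a general Top-K. Below is a minimal sketch of how the same map/cleanup pattern could be extended to a true Top-K: it assumes one number per line as before, reuses the paths from the TopK class, passes K through a hypothetical configuration key topk.k, and collapses duplicate values in the TreeSet. Neither TopKSketch nor topk.k comes from the original post.

package suanfa;

import java.io.IOException;
import java.net.URI;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Sketch only: each map task keeps its K largest values in a TreeSet and the
// single reducer merges the local Top-K lists. "topk.k" is a hypothetical key.
public class TopKSketch {

    static class TopKMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
        private int k;
        private final TreeSet<Long> topK = new TreeSet<>();

        @Override
        protected void setup(Context context) {
            k = context.getConfiguration().getInt("topk.k", 10);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) {
            // One number per line; duplicates are collapsed by the TreeSet
            topK.add(Long.parseLong(value.toString().trim()));
            if (topK.size() > k) {
                topK.pollFirst();           // drop the smallest, keep at most K
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Long v : topK) {           // emit this task's local Top-K
                context.write(new LongWritable(v), NullWritable.get());
            }
        }
    }

    static class TopKReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> {
        private int k;
        private final TreeSet<Long> topK = new TreeSet<>();

        @Override
        protected void setup(Context context) {
            k = context.getConfiguration().getInt("topk.k", 10);
        }

        @Override
        protected void reduce(LongWritable key, Iterable<NullWritable> values, Context context) {
            topK.add(key.get());
            if (topK.size() > k) {
                topK.pollFirst();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Long v : topK.descendingSet()) {   // K largest values, descending
                context.write(new LongWritable(v), NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("topk.k", 10);                  // K is a job parameter here
        Job job = Job.getInstance(conf, TopKSketch.class.getSimpleName());
        job.setJarByClass(TopKSketch.class);
        job.setMapperClass(TopKMapper.class);
        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);                   // a single reducer makes the merge global
        FileInputFormat.addInputPath(job, new Path(TopK.INPUT_PATH));
        Path out = new Path(TopK.OUTPUT_PATH);
        FileSystem fs = FileSystem.get(new URI(TopK.OUTPUT_PATH), conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Keeping setNumReduceTasks(1) is what makes the final merge global: each map task sends at most K candidates to the single reducer, which keeps the K largest overall. The job is packaged and submitted like any other, e.g. hadoop jar topk.jar suanfa.TopKSketch (jar name assumed).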
