MapReduce Java API-多输入路径方式 约定不等于承诺〃 2022-09-03 08:24 120阅读 0赞 # 场景 # MapReduce Java API实例-统计单词出现频率: [https://blog.csdn.net/BADAO\_LIUMANG\_QIZHI/article/details/119410169][https_blog.csdn.net_BADAO_LIUMANG_QIZHI_article_details_119410169] 在上面实现统计单次出现的频率的基础上。 数据集只是单路径,如果有多个数据集文件,即有多个txt文件,要怎么实现。 多文件输入采用MultipleInputs.addInputPath方法即可完成。 注: 博客: [https://blog.csdn.net/badao\_liumang\_qizhi][https_blog.csdn.net_badao_liumang_qizhi] 关注公众号 霸道的程序猿 获取编程相关电子书、教程推送与免费下载。 # 实现 # map和reduce的代码基本和上面的一致 1、map类 package com.badao.multinput; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; public class MultInputMapper extends Mapper<LongWritable,Text,Text,IntWritable> { //1、编写map函数,通过继承Mapper类实现里面的map函数 // Mapper类当中的第一个函数是Object,也可以写成Long // 第一个参数对应的值是行偏移量 //2、第二个参数类型通常是Text类型,Text是Hadoop实现的String 类型的可写类型 // 第二个参数对应的值是每行字符串 //3、第三个参数表示的是输出key的数据类型 //4、第四个参数表示的是输出value的数据类型,IntWriable 是Hadoop实现的int类型的可写数据类型 public final static IntWritable one = new IntWritable(1); public Text word = new Text(); //key 是行偏移量 //value是每行字符串 @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer stringTokenizer = new StringTokenizer(value.toString()); while (stringTokenizer.hasMoreTokens()) { //stringTokenizer.nextToken()是字符串类型,使用set函数完成字符串到Text数据类型的转换 word.set(stringTokenizer.nextToken()); //通过write函数写入到本地文件 context.write(word,one); } } } 2、reduce类 package com.badao.multinput; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; //第一个参数类型是输入值key的数据类型,map中间输出key的数据类型 //第二个参数类型是输入值value的数据类型,map中间输出value的数据类型 //第三个参数类型是输出值key的数据类型,他的数据类型要跟job.setOutputKeyClass(Text.class) 保持一致 //第四个参数类型是输出值value的数据类型,它的数据类型要跟job.setOutputValueClass(IntWriable.class) 保持一致 public class MultInputReducer extends Reducer<Text, IntWritable,Text,IntWritable> { public IntWritable result = new IntWritable(); //key就是单词 values是单词出现频率列表 @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable val:values) { //get就是取出IntWriable的值 sum += val.get(); } result.set(sum); context.write(key,result); } } 3、job类 job这里不同,单路径时 FileInputFormat.addInputPath(job,new Path("D:\\words.txt")); 多路径时 Path path1 = new Path("D:\\words.txt"); Path path2 = new Path("D:\\words2.txt"); MultipleInputs.addInputPath(job,path1, TextInputFormat.class,MultInputMapper.class); MultipleInputs.addInputPath(job,path2, TextInputFormat.class,MultInputMapper.class); 完整代码 package com.badao.multinput; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer; import java.io.IOException; public class MultInputJob { public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException { wordCountLocal(); } public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); //实例化一个作业,word count是作业的名字 Job job = Job.getInstance(conf, "multinputwordcount"); //指定通过哪个类找到对应的jar包 job.setJarByClass(MultInputJob.class); //为job设置Mapper类 job.setMapperClass(MultInputMapper.class); job.setCombinerClass(IntSumReducer.class); //为job设置reduce类 job.setReducerClass(MultInputReducer.class); //为job的输出数据设置key类 job.setOutputKeyClass(Text.class); //为job输出设置value类 job.setOutputValueClass(IntWritable.class); //多个输入路径 Path path1 = new Path("D:\\words.txt"); Path path2 = new Path("D:\\words2.txt"); MultipleInputs.addInputPath(job,path1, TextInputFormat.class,MultInputMapper.class); MultipleInputs.addInputPath(job,path2, TextInputFormat.class,MultInputMapper.class); //为job设置输出路径 FileOutputFormat.setOutputPath(job,new Path("D:\\mulinputOut")); job.waitForCompletion(true); } } 运行job类查看效果 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0JBREFPX0xJVU1BTkdfUUlaSEk_size_16_color_FFFFFF_t_70][] [https_blog.csdn.net_BADAO_LIUMANG_QIZHI_article_details_119410169]: https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119410169 [https_blog.csdn.net_badao_liumang_qizhi]: https://blog.csdn.net/badao_liumang_qizhi [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0JBREFPX0xJVU1BTkdfUUlaSEk_size_16_color_FFFFFF_t_70]: /images/20220829/aac8bb3f6e784f589be8b57a39f8c64a.png
还没有评论,来说两句吧...