Hadoop Example: Custom OutputFormat

心已赠人 2022-11-21 15:14

### Table of Contents ###

 *  Requirement
 *  Input Data
 *  Expected Output Data
 *  Requirement Analysis
 *  Writing the Code
     *  Custom FilterOutputFormat
     *  Custom FilterRecordWriter
     *  Mapper Class
     *  Reducer Class
     *  Driver Class
 *  Execution Result

## Requirement ##

Filter the input log file: URLs that contain "hadoop" are written to e:/hadoop.log, and URLs that do not contain "hadoop" are written to e:/other.log.

## Input Data ##

log.txt

```
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.hadoop.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
```

## Expected Output Data ##

![Expected output files][20201102153305394.png_pic_center]

[20201102153305394.png_pic_center]: /images/20221120/f4e055dbbeaa486abff6d966bcabcd54.png

## Requirement Analysis ##

![Requirement analysis][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNjc0MzYw_size_16_color_FFFFFF_t_70_pic_center]

[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNjc0MzYw_size_16_color_FFFFFF_t_70_pic_center]: /images/20221120/d8d49ab0384d462ea653b1e8c833f3b8.png

## Writing the Code ##

### Custom FilterOutputFormat ###

```java
package com.mr.filteroutformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // Hand every output record to our custom RecordWriter
        return new FilterRecordWriter(job);
    }
}
```

### Custom FilterRecordWriter ###

```java
package com.mr.filteroutformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

    // The two output streams are opened in the constructor
    FSDataOutputStream hadoopOut = null;
    FSDataOutputStream otherOut = null;

    public FilterRecordWriter(TaskAttemptContext job) {
        FileSystem fs;
        try {
            // 1 Get the file system
            fs = FileSystem.get(job.getConfiguration());

            // 2 Create the output file paths
            Path hadoopPath = new Path("E:\\Hadoop\\src\\main\\resources\\output\\filterlog\\hadoop.log");
            Path otherPath = new Path("E:\\Hadoop\\src\\main\\resources\\output\\filterlog\\other.log");

            // 3 Create the output streams
            hadoopOut = fs.create(hadoopPath);
            otherOut = fs.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Route each record to a different file depending on whether it contains "hadoop"
        if (key.toString().contains("hadoop")) {
            hadoopOut.write(key.toString().getBytes());
        } else {
            otherOut.write(key.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        // Release resources
        IOUtils.closeStream(hadoopOut);
        IOUtils.closeStream(otherOut);
    }
}
```
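Note that `write()` above emits the raw key bytes with no line separator; that is why the Reducer shown below appends `"\r\n"` to every key before writing. As a minimal alternative sketch (not the original code), the separator could be handled inside the RecordWriter instead, which would let the Reducer pass keys through unchanged:

```java
// Alternative sketch: append the line separator in the RecordWriter itself,
// so the Reducer no longer needs to concatenate "\r\n" onto each key.
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
    byte[] line = (key.toString() + "\r\n").getBytes();
    if (key.toString().contains("hadoop")) {
        hadoopOut.write(line);
    } else {
        otherOut.write(line);
    }
}
```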
### Mapper Class ###

```java
package com.mr.filteroutformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Emit each input line as the key; no value is needed
        context.write(value, NullWritable.get());
    }
}
```

### Reducer Class ###

```java
package com.mr.filteroutformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    Text k = new Text();

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 1 Get the line
        String line = key.toString();

        // 2 Append a line separator
        line = line + "\r\n";

        // 3 Set the key
        k.set(line);

        // 4 Write it out
        context.write(k, NullWritable.get());
    }
}
```

### Driver Class ###

```java
package com.mr.filteroutformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Adjust the input and output paths to match your own machine
        args = new String[] { "E:\\Hadoop\\src\\main\\resources\\input\\filterlog", "E:\\Hadoop\\src\\main\\resources\\output\\filterlog" };

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Plug the custom output format into the job
        job.setOutputFormatClass(FilterOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Even though we defined a custom OutputFormat, it extends FileOutputFormat,
        // and FileOutputFormat writes a _SUCCESS marker file, so an output
        // directory still has to be specified here.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```

### Execution Result ###

![Execution result][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNjc0MzYw_size_16_color_FFFFFF_t_70_pic_center 1]

[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQzNjc0MzYw_size_16_color_FFFFFF_t_70_pic_center 1]: /images/20221120/34ac8995a76942659d7d393db61af570.png
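The result screenshot is not reproduced here, but the contents of the two files follow directly from the input data and the filter rule. A sketch of what to expect (note that the lines come out in the reducer's key-sorted order, not the input order, and the job output directory additionally contains FileOutputFormat's `_SUCCESS` marker):

```
hadoop.log:
http://www.hadoop.com

other.log:
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
```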