[Hadoop] Hadoop Ecosystem Series: An Analysis of InputFormat.class and OutputFormat.class

Previous post: Hadoop Ecosystem Series: MapReduce Overview and MapReduce Job Development and Deployment

Contents

    • InputFormat & OutputFormat
      • Overall Design
      • InputFormat.class
        • TextInputFormat.class
        • NLineInputFormat.class
        • KeyValueTextInputFormat.class
        • MultipleInputs.class
        • CombineFileInputFormat.class
        • DBInputFormat.class
      • OutputFormat.class
        • TextOutputFormat.class
        • DBOutputFormat.class
        • JedisOutputFormat.class
        • Dependency Resolution
    • InputFormat/OutputFormat, Mapper, and Reducer

InputFormat & OutputFormat

Overall Design

(Figure: overall design of InputFormat and OutputFormat — original image unavailable)

InputFormat.class

This is a top-level abstract class provided by Hadoop. It defines two things: how splits are computed, and how the data inside a split is read.

```java
public abstract class InputFormat<K, V> {
    public InputFormat() {
    }
    // Compute the splits: the logical ranges the input data is divided into
    public abstract List<InputSplit> getSplits(JobContext var1)
            throws IOException, InterruptedException;
    // Read the records of one logical split and hand them to the Mapper
    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2)
            throws IOException, InterruptedException;
}
```

Hadoop's library packages ship several ready-made implementations of InputFormat; the main ones are:

  • CompositeInputFormat.class - mainly used to join large data sets on the Map side
  • DBInputFormat - provides read support for RDBMS databases, mainly Oracle and MySQL.
  • FileInputFormat - the base pre-implementation for distributed file systems.
TextInputFormat.class

By default the file is cut into chunks of 128 MB; each such chunk is called a split. A LineRecordReader then reads the split and gives the Mapper each line of text as the value, together with the byte offset of that line within the file as the key. The offset is a Long and is usually of little use.

Note: for all subclasses of FileInputFormat that do not override getSplits, the computed split size falls in the range (0, 140.8 MB], because when computing splits the framework decides via (remaining bytes / 128 MB > 1.1) ? start a new split : keep the remainder in the current split.
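To make the 1.1 rule concrete, here is a small, self-contained sketch of the split loop (a paraphrase of FileInputFormat's logic for illustration, not the actual Hadoop source):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSlopSketch {
    // Simplified sketch of FileInputFormat's per-file split loop (paraphrased, not verbatim).
    static List<long[]> computeSplits(long fileLength, long splitSize) {
        final double SPLIT_SLOP = 1.1;                 // 10% slack
        List<long[]> splits = new ArrayList<>();       // {offset, length} pairs
        long bytesRemaining = fileLength;
        long offset = 0;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits.add(new long[]{offset, splitSize});
            offset += splitSize;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            // The tail split may hold up to 1.1 * splitSize, i.e. (0, 140.8 MB] for a 128 MB block.
            splits.add(new long[]{offset, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        long MB = 1024L * 1024;
        // A 140 MB file stays in a single split; a 141 MB file is cut into two.
        System.out.println(computeSplits(140 * MB, 128 * MB).size()); // 1
        System.out.println(computeSplits(141 * MB, 128 * MB).size()); // 2
    }
}
```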

Code reference: Hadoop Ecosystem Series: MapReduce Overview and MapReduce Job Development and Deployment

NLineInputFormat.class

Cuts the file into chunks of N lines each; each chunk is a split. A LineRecordReader then reads the split and gives the Mapper each line of text as the value, together with the byte offset of that line within the file as the key; the offset is a Long and is usually of little use.

NLineInputFormat overrides FileInputFormat's getSplits, so when using it you generally need to set the number of lines per split:

```java
NLineInputFormat.setNumLinesPerSplit(job, 1000);
```

```java
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "URLCountApplication");
        // 2. Tell the job which input/output formats to use
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job, 1000);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        TextInputFormat.addInputPath(job, new Path("D:/data/click"));
        // Created automatically; if it already exists before execution, the job is aborted
        TextOutputFormat.setOutputPath(job, new Path("D:/data/result"));
        // 4. Set the processing logic
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
```

KeyValueTextInputFormat.class

By default the file is cut into chunks of 128 MB; each chunk is a split. A KeyValueLineRecordReader then reads the split and hands the Mapper both the key and the value as Text. By default each line is split into key/value on the first tab character (\t); if the line cannot be split this way, the whole line becomes the key and the value is empty.

```java
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ",");
```

```java
public class AvgCostApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job
        Configuration conf = getConf();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ",");
        Job job = Job.getInstance(conf, "AvgCostApplication");
        // 2. Tell the job which input/output formats to use
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        TextInputFormat.addInputPath(job, new Path("file:///D:/data/keyvalue"));
        // Created automatically; if it already exists before execution, the job is aborted
        TextOutputFormat.setOutputPath(job, new Path("file:///D:/data/result"));
        // 4. Set the processing logic
        job.setMapperClass(AvgCostMapper.class);
        job.setReducerClass(AvgCostReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new AvgCostApplication(), args);
    }
}
```

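The AvgCostMapper body is not shown above; as a rough illustration of what it receives from KeyValueTextInputFormat, a minimal sketch might look like the following (the input layout, a numeric cost after the comma, is an assumption):

```java
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper for the KeyValueTextInputFormat job above.
// The input key/value are both Text: the part before the first "," and the rest of the line.
public class AvgCostMapper extends Mapper<Text, Text, Text, DoubleWritable> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // value is assumed to hold a numeric cost, e.g. "item01,12.5" -> key=item01, value=12.5
        context.write(key, new DoubleWritable(Double.parseDouble(value.toString().trim())));
    }
}
```
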
MultipleInputs.class

This is a composite input format, mainly used to combine several InputFormats of different formats in one job; the Map-side output types must still be identical across all of the mappers.

```java
public class SumCostApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job
        Configuration conf = getConf();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ",");
        Job job = Job.getInstance(conf, "SumCostCountApplication");
        // 2. Tell the job the output format (the input formats are declared per path below)
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        TextOutputFormat.setOutputPath(job, new Path("file:///D:/data/result"));
        // 4. Set the processing logic: one mapper per input format
        MultipleInputs.addInputPath(job, new Path("file:///D:/data/mul/keyvalue"),
                KeyValueTextInputFormat.class, KeyValueCostMapper.class);
        MultipleInputs.addInputPath(job, new Path("file:///D:/data/mul/text"),
                TextInputFormat.class, TextCostMapper.class);
        job.setReducerClass(CostSumReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new SumCostApplication(), args);
    }
}
```

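KeyValueCostMapper and TextCostMapper are not listed above. The sketch below (field layout assumed) illustrates the constraint just mentioned: the two mappers consume different input formats but must emit identical output types, here Text and DoubleWritable:

```java
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Reads records already split by KeyValueTextInputFormat on "," (key = item, value = cost).
class KeyValueCostMapper extends Mapper<Text, Text, Text, DoubleWritable> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, new DoubleWritable(Double.parseDouble(value.toString())));
    }
}

// Reads plain text lines from TextInputFormat and splits them itself (layout assumed: "item,cost").
class TextCostMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        context.write(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
    }
}
```
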
CombineFileInputFormat.class

All of the FileInputFormats above compute splits per file, which means that if the input directory contains many small files, the first (Map) phase ends up with far too many Map tasks. The default FileInputFormat is therefore not friendly to small files. For this reason Hadoop provides CombineFileInputFormat, which is designed for split computation in small-file scenarios and maps multiple small files into the same split; the small files must all share the same format. We can use CombineTextInputFormat, which is used exactly like TextInputFormat and differs only in how splits are computed.

```java
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "URLCountApplication");
        // 2. Tell the job which input/output formats to use
        job.setInputFormatClass(CombineTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        CombineTextInputFormat.addInputPath(job, new Path("file:///D:/data/click"));
        // Created automatically; if it already exists before execution, the job is aborted
        TextOutputFormat.setOutputPath(job, new Path("file:///D:/data/result"));
        // 4. Set the processing logic
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
```

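If a single combined split grows too large, its size can be capped in the driver. CombineTextInputFormat inherits the standard FileInputFormat split-size setter, which its split computation honors; the value below is only illustrative:

```java
// In run(), after job.setInputFormatClass(CombineTextInputFormat.class):
// cap each combined split at roughly 4 MB (illustrative value, tune to your data)
CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);
```
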
DBInputFormat.class

Mainly responsible for reading data from an RDBMS (relational database); currently only MySQL and Oracle are supported.

```java
public class UserDBWritable implements DBWritable {
    private Boolean sex;
    private Double salary;

    /**
     * Used by DBOutputFormat; left empty because this job only reads.
     * @param statement
     * @throws SQLException
     */
    public void write(PreparedStatement statement) throws SQLException {
    }

    public void readFields(ResultSet resultSet) throws SQLException {
        this.sex = resultSet.getBoolean("sex");
        this.salary = resultSet.getDouble("salary");
    }

    public Boolean getSex() {
        return sex;
    }

    public void setSex(Boolean sex) {
        this.sex = sex;
    }

    public Double getSalary() {
        return salary;
    }

    public void setSalary(Double salary) {
        this.salary = salary;
    }
}
```

```java
public class DBAvgSalaryApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job
        Configuration conf = getConf();
        // Set the map-side parallelism
        conf.setInt(MRJobConfig.NUM_MAPS, 5);
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test", "root", "123456");
        Job job = Job.getInstance(conf, "DBAvgSalaryApplication");
        // 2. Tell the job which input/output formats to use
        job.setInputFormatClass(DBInputFormat.class);
        String query = "select sex,salary from t_user";
        String countQuery = "select count(*) from t_user";
        DBInputFormat.setInput(job, UserDBWritable.class, query, countQuery);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        // Created automatically; if it already exists before execution, the job is aborted
        TextOutputFormat.setOutputPath(job, new Path("D:/data/result"));
        // 4. Set the processing logic
        job.setMapperClass(UserAvgMapper.class);
        job.setReducerClass(UserAvgReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(BooleanWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DBAvgSalaryApplication(), args);
    }
}
```

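UserAvgMapper and UserAvgReducer are not listed above; a minimal sketch consistent with the driver's key/value settings could look like this (DBInputFormat hands the mapper a LongWritable row id plus the UserDBWritable bean; the averaging logic itself is an assumption):

```java
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// DBInputFormat produces <LongWritable rowId, UserDBWritable bean>.
class UserAvgMapper extends Mapper<LongWritable, UserDBWritable, BooleanWritable, DoubleWritable> {
    @Override
    protected void map(LongWritable key, UserDBWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(new BooleanWritable(value.getSex()), new DoubleWritable(value.getSalary()));
    }
}

// Averages the salaries per sex and writes "true/false <tab> average" through TextOutputFormat.
class UserAvgReducer extends Reducer<BooleanWritable, DoubleWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(BooleanWritable key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (DoubleWritable v : values) {
            sum += v.get();
            count++;
        }
        context.write(new Text(key.toString()), new DoubleWritable(sum / count));
    }
}
```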

OutputFormat.class

This is the top-level abstract class Hadoop provides for the write side. It implements the output logic, writing the Reduce side's output to the external system; it also performs the output check (only for file systems) and returns an OutputCommitter so the output can be committed correctly.

```java
public abstract class OutputFormat<K, V> {
    // Create the RecordWriter
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException;
    // Check that the output specification (e.g. the output directory) is valid
    public abstract void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException;
    // Return a committer
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException;
}
```

TextOutputFormat.class

Writes the Reducer's output directly to the file system; when writing, the key's and value's toString() methods are called.

DBOutputFormat.class

Writes the Reducer's output directly to a database system.

```java
public class URLCountDBWritable implements DBWritable {
    private String category;
    private Integer count;

    public URLCountDBWritable(String category, Integer count) {
        this.category = category;
        this.count = count;
    }

    public URLCountDBWritable() {
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public Integer getCount() {
        return count;
    }

    public void setCount(Integer count) {
        this.count = count;
    }

    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, category);
        statement.setInt(2, count);
    }

    public void readFields(ResultSet resultSet) throws SQLException {
    }
}
```

```java
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job
        Configuration conf = getConf();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test",
                "root", "123456");
        Job job = Job.getInstance(conf, "URLCountApplication");
        // 2. Tell the job which input/output formats to use
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        // 3. Set the data paths
        TextInputFormat.addInputPath(job, new Path("file:///D:/data/click"));
        DBOutputFormat.setOutput(job, "url_click", "url_category", "url_count");
        // 4. Set the processing logic
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(URLCountDBWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
```

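The URLReducer for this job must emit the DBWritable as the key and NullWritable as the value, matching setOutputKeyClass/setOutputValueClass above; a minimal sketch (the counting logic is assumed):

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the per-category counts and emits one URLCountDBWritable per category;
// DBOutputFormat binds its fields to the url_category / url_count columns via write().
class URLReducer extends Reducer<Text, IntWritable, URLCountDBWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
            total += v.get();
        }
        context.write(new URLCountDBWritable(key.toString(), total), NullWritable.get());
    }
}
```
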
JedisOutputFormat.class

A custom OutputFormat that writes the Reducer's output to Redis through the Jedis client:
```java
public class JedisOutputFormat extends OutputFormat<String, String> {
    public final static String JEDIS_HOST = "jedis.host";
    public final static String JEDIS_PORT = "jedis.port";

    public static void setOutput(Job job, String host, Integer port) {
        job.getConfiguration().set(JEDIS_HOST, host);
        job.getConfiguration().setInt(JEDIS_PORT, port);
    }

    public RecordWriter<String, String> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        String host = config.get(JEDIS_HOST);
        Integer port = config.getInt(JEDIS_PORT, 6379);
        return new JedisRecordWriter(host, port);
    }

    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context);
    }
}
```

```java
public class JedisRecordWriter extends RecordWriter<String, String> {
    private Jedis jedis = null;

    public JedisRecordWriter(String host, Integer port) {
        jedis = new Jedis(host, port);
    }

    public void write(String key, String value) throws IOException, InterruptedException {
        jedis.set(key, value);
    }

    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        jedis.close();
    }
}
```

```java
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "URLCountApplication");
        // 2. Tell the job which input/output formats to use
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(JedisOutputFormat.class);
        // 3. Set the data paths
        TextInputFormat.addInputPath(job, new Path("file:///D:/data/click"));
        JedisOutputFormat.setOutput(job, "CentOS", 6379);
        // 4. Set the processing logic
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(String.class);
        job.setOutputValueClass(String.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
```

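Because JedisOutputFormat is declared as OutputFormat<String, String>, the reducer for this job emits plain String keys and values, which is what setOutputKeyClass(String.class) above refers to. A minimal sketch, with the counting logic assumed:

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Emits String/String pairs that JedisRecordWriter stores via jedis.set(key, value).
class URLReducer extends Reducer<Text, IntWritable, String, String> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
            total += v.get();
        }
        context.write(key.toString(), String.valueOf(total));
    }
}
```
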
Dependency Resolution
  • Runtime dependencies (needed by the Yarn Child processes)

    • Option 1
      Make the dependency jars available to every compute node (the hosts where NodeManager runs); passing them with -libjars ships them along with the job (the option is parsed by GenericOptionsParser, so the driver must be launched through ToolRunner, as in the examples above).

      [root@CentOS ~]# hadoop jar xxx.jar MainClass -libjars dep1.jar,dep2.jar,...

    • Option 2
      Upload the jars to HDFS once and reference them from the driver through the tmpjars property:

      [root@CentOS ~]# hdfs dfs -mkdir /libs
      [root@CentOS ~]# hdfs dfs -put mysql-connector-java-5.1.46.jar /libs

      conf.setStrings("tmpjars","/libs/xxx1.jar,/libs/xxx2.jar,...");
  • Submit-time dependencies (client side)

The user needs to configure the HADOOP_CLASSPATH environment variable (e.g. in /root/.bashrc); this kind of dependency usually comes into play during the split-computation phase.

```shell
# appended to /root/.bashrc
HADOOP_CLASSPATH=/root/mysql-connector-java-5.1.46.jar
export HADOOP_CLASSPATH

[root@CentOS ~]# source .bashrc
[root@CentOS ~]# hadoop classpath   # print Hadoop's classpath
/usr/hadoop-2.6.0/etc/hadoop:/usr/hadoop-2.6.0/share/hadoop/common/lib/*:/usr/hadoop-2.6.0/share/hadoop/common/*:/usr/hadoop-2.6.0/share/hadoop/hdfs:/usr/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/usr/hadoop-2.6.0/share/hadoop/hdfs/*:/usr/hadoop-2.6.0/share/hadoop/yarn/lib/*:/usr/hadoop-2.6.0/share/hadoop/yarn/*:/usr/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.6.0/share/hadoop/mapreduce/*:/root/mysql-connector-java-5.1.46.jar:/usr/hadoop-2.6.0/contrib/capacity-scheduler/*.jar
```

InputFormat/OutputFormat, Mapper, and Reducer

(The original figure here is unavailable.) In short: the InputFormat's RecordReader feeds key/value pairs to the Mapper, and the Reducer's output is handed to the OutputFormat's RecordWriter, which writes it out to the target system.
