【Hadoop】Hadoop Ecosystem Series: An Analysis of InputFormat.class and OutputFormat.class
Previous post: Hadoop Ecosystem Series: MapReduce Overview and MapReduce Job Development and Deployment
Guide
- InputFormat & OutputFormat
- Overall Design
- InputFormat.class
- TextInputFormat.class
- NLineInputFormat.class
- KeyValueTextInputFormat.class
- MultipleInputs.class
- CombineFileInputFormat.class
- DBInputFormat.class
- OutputFormat.class
- TextOutputFormat.class
- DBOutputFormat.class
- JedisOutputFormat.class
- Dependency Resolution
- InputFormat/OutputFormat with Mapper and Reducer
InputFormat & OutputFormat
Overall Design
InputFormat.class
This is a top-level abstract class provided by Hadoop. It defines two responsibilities: computing the input splits and reading the data of each split.
public abstract class InputFormat<K, V> {
    public InputFormat() {
    }
    // Compute the splits, i.e. the logical ranges the input data is divided into
    public abstract List<InputSplit> getSplits(JobContext var1)
            throws IOException, InterruptedException;
    // Implement the read logic for one logical range and feed the records to the Mapper
    public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2)
            throws IOException, InterruptedException;
}
Hadoop ships several pre-built implementations of InputFormat.class, mainly:
- CompositeInputFormat.class: mainly used to join large data sets on the Map side.
- DBInputFormat.class: reads data from RDBMS databases, mainly Oracle and MySQL.
- FileInputFormat.class: the pre-built implementation for distributed file systems.
TextInputFormat.class
By default the file is cut into 128 MB chunks; each such range is called a Split. A LineRecordReader then reads the data of the split and hands the Mapper each line of text as the value, together with the byte offset of that line within the file as the key. The offset is a Long-typed parameter and is usually of little use.
Note: for all subclasses of FileInputFormat that do not override getSplits, the computed split size actually falls in the range (0, 140.8 MB], because when computing file splits the framework decides with
(remaining file size / 128 MB > 1.1) ? cut a new split : do not split
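The rule above can be captured in a minimal, self-contained sketch (a simplification of FileInputFormat.getSplits assuming the default 128 MB split size; SplitSketch and computeSplits are illustrative names, not Hadoop APIs):
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    static final long SPLIT_SIZE = 128L * 1024 * 1024; // default block/split size
    static final double SPLIT_SLOP = 1.1;              // the 1.1 factor from the rule above

    // Returns {offset, length} pairs describing the splits of a single file.
    static List<long[]> computeSplits(long fileLength) {
        List<long[]> splits = new ArrayList<>();
        long bytesRemaining = fileLength;
        // Only cut a new split while the remaining bytes exceed 1.1x the split size.
        while (((double) bytesRemaining) / SPLIT_SIZE > SPLIT_SLOP) {
            splits.add(new long[]{fileLength - bytesRemaining, SPLIT_SIZE});
            bytesRemaining -= SPLIT_SIZE;
        }
        if (bytesRemaining != 0) {
            splits.add(new long[]{fileLength - bytesRemaining, bytesRemaining});
        }
        return splits;
    }

    public static void main(String[] args) {
        // 140 MB lies within (0, 140.8 MB] -> one split; 150 MB -> two splits.
        System.out.println(computeSplits(140L * 1024 * 1024).size()); // 1
        System.out.println(computeSplits(150L * 1024 * 1024).size()); // 2
    }
}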
Code reference: Hadoop Ecosystem Series: MapReduce Overview and MapReduce Job Development and Deployment
NLineInputFormat.class
Splits the file every N lines; each such range is called a Split. A LineRecordReader then reads the split data and hands the Mapper each line of text as the value, together with the byte offset of that line within the file as the key (a Long-typed parameter that is usually of little use).
NLineInputFormat overrides FileInputFormat's getSplits, so when using it we usually need to set the number of lines per split:
NLineInputFormat.setNumLinesPerSplit(job,1000);
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job object
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "URLCountApplication");
        // 2. Tell the job the input/output formats
        job.setInputFormatClass(NLineInputFormat.class);
        NLineInputFormat.setNumLinesPerSplit(job, 1000);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        TextInputFormat.addInputPath(job, new Path("D:/data/click"));
        // The output directory is created automatically; if it already exists the job is aborted
        TextOutputFormat.setOutputPath(job, new Path("D:/data/result"));
        // 4. Set the processing logic
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
KeyValueTextInputFormat.class
By default the file is cut into 128 MB splits, and a KeyValueLineRecordReader reads each split, handing the Mapper both the key and the value as Text. Each line is split into key and value on a tab (\t) by default; if the separator is not found, the whole line becomes the key and the value is left empty. The separator can be changed through the configuration, for example:
conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR,",");
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job object
        Configuration conf = getConf();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ",");
        Job job = Job.getInstance(conf, "AvgCostApplication");
        // 2. Tell the job the input/output formats
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        TextInputFormat.addInputPath(job, new Path("file:///D:/data/keyvalue"));
        // The output directory is created automatically; if it already exists the job is aborted
        TextOutputFormat.setOutputPath(job, new Path("file:///D:/data/result"));
        // 4. Set the processing logic
        job.setMapperClass(AvgCostMapper.class);
        job.setReducerClass(AvgCostReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
MultipleInputs.class
This is a composite input format used to combine several InputFormats of different types in one job; the Mapper output types for all of the inputs must still be identical (a sketch of what such mappers might look like follows the driver code below).
public class SumCostApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job object
        Configuration conf = getConf();
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, ",");
        Job job = Job.getInstance(conf, "SumCostCountApplication");
        // 2. Tell the job the output format
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        TextOutputFormat.setOutputPath(job, new Path("file:///D:/data/result"));
        // 4. Set the processing logic: one InputFormat/Mapper pair per input path
        MultipleInputs.addInputPath(job, new Path("file:///D:/data/mul/keyvalue"), KeyValueTextInputFormat.class, KeyValueCostMapper.class);
        MultipleInputs.addInputPath(job, new Path("file:///D:/data/mul/text"), TextInputFormat.class, TextCostMapper.class);
        job.setReducerClass(CostSumReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new SumCostApplication(), args);
    }
}
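The two mappers referenced above are not shown in the original post; the following is a minimal sketch of plausible shapes (KeyValueCostMapper, TextCostMapper and the "item,cost" line layout are assumptions). The key point is that both emit the same intermediate types, Text and DoubleWritable:
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Reads records already split by KeyValueTextInputFormat on the "," separator:
// key = item, value = cost.
class KeyValueCostMapper extends Mapper<Text, Text, Text, DoubleWritable> {
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(key, new DoubleWritable(Double.parseDouble(value.toString())));
    }
}

// Reads raw text lines from TextInputFormat and parses them itself
// (an "item,cost" layout is assumed here).
class TextCostMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split(",");
        context.write(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
    }
}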
CombineFileInputFormat.class
All of the FileInputFormats above compute splits per file, which means that if the input directory contains many small files, the first (Map) stage ends up with far too many tasks. The default FileInputFormat is therefore not friendly to small files. Hadoop provides CombineFileInputFormat for exactly this scenario: its split computation packs several small files into a single split, provided those files share the same format. In practice we can use CombineTextInputFormat, which is used exactly like TextInputFormat and differs only in how splits are computed.
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job object
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "URLCountApplication");
        // 2. Tell the job the input/output formats
        job.setInputFormatClass(CombineTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        CombineTextInputFormat.addInputPath(job, new Path("file:///D:/data/click"));
        // The output directory is created automatically; if it already exists the job is aborted
        TextOutputFormat.setOutputPath(job, new Path("file:///D:/data/result"));
        // 4. Set the processing logic
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
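One optional detail not covered in the original post: the size of the combined splits can be capped with setMaxInputSplitSize, which CombineTextInputFormat inherits from FileInputFormat (the 4 MB value below is only an example):
// Inside run(), right after choosing the input format (optional tuning, example value):
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4L * 1024 * 1024); // cap each combined split at 4 MB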
DBInputFormat.class
Responsible for reading data from an RDBMS (relational database); currently only MySQL and Oracle databases are supported.
public class UserDBWritable implements DBWritable {
    private Boolean sex;
    private Double salary;
    /**
     * Used by DBOutputFormat; left empty here because this class is only used for reading.
     * @param statement
     * @throws SQLException
     */
    public void write(PreparedStatement statement) throws SQLException {
    }
    /**
     * Used by DBInputFormat to deserialize one row of the ResultSet.
     */
    public void readFields(ResultSet resultSet) throws SQLException {
        this.sex = resultSet.getBoolean("sex");
        this.salary = resultSet.getDouble("salary");
    }
    public Boolean getSex() {
        return sex;
    }
    public void setSex(Boolean sex) {
        this.sex = sex;
    }
    public Double getSalary() {
        return salary;
    }
    public void setSalary(Double salary) {
        this.salary = salary;
    }
}
public class DBAvgSalaryApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job object
        Configuration conf = getConf();
        // Set the parallelism (number of map tasks used by DBInputFormat)
        conf.setInt(MRJobConfig.NUM_MAPS, 5);
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test", "root", "123456");
        Job job = Job.getInstance(conf, "DBAvgSalaryApplication");
        // 2. Tell the job the input/output formats
        job.setInputFormatClass(DBInputFormat.class);
        String query = "select sex,salary from t_user";
        String countQuery = "select count(*) from t_user";
        DBInputFormat.setInput(job, UserDBWritable.class, query, countQuery);
        job.setOutputFormatClass(TextOutputFormat.class);
        // 3. Set the data paths
        // The output directory is created automatically; if it already exists the job is aborted
        TextOutputFormat.setOutputPath(job, new Path("D:/data/result"));
        // 4. Set the processing logic
        job.setMapperClass(UserAvgMapper.class);
        job.setReducerClass(UserAvgReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(BooleanWritable.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new DBAvgSalaryApplication(), args);
    }
}
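UserAvgMapper and UserAvgReducer are not shown in the original post; the following is a hedged sketch of plausible shapes (assumptions, not the original code). DBInputFormat hands the Mapper a LongWritable row id as the key and the deserialized UserDBWritable defined above as the value:
import java.io.IOException;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Emits (sex, salary) pairs read from the database row.
class UserAvgMapper extends Mapper<LongWritable, UserDBWritable, BooleanWritable, DoubleWritable> {
    @Override
    protected void map(LongWritable key, UserDBWritable user, Context context)
            throws IOException, InterruptedException {
        context.write(new BooleanWritable(user.getSex()), new DoubleWritable(user.getSalary()));
    }
}

// Averages the salaries grouped by sex and writes a readable label as the key.
class UserAvgReducer extends Reducer<BooleanWritable, DoubleWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(BooleanWritable sex, Iterable<DoubleWritable> salaries, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (DoubleWritable s : salaries) {
            sum += s.get();
            count++;
        }
        context.write(new Text(sex.get() ? "male" : "female"), new DoubleWritable(sum / count));
    }
}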
OutputFormat.class
This is the top-level abstract class Hadoop provides for output. It implements the write logic that sends the Reducer output to an external system, performs the output check (only meaningful for file systems), and returns an OutputCommitter so that the output can be committed reliably.
public abstract class OutputFormat<K, V> {
    // Create the RecordWriter that writes the output records
    public abstract RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException;
    // Check that the output specification (e.g. the output directory) is valid
    public abstract void checkOutputSpecs(JobContext context)
            throws IOException, InterruptedException;
    // Return the committer responsible for committing the output
    public abstract OutputCommitter getOutputCommitter(TaskAttemptContext context)
            throws IOException, InterruptedException;
}
TextOutputFormat.class
Writes the Reducer output directly to the file system, calling toString() on both the key and the value.
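For illustration only, a reducer shaped like the URLReducer referenced by the earlier drivers (an assumed shape, not code from the original series) would make TextOutputFormat write one key<TAB>value line per record into the part-r-xxxxx files:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical shape of the URLReducer used by the earlier drivers: TextOutputFormat will
// call toString() on the emitted key and value and write them as "key<TAB>value" lines.
public class URLReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum)); // written to part-r-xxxxx as "key\tsum"
    }
}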
DBOutputFormat.class
Writes the Reducer output directly to a database.
public class URLCountDBWritable implements DBWritable {
    private String category;
    private Integer count;
    public URLCountDBWritable(String category, Integer count) {
        this.category = category;
        this.count = count;
    }
    public URLCountDBWritable() {
    }
    public String getCategory() {
        return category;
    }
    public void setCategory(String category) {
        this.category = category;
    }
    public Integer getCount() {
        return count;
    }
    public void setCount(Integer count) {
        this.count = count;
    }
    // Called by DBOutputFormat: bind the fields to the INSERT statement's placeholders,
    // in the column order passed to DBOutputFormat.setOutput(...)
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, category);
        statement.setInt(2, count);
    }
    // Only needed when the class is read with DBInputFormat; unused here
    public void readFields(ResultSet resultSet) throws SQLException {
    }
}
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job object
        Configuration conf = getConf();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test",
                "root", "123456");
        Job job = Job.getInstance(conf, "URLCountApplication");
        // 2. Tell the job the input/output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);
        // 3. Set the data paths
        TextInputFormat.addInputPath(job, new Path("file:///D:/data/click"));
        DBOutputFormat.setOutput(job, "url_click", "url_category", "url_count");
        // 4. Set the processing logic
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(URLCountDBWritable.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
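Note that with DBOutputFormat the reducer's output key must be the DBWritable and the value NullWritable, matching setOutputKeyClass/setOutputValueClass above. A hypothetical reducer (assumed shape, not from the original post) could look like this:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// DBOutputFormat builds an INSERT for the table/columns given in setOutput(...) and calls
// write(PreparedStatement) on the output KEY, so the key must be the URLCountDBWritable
// defined above and the value NullWritable.
class URLCountDBReducer extends Reducer<Text, IntWritable, URLCountDBWritable, NullWritable> {
    @Override
    protected void reduce(Text category, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) {
            sum += c.get();
        }
        context.write(new URLCountDBWritable(category.toString(), sum), NullWritable.get());
    }
}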
JedisOutputFormat.class
A hand-rolled OutputFormat (not a class shipped with Hadoop) that writes the Reducer output to Redis through the Jedis client:
public class JedisOutputFormat extends OutputFormat<String, String> {
    public final static String JEDIS_HOST = "jedis.host";
    public final static String JEDIS_PORT = "jedis.port";
    // Store the Redis connection parameters in the job configuration
    public static void setOutput(Job job, String host, Integer port) {
        job.getConfiguration().set(JEDIS_HOST, host);
        job.getConfiguration().setInt(JEDIS_PORT, port);
    }
    public RecordWriter<String, String> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        Configuration config = context.getConfiguration();
        String host = config.get(JEDIS_HOST);
        Integer port = config.getInt(JEDIS_PORT, 6379);
        return new JedisRecordWriter(host, port);
    }
    // Nothing to check: there is no output directory to validate
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    }
    // Reuse FileOutputCommitter so the framework still gets a committer
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        return new FileOutputCommitter(FileOutputFormat.getOutputPath(context),
                context);
    }
}
public class JedisRecordWriter extends RecordWriter<String, String> {
    private Jedis jedis = null;
    public JedisRecordWriter(String host, Integer port) {
        jedis = new Jedis(host, port);
    }
    // Each reduce output record becomes one Redis SET key value
    public void write(String key, String value) throws IOException, InterruptedException {
        jedis.set(key, value);
    }
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        jedis.close();
    }
}
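Because the reducer output is handed straight to the custom JedisRecordWriter (there is no further serialization step), plain Java Strings can be used as the final output types. A hypothetical reducer matching the driver below (assumed shape, not from the original post):
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// The reduce output goes straight to JedisRecordWriter, which issues one Redis SET per record.
class URLRedisReducer extends Reducer<Text, IntWritable, String, String> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key.toString(), String.valueOf(sum)); // becomes SET <key> <count> in Redis
    }
}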
public class URLCountApplication extends Configured implements Tool {
    public int run(String[] strings) throws Exception {
        // 1. Create a Job object
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "URLCountApplication");
        // 2. Tell the job the input/output formats
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(JedisOutputFormat.class);
        // 3. Set the data paths
        TextInputFormat.addInputPath(job, new Path("file:///D:/data/click"));
        JedisOutputFormat.setOutput(job, "CentOS", 6379);
        // 4. Set the processing logic
        job.setMapperClass(URLMapper.class);
        job.setReducerClass(URLReducer.class);
        // 5. Set the output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(String.class);
        job.setOutputValueClass(String.class);
        // 6. Submit the job
        return job.waitForCompletion(true) ? 1 : 0;
    }
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new URLCountApplication(), args);
    }
}
Dependency Resolution
Runtime dependencies (needed by the YARN Child processes)
Option 1
Ship the dependency jars to every compute node (the hosts where the NodeManager runs); this can be done at submission time with -libjars:
[root@CentOS ~]# hadoop jar xxx.jar <main-class> -libjars dep1.jar,dep2.jar,...
Option 2
Upload the dependency jars to HDFS and reference them through the job's tmpjars property:
[root@CentOS ~]# hdfs dfs -mkdir /libs
[root@CentOS ~]# hdfs dfs -put mysql-connector-java-5.1.46.jar /libs
conf.setStrings("tmpjars","/libs/xxx1.jar,/libs/xxx2.jar,...");
Submission-time dependencies (client side)
Requires configuring the HADOOP_CLASSPATH environment variable (e.g. in /root/.bashrc); this kind of dependency usually arises during the split computation phase on the client.
HADOOP_CLASSPATH=/root/mysql-connector-java-5.1.46.jar
export HADOOP_CLASSPATH
[root@CentOS ~]# source .bashrc
[root@CentOS ~]# hadoop classpath # show Hadoop's classpath
/usr/hadoop-2.6.0/etc/hadoop:/usr/hadoop-2.6.0/share/hadoop/common/lib/*:/usr/hadoop-2.6.0/share/hadoop/common/*:/usr/hadoop-2.6.0/share/hadoop/hdfs:/usr/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/usr/hadoop-2.6.0/share/hadoop/hdfs/*:/usr/hadoop-2.6.0/share/hadoop/yarn/lib/*:/usr/hadoop-2.6.0/share/hadoop/yarn/*:/usr/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.6.0/share/hadoop/mapreduce/*:/root/mysql-connector-java-5.1.46.jar:/usr/hadoop-2.6.0/contrib/capacity-scheduler/*.jar