# Reading ORC Files with MapReduce

1. Create an ORC-format Hive table:

```sql
create table test_orc(name string, age int) stored as orc;
```

2. View the table definition:

```sql
show create table test_orc;
```

```
CREATE TABLE `test_orc`(
  `name` string,
  `age` int)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://localhost:9000/user/work/warehouse/test_orc'
TBLPROPERTIES (
  'transient_lastDdlTime'='1502868725')
```

3. Insert test data:

```sql
insert into table test_orc select name, age from test limit 10;
```

4. Reading the ORC table with MapReduce:

1) pom.xml:

```xml
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-mapreduce</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>2.6.0</version>
</dependency>
```

2) Code:

```java
package com.fan.hadoop.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcInputFormat;

import java.io.IOException;

public class OrcReaderMR {

    public static class OrcMap
            extends Mapper<NullWritable, OrcStruct, Text, IntWritable> {

        // The ORC rows have type: struct<name:string,age:int>
        @Override
        public void map(NullWritable key, OrcStruct value, Context output)
                throws IOException, InterruptedException {
            // Take the first field as the key and the second field as the value
            output.write((Text) value.getFieldValue(0),
                    (IntWritable) value.getFieldValue(1));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrcReaderMR.class);
        job.setJobName("OrcReaderMR");

        String in = "hdfs://localhost:9000/user/work/warehouse/test_orc";
        String out = "hdfs://localhost:9000/test/orc";

        job.setMapperClass(OrcMap.class);
        OrcInputFormat.addInputPath(job, new Path(in));
        job.setInputFormatClass(OrcInputFormat.class);
        job.setNumReduceTasks(0);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(out));
        job.waitForCompletion(true);
    }
}
```

3) Inspect the output file:

```bash
hadoop dfs -cat /test/orc/part-m-00000
```

```
kafka     14
tensflow  98
hadoop    34
hbase     68
flume     57
kafka     99
kafka     28
flume     24
tensflow  35
flume     44
```
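Independently of the MR job, the same files can be read directly with the orc-core `Reader` API, which is handy for sanity-checking the schema and contents. This is a minimal sketch, not from the original post: the `OrcDump` class name is illustrative, and the file name under the table's LOCATION depends on how the data was written (Hive inserts typically produce files like `000000_0`).

```java
package com.fan.hadoop.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;

public class OrcDump {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Illustrative path: point this at any ORC file under the table's LOCATION
        Reader reader = OrcFile.createReader(
                new Path("hdfs://localhost:9000/user/work/warehouse/test_orc/000000_0"),
                OrcFile.readerOptions(conf));
        System.out.println("schema: " + reader.getSchema());   // struct<name:string,age:int>
        System.out.println("rows:   " + reader.getNumberOfRows());

        // Read row batches; string maps to BytesColumnVector, int to LongColumnVector
        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        while (rows.nextBatch(batch)) {
            BytesColumnVector name = (BytesColumnVector) batch.cols[0];
            LongColumnVector age = (LongColumnVector) batch.cols[1];
            for (int r = 0; r < batch.size; r++) {
                System.out.println(name.toString(r) + "\t" + age.vector[r]);
            }
        }
        rows.close();
    }
}
```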
5. Writing an ORC file with MapReduce:

1) Code:

```java
package com.fan.hadoop.orc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;

public class OrcWriterMR {

    public static class OrcWriterMapper
            extends Mapper<LongWritable, Text, NullWritable, OrcStruct> {

        // Schema of the ORC rows this mapper emits
        private TypeDescription schema =
                TypeDescription.fromString("struct<name:string,age:int>");
        private OrcStruct pair = (OrcStruct) OrcStruct.createValue(schema);
        private final NullWritable nada = NullWritable.get();
        private Text name = new Text();
        private IntWritable age = new IntWritable();

        @Override
        public void map(LongWritable key, Text value, Context output)
                throws IOException, InterruptedException {
            // Input lines are tab-separated: name<TAB>age; skip empty lines
            if (!"".equals(value.toString())) {
                String[] arr = value.toString().split("\t");
                name.set(arr[0]);
                age.set(Integer.valueOf(arr[1]));
                pair.setFieldValue(0, name);
                pair.setFieldValue(1, age);
                output.write(nada, pair);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Tell OrcOutputFormat the schema of the rows being written
        OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<name:string,age:int>");
        Job job = Job.getInstance(conf);
        job.setJarByClass(OrcWriterMR.class);
        job.setJobName("OrcWriterMR");

        String in = "hdfs://localhost:9000/user/work/warehouse/test/ddd.txt";
        String out = "hdfs://localhost:9000/test/orc2";

        job.setMapperClass(OrcWriterMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(OrcOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(in));
        OrcOutputFormat.setOutputPath(job, new Path(out));
        job.waitForCompletion(true);
    }
}
```

2) Check the generated ORC file:

```bash
hadoop dfs -ls /test/orc2
```

```
-rw-r--r--   3 work supergroup          0 2017-08-16 17:45 /test/orc2/_SUCCESS
-rw-r--r--   3 work supergroup    6314874 2017-08-16 17:45 /test/orc2/part-m-00000.orc
```

3) Import into Hive by copying the file into the table's LOCATION:

```bash
hadoop fs -cp /test/orc2/part-m-00000.orc /user/work/warehouse/test_orc/
```

```
hive> select * from test_orc limit 13;
OK
kafka     14
tensflow  98
hadoop    34
hbase     68
flume     57
kafka     99
kafka     28
flume     24
tensflow  35
flume     44
flume     44
tensflow  35
flume     24
Time taken: 0.045 seconds, Fetched: 13 row(s)
```
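The writer above emits ORC with default writer settings. The same `OrcConf` enum that carries the output schema also exposes writer tuning such as the compression codec and stripe size; a small sketch of how the configuration in `main()` could be extended (the SNAPPY codec and 64 MB stripe size are illustrative choices, not from the original post):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcConf;

public class OrcWriterConf {
    public static Configuration build() {
        Configuration conf = new Configuration();
        // Required by OrcOutputFormat, as in OrcWriterMR above
        OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, "struct<name:string,age:int>");
        // Optional writer tuning: compression (NONE, ZLIB, SNAPPY) and stripe size in bytes
        OrcConf.COMPRESS.setString(conf, "SNAPPY");
        OrcConf.STRIPE_SIZE.setLong(conf, 64L * 1024 * 1024);
        return conf;
    }
}
```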
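Both jobs in this post are map-only. When ORC structs have to cross the shuffle (a job with reducers), the orc-mapred library wraps them in `OrcKey`/`OrcValue` so they can be serialized between map and reduce; the driver must then also declare the shuffle schema via `OrcConf.MAPRED_SHUFFLE_VALUE_SCHEMA` and call `job.setMapOutputValueClass(OrcValue.class)`. A minimal sketch (the class name and grouping by the first column are illustrative):

```java
package com.fan.hadoop.orc;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapred.OrcValue;

import java.io.IOException;

public class OrcShuffleMapper
        extends Mapper<NullWritable, OrcStruct, Text, OrcValue> {

    private final Text name = new Text();
    private final OrcValue wrapped = new OrcValue();

    @Override
    public void map(NullWritable key, OrcStruct value, Context output)
            throws IOException, InterruptedException {
        // Group rows by their first column and ship the whole struct to the reducer
        name.set(value.getFieldValue(0).toString());
        wrapped.value = value;
        output.write(name, wrapped);
    }
}
```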