MapReduce: partitioning with a custom Partition


After sitting through one of 超哥's lectures I finally understood partitioning, so here is a record of my own understanding. (Thanks, 超哥!)

    package partition;

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    /**
     * @ClassName: FlowCount2
     * @Description: Partitioning example; it must be packaged as a jar to run.
     *               Why partition:
     *               1. Produce multiple output files according to business needs.
     *               2. Run multiple reduce tasks, improving overall job efficiency.
     * @author zhangweixiang
     * @date 2014-03-06 15:27:56
     */
    public class FlowCount2 {

        public static final String INPUT_PATH = "hdfs://192.168.0.9:9000/wlan2";
        public static final String OUT_PATH = "hdfs://192.168.0.9:9000/myout";

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, FlowCount2.class.getSimpleName());
            // Specify the jar to ship with the job
            job.setJarByClass(FlowCount2.class);

            // 1.1 Specify the input path
            FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
            // Specify the input format class
            job.setInputFormatClass(TextInputFormat.class);

            // 1.2 Specify the custom Mapper class
            job.setMapperClass(MyMapper.class);
            // Set the map output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowWritable.class);

            // 1.3 Specify the partitioner
            job.setPartitionerClass(MyPartition.class);
            // Set the number of reduce tasks. The map output is split into two
            // partitions, so two reduce tasks are needed, each writing its own
            // output file (one partition per reduce task).
            job.setNumReduceTasks(2);

            // 1.4 Sort and group
            // 1.5 Combine

            // 2.2 Specify the custom Reducer class
            job.setReducerClass(MyReduce.class);
            // Set the output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowWritable.class);
            // Set the output format class
            job.setOutputFormatClass(TextOutputFormat.class);

            // Delete the output path if it already exists
            FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), new Configuration());
            Path path = new Path(OUT_PATH);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }

            // 2.3 Specify the output path
            FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

            // Submit the job and wait for completion
            job.waitForCompletion(true);
        }

        static class MyMapper extends Mapper<LongWritable, Text, Text, FlowWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Split the line into fields
                String[] split = value.toString().split("\t");
                // The user's phone number
                String mobile = "";
                long upPackNum = 0L;
                long downPackNum = 0L;
                long upPayLoad = 0L;
                long downPayLoad = 0L;
                // Only emit records that carry a phone number
                if (!("".equals(split[2]))) {
                    mobile = split[2];
                    // Read the traffic fields
                    if (!("".equals(split[21]))) {
                        upPackNum = Long.parseLong(split[21]);
                    }
                    if (!("".equals(split[22]))) {
                        downPackNum = Long.parseLong(split[22]);
                    }
                    if (!("".equals(split[23]))) {
                        upPayLoad = Long.parseLong(split[23]);
                    }
                    if (!("".equals(split[24]))) {
                        downPayLoad = Long.parseLong(split[24]);
                    }
                    FlowWritable flowWritable = new FlowWritable(upPackNum,
                            downPackNum, upPayLoad, downPayLoad);
                    context.write(new Text(mobile), flowWritable);
                }
            }
        }

        static class MyReduce extends Reducer<Text, FlowWritable, Text, FlowWritable> {
            @Override
            protected void reduce(Text k2, Iterable<FlowWritable> v2s,
                    Context context) throws IOException, InterruptedException {
                long upPackNum = 0L;
                long downPackNum = 0L;
                long upPayLoad = 0L;
                long downPayLoad = 0L;
                // Sum all traffic records for the same phone number
                for (FlowWritable flowWritable : v2s) {
                    upPackNum += flowWritable.upPackNum;
                    downPackNum += flowWritable.downPackNum;
                    upPayLoad += flowWritable.upPayLoad;
                    downPayLoad += flowWritable.downPayLoad;
                }
                FlowWritable flowWritable = new FlowWritable(upPackNum,
                        downPackNum, upPayLoad, downPayLoad);
                context.write(k2, flowWritable);
            }
        }
    }
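A side note not in the original post: the `new Job(conf, ...)` constructor used above is deprecated in Hadoop 2.x and later in favour of `Job.getInstance`. A minimal sketch of the same driver wiring on the newer API, reusing the constants and nested classes from the listing above:

    // Sketch only: assumes the same imports, INPUT_PATH/OUT_PATH constants and
    // MyMapper/MyReduce/MyPartition classes as FlowCount2 above.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, FlowCount2.class.getSimpleName());
    job.setJarByClass(FlowCount2.class);
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowWritable.class);
    job.setPartitionerClass(MyPartition.class);
    job.setNumReduceTasks(2);            // one reduce task per partition
    job.setReducerClass(MyReduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowWritable.class);
    FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
    FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
    System.exit(job.waitForCompletion(true) ? 0 : 1);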
    package partition;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    /**
     * @ClassName: FlowWritable
     * @Description: Custom type implementing the Writable interface, holding four
     *               fields: upPackNum (upstream packets), downPackNum (downstream
     *               packets), upPayLoad (upstream bytes), downPayLoad (downstream bytes).
     * @author zhangweixiang
     * @date 2014-03-05 11:37:10
     */
    public class FlowWritable implements Writable {

        public long upPackNum;
        public long downPackNum;
        public long upPayLoad;
        public long downPayLoad;

        public FlowWritable() {
            // No-arg constructor required by Hadoop for deserialization
        }

        public FlowWritable(long upPackNum, long downPackNum, long upPayLoad,
                long downPayLoad) {
            this.upPackNum = upPackNum;
            this.downPackNum = downPackNum;
            this.upPayLoad = upPayLoad;
            this.downPayLoad = downPayLoad;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // Fields must be written in exactly the order readFields() reads them
            out.writeLong(upPackNum);
            out.writeLong(downPackNum);
            out.writeLong(upPayLoad);
            out.writeLong(downPayLoad);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.upPackNum = in.readLong();
            this.downPackNum = in.readLong();
            this.upPayLoad = in.readLong();
            this.downPayLoad = in.readLong();
        }

        @Override
        public String toString() {
            return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t"
                    + downPayLoad;
        }
    }
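The Writable contract is easy to get wrong: `write()` and `readFields()` must handle exactly the same fields in exactly the same order, otherwise values are silently scrambled between map and reduce. A minimal stand-alone round-trip check (not part of the original post; the class name `FlowWritableRoundTrip` is made up for illustration):

    package partition;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    // Hypothetical helper, only to illustrate the write()/readFields() contract.
    public class FlowWritableRoundTrip {
        public static void main(String[] args) throws Exception {
            FlowWritable original = new FlowWritable(3, 5, 700, 9000);

            // Serialize the record the same way Hadoop does between map and reduce
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // Deserialize into a fresh instance (this is why the no-arg constructor is required)
            FlowWritable copy = new FlowWritable();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            // If write() and readFields() disagree on field order, these two lines will differ
            System.out.println("original: " + original);
            System.out.println("copy:     " + copy);
        }
    }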
    package partition;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

    /**
     * @ClassName: MyPartition
     * @Description: Partitions records by phone number: well-formed numbers go to
     *               partition 0, malformed ones to partition 1. With two partitions,
     *               two reduce tasks run and write to separate output files (0 and 1).
     * @param K k2 (the map output key), V v2 (the map output value)
     * @author zhangweixiang
     * @date 2014-03-06 15:02:29
     */
    public class MyPartition extends HashPartitioner<Text, FlowWritable> {

        @Override
        public int getPartition(Text key, FlowWritable value, int numReduceTasks) {
            // A well-formed mobile number has exactly 11 digits
            int p = 0;
            if (key.toString().length() != 11) {
                p = 1;
            }
            return p;
        }
    }
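For contrast with MyPartition: the inherited HashPartitioner spreads keys over the reduce tasks by hash code, essentially as in the following sketch of the standard formula (not taken from the post):

    // What the inherited HashPartitioner.getPartition does by default:
    // spread keys evenly across numReduceTasks by hash code.
    public int getPartition(Text key, FlowWritable value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

Overriding it with a business rule (11-digit number or not) is what makes the two output files predictable.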

Note: the job must be packaged as a jar and submitted on the Linux cluster (e.g. with `hadoop jar`). Running it directly from Eclipse without packaging threw an exception for me.

After the job completes, two output files are produced (part-r-00000 and part-r-00001), each containing the records of one partition.

The exception thrown when running directly from Eclipse (most likely because the LocalJobRunner of that Hadoop version supports only a single reduce task, so a partition index of 1 is out of range):

14/03/06 15:41:13 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Illegal partition for 10.80.203.79 (1)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at partition.FlowCount2$MyMapper.map(FlowCount2.java:120)
at partition.FlowCount2$MyMapper.map(FlowCount2.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
14/03/06 15:41:14 INFO mapred.JobClient: map 0% reduce 0%
14/03/06 15:41:14 INFO mapred.JobClient: Job complete: job_local_0001
14/03/06 15:41:14 INFO mapred.JobClient: Counters: 0

超哥's summary, for the record:

  1. A partitioning example must be packaged as a jar to run.
  2. Uses of partitioning:
     * Produce multiple output files according to business needs.
     * Run multiple reduce tasks, improving the overall efficiency of the job.
