7-1. DistributedCache Code

青旅半醒 2022-08-09 09:48

**Distributed cache data:**

```
hdfs dfs -text libin/input/distributedDemo.txt
hadoop
hive
hbase
kafka
spark
stome
pig
sqoop
flume
elasticsearch
docker
```

**Input data:**

```
hdfs dfs -text libin/input/distributedinput.txt
hive|safsdf
asdg|dgjgd
tryrtyh|Fghfgh
spark|dfhgfdh
dfg|rtyt
storm|fghgh
spark|ghff
```

**Code:**

```java
package mapreduce.baozi;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

@SuppressWarnings("deprecation")
public class DistributedDemo {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "hdfs://192.168.1.100:9000");
        FileSystem fs = FileSystem.get(conf);
        // Remove any previous output so the job can write to the same path.
        fs.delete(new Path("libin/output/distributedout.txt"), true);
        conf.set("mapred.job.tracker", "192.168.1.100:9001");
        conf.set("mapred.jar", "/home/baozi/blb/distributed.jar");

        Job job = Job.getInstance(conf, DistributedDemo.class.getSimpleName());
        DistributedCache.createSymlink(job.getConfiguration());
        try {
            // Register libin/input/distributedDemo.txt on HDFS as a distributed cache file.
            DistributedCache.addCacheFile(new URI("libin/input/distributedDemo.txt"),
                    job.getConfiguration());
        } catch (URISyntaxException e1) {
            e1.printStackTrace();
        }
        job.setMapperClass(DistributedMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("libin/input/distributedinput.txt"));
        FileOutputFormat.setOutputPath(job, new Path("libin/output/distributedout.txt"));
        job.waitForCompletion(true);
    }

    public static class DistributedMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String[] splitedValue;
        private String info;
        private List<String> distributedList = new ArrayList<String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            try {
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                if (cacheFiles != null && cacheFiles.length > 0) {
                    String line;
                    BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
                    try {
                        // Load every line of the cache file into the in-memory lookup list.
                        while ((line = br.readLine()) != null) {
                            distributedList.add(line);
                        }
                    } finally {
                        br.close();
                    }
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                splitedValue = value.toString().split("\\|");
                info = splitedValue[0];
                // Emit only records whose key appears in the cached list (a map-side semi-join).
                if (distributedList.contains(info)) {
                    context.write(new Text(splitedValue[0]), value);
                }
            } catch (Exception ex) {
                // Skip malformed records rather than failing the task.
            }
        }
    }
}
```
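The listing above uses `org.apache.hadoop.filecache.DistributedCache`, which is deprecated in Hadoop 2.x (the submit log below flags several related deprecations). Below is a minimal sketch of the same wiring with the non-deprecated `Job`/`Context` cache methods, assuming Hadoop 2.x or later; the `#demoCache` symlink name is an illustrative choice, not from the original post:

```java
// Driver side -- replaces the DistributedCache.* calls above. The "#demoCache"
// fragment names the symlink created in each task's working directory.
job.addCacheFile(new URI("libin/input/distributedDemo.txt#demoCache"));

// Mapper side -- replaces setup() above.
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    URI[] cacheFiles = context.getCacheFiles(); // URIs registered via job.addCacheFile()
    if (cacheFiles != null && cacheFiles.length > 0) {
        // Read the cache file through the symlink in the task's working directory.
        try (BufferedReader br = new BufferedReader(new FileReader("demoCache"))) {
            String line;
            while ((line = br.readLine()) != null) {
                distributedList.add(line);
            }
        }
    }
}
```

Reading through the symlink also avoids `DistributedCache.getLocalCacheFiles()`, which is likewise deprecated in Hadoop 2.x.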
**Run:**

```
hadoop jar distributed.jar
15/10/19 09:18:13 INFO Configuration.deprecation: fs.default.name is deprecated. Instead, use fs.defaultFS
15/10/19 09:18:15 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
15/10/19 09:18:15 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
15/10/19 09:18:15 INFO client.RMProxy: Connecting to ResourceManager at sh-rslog1/27.115.29.102:8032
15/10/19 09:18:15 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
15/10/19 09:18:16 INFO input.FileInputFormat: Total input paths to process : 1
15/10/19 09:18:16 INFO mapreduce.JobSubmitter: number of splits:1
15/10/19 09:18:16 INFO Configuration.deprecation: fs.default.name is deprecated. Instead, use fs.defaultFS
15/10/19 09:18:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1435558921826_11703
15/10/19 09:18:16 INFO impl.YarnClientImpl: Submitted application application_1435558921826_11703
15/10/19 09:18:16 INFO mapreduce.Job: The url to track the job: http://sh-rslog1:8088/proxy/application_1435558921826_11703/
15/10/19 09:18:16 INFO mapreduce.Job: Running job: job_1435558921826_11703
15/10/19 09:18:24 INFO mapreduce.Job: Job job_1435558921826_11703 running in uber mode : false
15/10/19 09:18:24 INFO mapreduce.Job:  map 0% reduce 0%
15/10/19 09:18:32 INFO mapreduce.Job:  map 100% reduce 0%
15/10/19 09:18:40 INFO mapreduce.Job:  map 100% reduce 100%
15/10/19 09:18:40 INFO mapreduce.Job: Job job_1435558921826_11703 completed successfully
15/10/19 09:18:40 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=68
                FILE: Number of bytes written=213001
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=220
                HDFS: Number of bytes written=54
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=5410
                Total time spent by all reduces in occupied slots (ms)=12318
                Total time spent by all map tasks (ms)=5410
                Total time spent by all reduce tasks (ms)=6159
                Total vcore-seconds taken by all map tasks=5410
                Total vcore-seconds taken by all reduce tasks=6159
                Total megabyte-seconds taken by all map tasks=8309760
                Total megabyte-seconds taken by all reduce tasks=15767040
        Map-Reduce Framework
                Map input records=7
                Map output records=3
                Map output bytes=54
                Map output materialized bytes=60
                Input split bytes=136
                Combine input records=0
                Combine output records=0
                Reduce input groups=2
                Reduce shuffle bytes=60
                Reduce input records=3
                Reduce output records=3
                Spilled Records=6
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=58
                CPU time spent (ms)=4200
                Physical memory (bytes) snapshot=1125724160
                Virtual memory (bytes) snapshot=5105754112
                Total committed heap usage (bytes)=2022703104
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=84
        File Output Format Counters
                Bytes Written=54
```

**View results:**

```
hdfs dfs -text libin/output/distributedout.txt/*
hive    hive|safsdf
spark   spark|ghff
spark   spark|dfhgfdh
```
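The submit log above also warns: "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this." A minimal sketch of that pattern follows; the driver class name is hypothetical, and the cache/mapper/input/output wiring is the same as in the listing above:

```java
package mapreduce.baozi;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class DistributedDemoDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // getConf() already contains any generic options parsed by ToolRunner,
        // e.g. hadoop jar distributed.jar -D mapreduce.job.reduces=1
        Job job = Job.getInstance(getConf(), "DistributedDemo");
        // ... same cache/mapper/input/output wiring as in the listing above ...
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new DistributedDemoDriver(), args));
    }
}
```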