Integrating Flume, Kafka, and Storm

ゞ 浴缸里的玫瑰 2022-03-02 08:49

## Step 1: Start Storm ##

### 1.1 Start the Storm cluster ###

On the master:

```shell
python bin/storm nimbus &
python bin/storm ui &
python bin/storm logviewer &
```

On each slave:

```shell
python bin/storm supervisor &
python bin/storm logviewer &
```

### 1.2 Storm + Kafka integration code ###

`stormKafka.java`:

```java
package stormKafkaPackage;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class stormKafka {
    public static void main(String[] args) throws Exception {
        String topic = "badou_storm_kafka_test";   // Kafka topic to consume
        String zkRoot = "/badou_storm_kafka_test"; // ZooKeeper node where the spout stores offsets
        String spoutId = "kafkaSpout";

        // The spout discovers Kafka brokers through ZooKeeper
        BrokerHosts brokerHosts = new ZkHosts("master:2181");
        SpoutConfig kafkaConf = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        kafkaConf.forceFromStart = true; // read the topic from the earliest offset
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", kafkaSpout, 2);
        builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("spout");

        Config config = new Config();
        config.setDebug(false);

        if (args != null && args.length > 0) {
            // Submitted to the cluster; args[0] is the topology name
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // No arguments: run in local mode for testing
            config.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka", config, builder.createTopology());
            // Thread.sleep(10000);
            // cluster.shutdown();
        }
    }
}
```

`PrinterBolt.java`:

```java
package stormKafkaPackage;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

// Terminal bolt: prints each incoming tuple and emits nothing downstream
public class PrinterBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        System.out.println(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        // no output fields -- this bolt only prints
    }
}
```

### 1.3 Submit the Storm topology ###

```shell
python /usr/local/src/apache-storm-0.9.3/bin/storm jar \
    /root/IdeaProjects/stormtest/target/stormtest-1.0-SNAPSHOT.jar \
    stormKafkaPackage.stormKafka \
    guoqing_remote
```

## Step 2: Start Kafka ##

```shell
./bin/kafka-server-start.sh config/server.properties
```

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI4Mjg2MDI3_size_16_color_FFFFFF_t_70][]

## Step 3: Start Flume ##

### 3.1 Write the Flume configuration file ###

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI4Mjg2MDI3_size_16_color_FFFFFF_t_70 1][]

### 3.2 Start command ###

```shell
./bin/flume-ng agent --conf conf --conf-file ./conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
```

### 3.3 Test ###

![20190320092935968.png][]

[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI4Mjg2MDI3_size_16_color_FFFFFF_t_70]: /images/20220302/227afbc2045c492d934480d9c2506483.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI4Mjg2MDI3_size_16_color_FFFFFF_t_70 1]: /images/20220302/1c926a42cabc4380b5e4706b11b98c14.png
[20190320092935968.png]: /images/20220302/44a47900595d4768adb6879f187f7106.png
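The Flume configuration referenced in step 3.1 survives only as a screenshot and is not transcribed here. As a rough sketch (not a transcription of the original file), an agent named `a1` that feeds the Kafka topic the Storm spout reads might look like the following on a Flume 1.6-era install; the source type, file path, channel capacities, and broker address are all assumptions:

```properties
# flume_kafka.conf -- hypothetical sketch; the real file is only shown as an image above.
# The agent name "a1" matches the --name flag in the start command.
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: tail a log file (assumed; the original source may differ)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/flume_test.log
a1.sources.r1.channels = c1

# In-memory channel (capacities are assumptions)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Kafka sink, writing to the topic consumed by the Storm KafkaSpout
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = badou_storm_kafka_test
a1.sinks.k1.brokerList = master:9092
a1.sinks.k1.channel = c1
```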
还没有评论,来说两句吧...
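Before wiring Flume in, the Kafka leg can be checked on its own with the console tools that ship with Kafka. The broker address, partition count, and replication factor below are assumptions consistent with the ZooKeeper address used in the topology code; these commands require a running Kafka/ZooKeeper cluster:

```
# Create the topic the Storm spout subscribes to
# (partition/replication counts are assumptions)
./bin/kafka-topics.sh --create --zookeeper master:2181 \
    --topic badou_storm_kafka_test --partitions 2 --replication-factor 1

# Type a few lines; each should appear in the Storm worker logs via PrinterBolt
./bin/kafka-console-producer.sh --broker-list master:9092 \
    --topic badou_storm_kafka_test
```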