fume+kafka+storm+hbase的集成代码 太过爱你忘了你带给我的痛 2022-03-02 09:44 147阅读 0赞 ### 1.启动hbase(前提启动了zookeeper和hdfs) ### ![20190320101835932.png][] 查看进程:![20190320101920493.png][] 进入hbaseshell终端:![20190320101946422.png][] 查看hbase状态: ![20190320102207765.png][] 查看表列表: ![20190320102241890.png][] 查询new\_music\_table数据: scan 'new\_music\_table' ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI4Mjg2MDI3_size_16_color_FFFFFF_t_70][] 2.启动kafka: ![20190320103221783.png][] ![20190320103252538.png][] 3.编写storm代码: stormKafka.java: package stormHbase; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.spout.SchemeAsMultiScheme; import backtype.storm.topology.TopologyBuilder; import storm.kafka.BrokerHosts; import storm.kafka.KafkaSpout; import storm.kafka.SpoutConfig; import storm.kafka.StringScheme; import storm.kafka.ZkHosts; public class stormKafka { public static void main(String[] args) throws Exception { String topic = "badou_storm_kafka_test"; String zkRoot = "/badou_storm_kafka_test"; String spoutId = "kafkaSpout"; BrokerHosts brokerHosts = new ZkHosts("master:2181"); SpoutConfig kafkaConf = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId); kafkaConf.forceFromStart = true; kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(kafkaConf); TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", kafkaSpout, 2); builder.setBolt("printer", new PrinterBolt()).shuffleGrouping("spout"); Config config = new Config(); config.setDebug(false); if (args != null && args.length > 0) { config.setNumWorkers(3); StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } else { config.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("kafka", config, builder.createTopology()); // Thread.sleep(10000); // cluster.shutdown(); } } } PrinterBolt.java: package stormHbase; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Tuple; public class PrinterBolt extends BaseBasicBolt { public static final String TableName = "new_music_table"; //public static final String ColumnFamily = "rec_list"; public static Configuration conf = HBaseConfiguration.create(); private static HTable table; public static void selectRowKey(String tablename, String rowKey) throws IOException { System.out.println("*****************"); // table = new HTable(conf, tablename); System.out.println("*****************"); Get g = new Get(rowKey.getBytes()); System.out.println("*****************"); Result rs = table.get(g); System.out.println("*****************"); System.out.println("==> " + new String(rs.getRow())); /*for (Cell kv : rs.rawCells()) { System.out.println("--------------------" + new String(kv.getRow()) + "----------------------------"); System.out.println("Column Family: " + new String(kv.getFamily())); System.out.println("Column :" + new String(kv.getQualifier())); System.out.println("value : " + new String(kv.getValue())); }*/ } @Override public void execute(Tuple tuple, BasicOutputCollector collector) { System.out.println(tuple.getString(0)); conf.set("hbase.master", "192.168.87.10:60000"); conf.set("hbase.zookeeper.quorum", "192.168.87.10,192.168.87.11,192.168.87.12"); // TODO Auto-generated method stub try { System.out.println("[1]============="); selectRowKey(TableName, tuple.getString(0)); System.out.println("[2]============="); } catch (Exception e) { // TODO Auto-generated catch block System.out.println("[3]============="); System.out.println(tuple); e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer ofd) { } } ### 3.运行storm命令: ### ![20190320102346765.png][] 4.启动flume: ![20190320103400734.png][] ### 5.测试及结果: ### ![20190320102508462.png][] ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI4Mjg2MDI3_size_16_color_FFFFFF_t_70 1][] [20190320101835932.png]: /images/20220302/e8e461341d2e42f6b0cc43639e5202de.png [20190320101920493.png]: /images/20220302/9f514a3a084544eebd46f6f9b87a4026.png [20190320101946422.png]: /images/20220302/f249920689804a5b87f1eb1c98a10b87.png [20190320102207765.png]: /images/20220302/97a2787aba1d4368846c8a4c08609552.png [20190320102241890.png]: /images/20220302/00a0814233194b60bee510d3479c0d05.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI4Mjg2MDI3_size_16_color_FFFFFF_t_70]: /images/20220302/97263e3825f44673bf36252d3646f8a6.png [20190320103221783.png]: /images/20220302/f6596b0a0b8949028675b43137db1a75.png [20190320103252538.png]: /images/20220302/7e5292a42989423ebc95ef25d2c70f9a.png [20190320102346765.png]: /images/20220302/5545c120726f4484a04a710adc48717b.png [20190320103400734.png]: /images/20220302/7217d8983a944100b99c889570e73903.png [20190320102508462.png]: /images/20220302/817948d9d3bb44bb9c5d343eeb5733df.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzI4Mjg2MDI3_size_16_color_FFFFFF_t_70 1]: /images/20220302/8b0706d374734b2ab9a1bcd516bef6ce.png
还没有评论,来说两句吧...