Hadoop: HBase Configuration and Usage (Part 4)


Hadoop: HBase Configuration and Usage

  • I. HBase Download
    • 1. HBase Download
  • II. HBase Configuration
    • 1. Standalone Deployment
    • 2. Pseudo-Distributed Deployment (based on the standalone setup)
    • 3. Cluster Deployment
      • 1. Start the Hadoop Cluster
      • 2. Start the ZooKeeper Cluster
      • 3. Start the HBase Cluster
      • 4. Cluster Start/Stop Script
  • III. Testing
    • 1. POM Configuration
    • 2. YAML Configuration
    • 3. HBase Configuration Class
    • 4. HBase Connection Pool
    • 5. Test Class
    • 6. Startup Class
    • 7. Test

I. HBase Download

HBase is a distributed, column-oriented open-source database: HBase API

1. HBase Download

HBase download page


Follow the download link


II. HBase Configuration

1. Standalone Deployment

  ## 1. Create the install directory
  mkdir -p /usr/local/hbase
  ## 2. Copy the archive to the VM and extract it
  tar zxvf hbase-3.0.0-alpha-4-bin.tar.gz -C /usr/local/hbase/
  ## 3. Add environment variables
  echo 'export HBASE_HOME=/usr/local/hbase/hbase-3.0.0-alpha-4' >> /etc/profile
  echo 'export PATH=${HBASE_HOME}/bin:${PATH}' >> /etc/profile
  source /etc/profile
  ## 4. Point HBase at the JDK
  echo 'export JAVA_HOME=/usr/local/java/jdk-11.0.19' >> $HBASE_HOME/conf/hbase-env.sh
  ## 5. Create the HBase storage directory
  mkdir -p /home/hbase/data
  ## 6. Edit the configuration
  vim $HBASE_HOME/conf/hbase-site.xml
  ## Add the following property
  <property>
      <name>hbase.rootdir</name>
      <value>file:///home/hbase/data</value>
  </property>

  ## 1. Enter the install directory
  cd $HBASE_HOME
  ## 2. Start the service
  ./bin/start-hbase.sh
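
With the service up, a quick smoke test from the HBase shell confirms the standalone instance works. A minimal sketch; the table name t1 and column family cf are illustrative:

  ## Open the HBase shell
  $HBASE_HOME/bin/hbase shell
  ## Inside the shell, run:
  ##   create 't1', 'cf'             -- create a table with one column family
  ##   put 't1', 'r1', 'cf:a', '1'   -- write a single cell
  ##   scan 't1'                     -- read it back
  ##   disable 't1'                  -- disable before dropping
  ##   drop 't1'                     -- clean up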


  ## 1. Enter the install directory
  cd $HBASE_HOME
  ## 2. Stop the service
  ./bin/stop-hbase.sh


2. Pseudo-Distributed Deployment (based on the standalone setup)

  ## 1. Edit hbase-env.sh
  echo 'export JAVA_HOME=/usr/local/java/jdk-11.0.19' >> $HBASE_HOME/conf/hbase-env.sh
  echo 'export HBASE_MANAGES_ZK=true' >> $HBASE_HOME/conf/hbase-env.sh
  ## 2. Edit hbase-site.xml
  vim $HBASE_HOME/conf/hbase-site.xml
  <!-- Store HBase data in HDFS -->
  <property>
      <name>hbase.rootdir</name>
      <value>hdfs://nn/hbase</value>
  </property>
  <!-- Enable distributed mode -->
  <property>
      <name>hbase.cluster.distributed</name>
      <value>true</value>
  </property>
  <!-- ZooKeeper quorum address -->
  <property>
      <name>hbase.zookeeper.quorum</name>
      <value>nn</value>
  </property>
  <!-- HDFS replication factor -->
  <property>
      <name>dfs.replication</name>
      <value>1</value>
  </property>
  ## 3. Replace localhost with nn in regionservers
  echo nn > $HBASE_HOME/conf/regionservers


  ## 1. Enter the install directory
  cd $HADOOP_HOME
  ## 2. Start the Hadoop services
  ./sbin/start-all.sh


  ## 1. Enter the install directory
  cd $HBASE_HOME
  ## 2. Start the service
  ./bin/start-hbase.sh
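
Once HDFS and HBase are both up, the deployment can be sanity-checked. A minimal sketch; since HBASE_MANAGES_ZK=true here, an HQuorumPeer process should appear alongside HMaster and HRegionServer:

  ## 1. List the Java processes; expect HMaster, HRegionServer and HQuorumPeer
  jps
  ## 2. Confirm HBase created its root directory (/hbase) in HDFS
  hdfs dfs -ls /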


  ## 1. Enter the install directory
  cd $HBASE_HOME
  ## 2. Stop the master first (stop-hbase.sh alone does not shut it down)
  ./bin/hbase-daemon.sh stop master
  ## 3. Stop the remaining services
  ./bin/stop-hbase.sh


3. Cluster Deployment

  ## 1. Create the ZooKeeper data directory
  mkdir -p $HBASE_HOME/zookeeper/data
  ## 2. Enter the configuration directory
  cd $HBASE_HOME/conf
  ## 3. Edit the environment configuration
  vim hbase-env.sh
  ## Set the JDK / use an external ZooKeeper
  # JDK
  export JAVA_HOME=/usr/local/java/jdk-11.0.19
  # Do not let HBase manage ZooKeeper
  export HBASE_MANAGES_ZK=false
  ## 4. Edit hbase-site.xml
  vim hbase-site.xml
  ## Add the following properties
  <!-- Maximum allowed clock skew -->
  <property>
      <name>hbase.master.maxclockskew</name>
      <value>6000</value>
  </property>
  <!-- HDFS storage instance -->
  <property>
      <name>hbase.rootdir</name>
      <value>hdfs://nn:9000/hbase</value>
  </property>
  <!-- Enable distributed mode -->
  <property>
      <name>hbase.cluster.distributed</name>
      <value>true</value>
  </property>
  <!-- ZooKeeper cluster nodes -->
  <property>
      <name>hbase.zookeeper.quorum</name>
      <value>zk1,zk2,zk3</value>
  </property>
  <!-- ZooKeeper data directory -->
  <property>
      <name>hbase.zookeeper.property.dataDir</name>
      <value>/usr/local/hbase/hbase-3.0.0-alpha-4/zookeeper/data</value>
  </property>
  <!-- Workaround for "Server is not running yet" -->
  <property>
      <name>hbase.wal.provider</name>
      <value>filesystem</value>
  </property>
  ## 5. Clear regionservers and add the cluster node hostnames
  echo '' > regionservers
  echo 'nn' >> regionservers
  echo 'nd1' >> regionservers
  echo 'nd2' >> regionservers
  ## 6. Create the hbase directory on nd1 and nd2
  mkdir -p /usr/local/hbase
  ## 7. Distribute the HBase installation to the other two VMs, nd1 and nd2
  scp -r /usr/local/hbase/hbase-3.0.0-alpha-4 root@nd1:/usr/local/hbase
  scp -r /usr/local/hbase/hbase-3.0.0-alpha-4 root@nd2:/usr/local/hbase
  ## 8. Distribute the environment variables
  scp /etc/profile root@nd1:/etc/profile
  scp /etc/profile root@nd2:/etc/profile
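
Before starting anything, it is worth confirming that the installation actually landed on the other nodes. A quick sketch using the hostnames from this setup:

  ## Verify the distribution reached nd1 and nd2
  ssh root@nd1 "ls /usr/local/hbase"
  ssh root@nd2 "ls /usr/local/hbase"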

1. Start the Hadoop Cluster

For the Hadoop cluster setup, see: Hadoop Setup

  ## 1. Start Hadoop
  cd $HADOOP_HOME
  ./sbin/start-all.sh


  ## 1. Leave Hadoop safe mode
  hadoop dfsadmin -safemode leave
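
If in doubt, the current safe mode state can be queried directly (hdfs dfsadmin is the current form of the same admin command):

  ## Expect the output "Safe mode is OFF"
  hdfs dfsadmin -safemode get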


2. Start the ZooKeeper Cluster

ZooKeeper cluster setup guide

  ## 1. Start the ZooKeeper cluster
  zkServer.sh start && ssh root@zk2 "source /etc/profile && zkServer.sh start && exit" && ssh root@zk3 "source /etc/profile && zkServer.sh start && exit"
  ## 2. Check the status
  zkServer.sh status && ssh root@zk2 "source /etc/profile && zkServer.sh status && exit" && ssh root@zk3 "source /etc/profile && zkServer.sh status && exit"


3. Start the HBase Cluster

  ## 1. Add ZooKeeper hostname resolution on nn, nd1 and nd2
  echo '192.168.1.100 zk1' >> /etc/hosts
  echo '192.168.1.101 zk2' >> /etc/hosts
  echo '192.168.1.102 zk3' >> /etc/hosts
  ## 2. Start HBase
  cd $HBASE_HOME
  ./bin/start-hbase.sh
  ## 3. Stop the services
  ./bin/hbase-daemon.sh stop master
  ./bin/hbase-daemon.sh stop regionserver
  ./bin/stop-hbase.sh
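
After start-hbase.sh returns, cluster health can be checked without leaving the terminal. A minimal sketch; piping the command into the shell avoids an interactive session:

  ## Print a cluster summary: live servers, dead servers, average load
  echo "status" | $HBASE_HOME/bin/hbase shell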


View the UI monitor: http://192.168.1.6:16010/master-status


4. Cluster Start/Stop Script

  #!/bin/bash
  case $1 in
  "start")
      ## start hadoop
      start-all.sh
      ## start zookeeper (passwordless SSH login must be configured first)
      zkServer.sh start && ssh root@zk2 "source /etc/profile && zkServer.sh start && exit" && ssh root@zk3 "source /etc/profile && zkServer.sh start && exit"
      ## start hbase
      start-hbase.sh
      ;;
  "stop")
      ## stop hbase
      ssh root@nd1 "source /etc/profile && hbase-daemon.sh stop regionserver && stop-hbase.sh && exit"
      ssh root@nd2 "source /etc/profile && hbase-daemon.sh stop regionserver && stop-hbase.sh && exit"
      hbase-daemon.sh stop master && hbase-daemon.sh stop regionserver && stop-hbase.sh
      ## stop zookeeper
      zkServer.sh stop && ssh root@zk2 "source /etc/profile && zkServer.sh stop && exit" && ssh root@zk3 "source /etc/profile && zkServer.sh stop && exit"
      ## stop hadoop
      stop-all.sh
      ;;
  *)
      echo "please input start|stop"
      ;;
  esac
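
Saved on the master node, the script is used like this (the file name hbase-cluster.sh is illustrative):

  chmod +x hbase-cluster.sh
  ./hbase-cluster.sh start
  ./hbase-cluster.sh stop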

III. Testing

  ## 1. On Windows, add HBase cluster hostname resolution by editing the hosts file
  C:\Windows\System32\drivers\etc\hosts
  ## 2. Add the following entries
  192.168.1.6 nn
  192.168.1.7 nd1
  192.168.1.8 nd2

Test that the configuration takes effect:
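
A quick check from a Windows command prompt; each of the three hostnames should now resolve to the cluster IPs:

  ping nn
  ping nd1
  ping nd2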


JDK version


Project structure


1. POM Configuration

  <?xml version="1.0" encoding="UTF-8"?>
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.example</groupId>
      <artifactId>hbase-demo</artifactId>
      <version>1.0-SNAPSHOT</version>
      <properties>
          <maven.compiler.source>11</maven.compiler.source>
          <maven.compiler.target>11</maven.compiler.target>
          <spring.version>2.7.8</spring.version>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
      <dependencies>
          <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
              <version>${spring.version}</version>
          </dependency>
          <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
              <version>1.18.28</version>
          </dependency>
          <dependency>
              <groupId>com.alibaba</groupId>
              <artifactId>fastjson</artifactId>
              <version>2.0.32</version>
          </dependency>
          <dependency>
              <groupId>org.apache.hbase</groupId>
              <artifactId>hbase-client</artifactId>
              <version>3.0.0-alpha-4</version>
          </dependency>
      </dependencies>
  </project>

2. YAML Configuration

  hbase:
    zookeeper:
      quorum: 192.168.1.100,192.168.1.101,192.168.1.102
      property:
        clientPort: 2181
    master:
      ip: 192.168.1.6
      port: 16000

3. HBase Configuration Class

  package org.example.config;

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.springframework.beans.factory.annotation.Value;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;

  /**
   * @author Administrator
   * @Description
   * @create 2023-07-25 0:26
   */
  @Configuration
  public class HbaseConfig {

      @Value("${hbase.zookeeper.quorum}")
      private String zookeeperQuorum;

      @Value("${hbase.zookeeper.property.clientPort}")
      private String clientPort;

      @Value("${hbase.master.ip}")
      private String ip;

      @Value("${hbase.master.port}")
      private int masterPort;

      @Bean
      public org.apache.hadoop.conf.Configuration hbaseConfiguration() {
          org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
          conf.set("hbase.zookeeper.quorum", zookeeperQuorum);
          conf.set("hbase.zookeeper.property.clientPort", clientPort);
          conf.set("hbase.masters", ip + ":" + masterPort);
          conf.set("hbase.client.keyvalue.maxsize", "20971520");
          return HBaseConfiguration.create(conf);
      }
  }

4. HBase Connection Pool

  package org.example.config;

  import lombok.Data;
  import lombok.extern.slf4j.Slf4j;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.springframework.stereotype.Component;

  import javax.annotation.PostConstruct;
  import javax.annotation.Resource;
  import java.util.Enumeration;
  import java.util.Vector;

  /**
   * @author Administrator
   * @Description
   * @create 2023-07-25 22:39
   */
  @Slf4j
  @Component
  public class HbaseConnectionPool {

      /**
       * Initial pool size,
       * pool growth step,
       * and maximum pool size
       */
      private int nInitConnectionAmount = 3;
      private int nIncrConnectionAmount = 3;
      private int nMaxConnections = 20;

      /**
       * Vector holding the pooled connections
       */
      private Vector vcConnections = new Vector();

      /**
       * Injected connection configuration
       */
      @Resource
      private Configuration hbaseConfiguration;

      /**
       * Initialize the pool
       */
      @PostConstruct
      public void init() {
          createConnections(nInitConnectionAmount);
      }

      /**
       * Get an available connection
       * @return
       */
      public synchronized Connection getConnection() {
          Connection conn;
          while (null == (conn = getFreeConnection())) {
              try {
                  wait(1000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
              }
          }
          // Return the acquired connection
          return conn;
      }

      /**
       * Release a connection back to the pool
       * @param conn
       */
      public synchronized void releaseConnection(Connection conn) {
          ConnectionWrapper connWrapper;
          Enumeration enumerate = this.vcConnections.elements();
          while (enumerate.hasMoreElements()) {
              connWrapper = (ConnectionWrapper) enumerate.nextElement();
              if (conn == connWrapper.getConnection()) {
                  connWrapper.setBusy(false);
                  break;
              }
          }
      }

      /**
       * Get a free connection; create more if none are free, and return
       * null once the maximum pool size is reached so the caller blocks
       * and retries
       * @return
       */
      private Connection getFreeConnection() {
          Connection conn;
          if (null == (conn = findFreeConnection())) {
              // Create new connections
              createConnections(nIncrConnectionAmount);
              // Check again for a free connection
              if (null == (conn = findFreeConnection())) {
                  return null;
              }
          }
          return conn;
      }

      /**
       * Find a free connection
       * @return
       */
      private Connection findFreeConnection() {
          ConnectionWrapper connWrapper;
          // Iterate over the pooled connection objects
          Enumeration enumerate = vcConnections.elements();
          while (enumerate.hasMoreElements()) {
              connWrapper = (ConnectionWrapper) enumerate.nextElement();
              // Check whether this connection is in use
              if (!connWrapper.isBusy()) {
                  connWrapper.setBusy(true);
                  return connWrapper.getConnection();
              }
          }
          // No free connection found
          return null;
      }

      /**
       * Create new connections
       * @param counts
       */
      private void createConnections(int counts) {
          // Create the requested number of connections in a loop
          try {
              for (int i = 0; i < counts; i++) {
                  if (this.nMaxConnections > 0 && this.vcConnections.size() >= this.nMaxConnections) {
                      log.warn("Maximum number of connections reached...");
                      break;
                  }
                  // Create a new connection and add it to the vector
                  vcConnections.addElement(new ConnectionWrapper(newConnection()));
              }
          } catch (Exception e) {
              log.error("Failed to create connection...");
          }
      }

      /**
       * Create a new connection
       * @return
       */
      private Connection newConnection() {
          /** HBase connection */
          Connection conn = null;
          // Create a new database connection
          try {
              conn = ConnectionFactory.createConnection(hbaseConfiguration);
          } catch (Exception e) {
              log.error("HBase connection failed...");
          }
          // Return the newly created connection
          return conn;
      }

      /**
       * Wrapper around a pooled connection
       */
      @Data
      class ConnectionWrapper {

          /**
           * The underlying connection
           */
          private Connection connection;

          /**
           * Whether this connection is currently in use; defaults to free
           */
          private boolean busy = false;

          /**
           * Constructor: wraps a Connection into a pooled object
           */
          public ConnectionWrapper(Connection connection) {
              this.connection = connection;
          }
      }
  }

5. Test Class

  package org.example.controller;

  import lombok.extern.slf4j.Slf4j;
  import org.apache.hadoop.hbase.Cell;
  import org.apache.hadoop.hbase.CompareOperator;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.*;
  import org.apache.hadoop.hbase.filter.ColumnValueFilter;
  import org.apache.hadoop.hbase.filter.Filter;
  import org.apache.hadoop.hbase.util.Bytes;
  import org.example.config.HbaseConnectionPool;
  import org.springframework.web.bind.annotation.GetMapping;
  import org.springframework.web.bind.annotation.RequestMapping;
  import org.springframework.web.bind.annotation.RestController;

  import javax.annotation.Resource;
  import java.io.IOException;
  import java.util.*;

  /**
   * @author Administrator
   *
   * Connection acquisition and release could also be handled with AOP
   *
   * @Description
   * @create 2023-07-25 23:06
   */
  @Slf4j
  @RestController
  @RequestMapping("/hbase")
  public class HbaseController {

      @Resource
      private HbaseConnectionPool pool;

      /**
       * Table name
       */
      private String tbl_user = "tbl_user";

      /**
       * Create the table (skipped if it already exists)
       */
      @GetMapping("/create")
      public void createTable(){
          Connection conn = null;
          // Acquire a connection
          try {
              conn = pool.getConnection();
              Admin admin = conn.getAdmin();
              TableName tableName = TableName.valueOf(tbl_user);
              if (!admin.tableExists(tableName)){
                  // Specify the table name
                  TableDescriptorBuilder tdb_user = TableDescriptorBuilder.newBuilder(tableName);
                  // Add the column families (name, age)
                  ColumnFamilyDescriptor hcd_info = ColumnFamilyDescriptorBuilder.of("name");
                  ColumnFamilyDescriptor hcd_data = ColumnFamilyDescriptorBuilder.of("age");
                  tdb_user.setColumnFamily(hcd_info);
                  tdb_user.setColumnFamily(hcd_data);
                  // Create the table
                  TableDescriptor td = tdb_user.build();
                  admin.createTable(td);
              }
          } catch (IOException e) {
              throw new RuntimeException(e);
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
      }

      /**
       * Drop the table (only if it exists)
       */
      @GetMapping("/drop")
      public void dropTable(){
          Connection conn = null;
          try {
              conn = pool.getConnection();
              Admin admin = conn.getAdmin();
              TableName tableName = TableName.valueOf(tbl_user);
              if (admin.tableExists(tableName)){
                  admin.disableTable(tableName);
                  admin.deleteTable(tableName);
              }
          } catch (IOException e) {
              throw new RuntimeException(e);
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
      }

      /**
       * Insert test
       */
      @GetMapping("/insert")
      public void insert(){
          log.info("--- insert a single column --- 1");
          putData(tbl_user, "row1", "name", "a", "zhangSan");
          putData(tbl_user, "row1", "age", "a", "18");
          log.info("--- insert multiple columns --- 2");
          putData(tbl_user, "row2", "name",
                  Arrays.asList("a", "b", "c"), Arrays.asList("liSi", "wangWu", "zhaoLiu"));
          log.info("--- insert multiple columns --- 3");
          putData(tbl_user, "row3", "age",
                  Arrays.asList("a", "b", "c"), Arrays.asList("18","19","20"));
          log.info("--- insert multiple columns --- 4");
          putData(tbl_user, "row4", "age",
                  Arrays.asList("a", "b", "c"), Arrays.asList("30","19","20"));
      }

      /**
       * Insert data (single column)
       * @param tableName table name
       * @param rowKey rowKey
       * @param columnFamily column family
       * @param column column
       * @param value value
       * @return true/false
       */
      public boolean putData(String tableName, String rowKey, String columnFamily, String column,
                             String value) {
          return putData(tableName, rowKey, columnFamily, Arrays.asList(column),
                  Arrays.asList(value));
      }

      /**
       * Insert data (multiple columns)
       * @param tableName table name
       * @param rowKey rowKey
       * @param columnFamily column family
       * @param columns columns
       * @param values values
       * @return true/false
       */
      public boolean putData(String tableName, String rowKey, String columnFamily,
                             List<String> columns, List<String> values) {
          Connection conn = null;
          try {
              conn = pool.getConnection();
              Table table = conn.getTable(TableName.valueOf(tableName));
              Put put = new Put(Bytes.toBytes(rowKey));
              for (int i = 0; i < columns.size(); i++) {
                  put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columns.get(i)), Bytes.toBytes(values.get(i)));
              }
              table.put(put);
              table.close();
              return true;
          } catch (IOException e) {
              e.printStackTrace();
              return false;
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
      }

      /**
       * Query test
       */
      @GetMapping("/query")
      public void getResultScanner(){
          log.info("Full table: {}", getData(tbl_user));
          log.info("With filter, by age [18]: {}", getData(tbl_user, new ColumnValueFilter(Bytes.toBytes("age"), Bytes.toBytes("a"), CompareOperator.EQUAL, Bytes.toBytes("18"))));
          log.info("By rowKey [row1]: {}", getData(tbl_user, "row1"));
          log.info("By rowKey, column family and column [row2 name a]: {}", getData(tbl_user, "row2", "name", "a"));
      }

      /**
       * Get data (full table)
       * @param tableName table name
       * @return map
       */
      public List<Map<String, String>> getData(String tableName) {
          List<Map<String, String>> list = new ArrayList<>();
          Connection conn = null;
          try {
              conn = pool.getConnection();
              Table table = conn.getTable(TableName.valueOf(tableName));
              Scan scan = new Scan();
              ResultScanner resultScanner = table.getScanner(scan);
              for (Result result : resultScanner) {
                  HashMap<String, String> map = new HashMap<>(result.listCells().size());
                  map.put("row", Bytes.toString(result.getRow()));
                  for (Cell cell : result.listCells()) {
                      // Column family
                      String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                      // Column qualifier
                      String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                      // Value
                      String data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                      map.put(family + ":" + qualifier, data);
                  }
                  list.add(map);
              }
              table.close();
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
          return list;
      }

      /**
       * Get data (by filter)
       * @param tableName table name
       * @param filter filter
       * @return map
       */
      public List<Map<String, String>> getData(String tableName, Filter filter) {
          List<Map<String, String>> list = new ArrayList<>();
          Connection conn = null;
          try {
              conn = pool.getConnection();
              Table table = conn.getTable(TableName.valueOf(tableName));
              Scan scan = new Scan();
              // Set the filter
              scan.setFilter(filter);
              ResultScanner resultScanner = table.getScanner(scan);
              for (Result result : resultScanner) {
                  HashMap<String, String> map = new HashMap<>(result.listCells().size());
                  map.put("row", Bytes.toString(result.getRow()));
                  for (Cell cell : result.listCells()) {
                      String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                      String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                      String data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                      map.put(family + ":" + qualifier, data);
                  }
                  list.add(map);
              }
              table.close();
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
          return list;
      }

      /**
       * Get data (by rowKey)
       * @param tableName table name
       * @param rowKey rowKey
       * @return map
       */
      public Map<String, String> getData(String tableName, String rowKey) {
          HashMap<String, String> map = new HashMap<>();
          Connection conn = null;
          try {
              conn = pool.getConnection();
              Table table = conn.getTable(TableName.valueOf(tableName));
              Get get = new Get(Bytes.toBytes(rowKey));
              Result result = table.get(get);
              if (result != null && !result.isEmpty()) {
                  for (Cell cell : result.listCells()) {
                      String family = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
                      String qualifier = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
                      String data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                      map.put(family + ":" + qualifier, data);
                  }
              }
              table.close();
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
          return map;
      }

      /**
       * Get data (by rowKey, column family and column)
       * @param tableName table name
       * @param rowKey rowKey
       * @param columnFamily column family
       * @param columnQualifier column qualifier
       * @return map
       */
      public String getData(String tableName, String rowKey, String columnFamily,
                            String columnQualifier) {
          String data = "";
          Connection conn = null;
          try {
              conn = pool.getConnection();
              Table table = conn.getTable(TableName.valueOf(tableName));
              Get get = new Get(Bytes.toBytes(rowKey));
              get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
              Result result = table.get(get);
              if (result != null && !result.isEmpty()) {
                  Cell cell = result.listCells().get(0);
                  data = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
              }
              table.close();
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
          return data;
      }

      /**
       * Delete test
       */
      @GetMapping("/delete")
      public void delete(){
          log.info("--- delete by rowKey --- row1 ");
          deleteData(tbl_user, "row1");
          log.info("--- delete by rowKey and column family --- row2 age ");
          deleteData(tbl_user, "row2", "age");
      }

      /**
       * Delete data (by rowKey)
       * @param tableName table name
       * @param rowKey rowKey
       */
      public void deleteData(String tableName, String rowKey) {
          Connection conn = null;
          try {
              conn = pool.getConnection();
              Table table = conn.getTable(TableName.valueOf(tableName));
              Delete delete = new Delete(Bytes.toBytes(rowKey));
              table.delete(delete);
              table.close();
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
      }

      /**
       * Delete data (by rowKey and column family)
       * @param tableName table name
       * @param rowKey rowKey
       * @param columnFamily column family
       */
      public void deleteData(String tableName, String rowKey, String columnFamily) {
          Connection conn = null;
          try {
              conn = pool.getConnection();
              Table table = conn.getTable(TableName.valueOf(tableName));
              Delete delete = new Delete(Bytes.toBytes(rowKey));
              delete.addFamily(columnFamily.getBytes());
              table.delete(delete);
              table.close();
          } catch (IOException e) {
              e.printStackTrace();
          } finally {
              if (null != conn){
                  pool.releaseConnection(conn);
              }
          }
      }
  }

6. Startup Class

  package org.example;

  import org.springframework.boot.SpringApplication;
  import org.springframework.boot.autoconfigure.SpringBootApplication;

  /**
   * @author Administrator
   */
  @SpringBootApplication
  public class HbaseDemo {
      public static void main(String[] args) {
          SpringApplication.run(HbaseDemo.class, args);
      }
  }

7. Test

  1. Create table: http://127.0.0.1:8080/hbase/create
  2. Insert: http://127.0.0.1:8080/hbase/insert
  3. Query: http://127.0.0.1:8080/hbase/query
  4. Delete: http://127.0.0.1:8080/hbase/delete
  5. Drop table: http://127.0.0.1:8080/hbase/drop
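
The same sequence can be driven from a shell with curl (assuming the application is listening on port 8080):

  curl http://127.0.0.1:8080/hbase/create
  curl http://127.0.0.1:8080/hbase/insert
  curl http://127.0.0.1:8080/hbase/query
  curl http://127.0.0.1:8080/hbase/delete
  curl http://127.0.0.1:8080/hbase/drop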

View the UI


Test output log

