Flink SQL之Catalogs

逃离我推掉我的手 2022-11-17 13:36 499阅读 0赞

目录

    • (1)Catalogs主要定义
    • (2)Catalogs类型
    • (3)Catalogs在Flink SQL架构中的位置
    • (4)Catalogs 操作

(1)Catalogs主要定义

  • Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
  • 元数据可以是临时的,例如临时表、或者通过TableEnvironment注册的UDF,也可以是持久化的,例如Hive Metastore中的元数据。
  • Catalog提供了一个统一的API,用于管理元数据,并使其可以从Table API和 SQL查询语句中来访问。

(2)Catalogs类型

GenericInMemoryCatalog
GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

JdbcCatalog
JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。

HiveCatalog
HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。

警告 Hive Metastore 以小写形式存储所有元数据对象名称。而 GenericInMemoryCatalog 区分大小写。

在这里插入图片描述

(4)Catalogs 操作

使用 SQL DDL

  1. TableEnvironment tableEnv = ...
  2. // Create a HiveCatalog
  3. Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>");
  4. // Register the catalog
  5. tableEnv.registerCatalog("myhive", catalog);
  6. // Create a catalog database
  7. tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
  8. // Create a catalog table
  9. tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
  10. tableEnv.listTables(); // should return the tables in current catalog and database.

数据库操作

  1. // create database
  2. catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
  3. // drop database
  4. catalog.dropDatabase("mydb", false);
  5. // alter database
  6. catalog.alterDatabase("mydb", new CatalogDatabaseImpl(...), false);
  7. // get databse
  8. catalog.getDatabase("mydb");
  9. // check if a database exist
  10. catalog.databaseExists("mydb");
  11. // list databases in a catalog
  12. catalog.listDatabases("mycatalog");

表操作

  1. // create table
  2. catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
  3. // drop table
  4. catalog.dropTable(new ObjectPath("mydb", "mytable"), false);
  5. // alter table
  6. catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
  7. // rename table
  8. catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table");
  9. // get table
  10. catalog.getTable("mytable");
  11. // check if a table exist or not
  12. catalog.tableExists("mytable");
  13. // list tables in a database
  14. catalog.listTables("mydb");

相关Catalogs见官方文档

  • https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/catalogs.html

以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!

发表评论

表情:
评论列表 (有 0 条评论,499人围观)

还没有评论,来说两句吧...

相关阅读