FlinkSQL-CDC: Real-Time ETL of Kafka Data

以你之姓 · 2022-09-01 13:58

Table of Contents

  • Reference resources
  • 1. Configure the debezium-mysql Kafka connector
    • a. Enable binlog in MySQL; create the MySQL tables and the CDC user
    • b. Create the Kafka connector (Debezium) with ksql
    • c. Read and write data with Flink SQL (MySQL -> HBase)
  • 2. Configure the debezium-oracle Kafka connector
    • a. Enable archive log in Oracle and grant user privileges
    • b. Create the Kafka connector (Debezium) with ksql
    • c. Read and write data with Flink SQL (Oracle -> HBase)
  • 3. Submit Flink SQL jobs from the web UI
  • 4. Read Confluent Avro topic data with Flink

Reference resources

  • 云邪 (Ververica) flink-cdc-connectors: https://github.com/ververica/flink-cdc-connectors
  • Flink community: https://www.jianshu.com/u/504bc225cb7a;
    Bilibili videos (Flink community): https://www.bilibili.com/video/BV1zt4y1D7kt/
  • Flink join syntax: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/index.html

1. Configure the debezium-mysql Kafka connector

a. Enable binlog in MySQL; create the MySQL tables and the CDC user

Configure MySQL: https://debezium.io/documentation/reference/1.3/connectors/mysql.html#setup-the-mysql-server
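Besides turning on the binlog in row format (see the link above), Debezium needs a MySQL user with replication privileges. A minimal sketch, assuming the cdc/cdc credentials used by the connector configuration below:

    -- create the CDC user referenced by the connector config ("database.user"='cdc')
    CREATE USER 'cdc'@'%' IDENTIFIED BY 'cdc';
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc'@'%';
    FLUSH PRIVILEGES;

    -- verify that the binlog is enabled and uses row-based logging
    SHOW VARIABLES LIKE 'log_bin';        -- expect ON
    SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW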

    mysql> show master status;
    +------------------+----------+--------------+------------------+
    | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
    +------------------+----------+--------------+------------------+
    | mysql-bin.000001 | 10290780 |              |                  |
    +------------------+----------+--------------+------------------+
    1 row in set (0.00 sec)

    mysql> use test2;
    mysql> show create table employee;
    +----------+------------------------------------------------------------------+
    | Table    | Create Table                                                     |
    +----------+------------------------------------------------------------------+
    | employee | CREATE TABLE `employee` (
      `id` int(11) DEFAULT NULL,
      `dept_id` int(11) DEFAULT NULL,
      `name` varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci |
    +----------+------------------------------------------------------------------+
    1 row in set (0.00 sec)
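The join used later in this section also reads the dept and salary tables. They are not shown in the transcript above; the following DDL is a hypothetical reconstruction, inferred from the Flink table definitions further down:

    -- hypothetical DDL, inferred from the Flink DDL below (dept: id/name, salary: id/emp_id/money)
    CREATE TABLE dept (
      id int(11) DEFAULT NULL,
      name varchar(20) COLLATE utf8_unicode_ci DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

    CREATE TABLE salary (
      id int(11) DEFAULT NULL,
      emp_id int(11) DEFAULT NULL,
      money double DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;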

b. Create the Kafka connector (Debezium) with ksql

    ksql> CREATE SOURCE CONNECTOR `mysql-dbz-src2` WITH(
        "connector.class"='io.debezium.connector.mysql.MySqlConnector',
        "tasks.max"='1',
        "database.hostname"='127.0.0.1',
        "database.port"='3306',
        "database.server.name"='dbz2',
        "database.user"='cdc',
        "database.password"='cdc',
        "database.whitelist"='test2',
        "database.history.kafka.bootstrap.servers"='localhost:9092',
        "database.history.kafka.topic"='schema-changes2.inventory2'
    );

    ksql> show connectors;
     Connector Name | Type   | Class                                      | Status
    --------------------------------------------------------------------------------------------------------
     mysql-dbz-src2 | SOURCE | io.debezium.connector.mysql.MySqlConnector | RUNNING (1/1 tasks RUNNING)
    --------------------------------------------------------------------------------------------------------

    ksql> show topics;
     Kafka Topic                 | Partitions | Partition Replicas
    ---------------------------------------------------------------
     dbz2                        | 1          | 1
     dbz2.test2.dept             | 1          | 1
     dbz2.test2.employee         | 1          | 1
     dbz2.test2.enriched_orders  | 1          | 1
     dbz2.test2.orders           | 1          | 1
     dbz2.test2.products         | 1          | 1
     dbz2.test2.salary           | 1          | 1
     dbz2.test2.shipments        | 1          | 1
     default_ksql_processing_log | 1          | 1
    ---------------------------------------------------------------
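Once the connector reports RUNNING, a quick sanity check is to print one of the Debezium topics from ksql (the same approach is used in the Oracle section below); a minimal sketch:

    ksql> set 'auto.offset.reset'='earliest';
    ksql> print 'dbz2.test2.employee' from beginning;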

Inspect the Kafka data through the Confluent Platform UI (screenshot omitted).
The topic schema shows the table fields and their data types (screenshot omitted).

  • Debezium format parameters: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/debezium.html
  • HBase connector parameters: https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hbase.html

Result of the MySQL join query (Flink SQL will execute a similar query to merge the data and write it into HBase; screenshot omitted).
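For reference, what the Flink job computes can be expressed as a plain MySQL query along these lines (the dept and salary column names follow the Flink DDL below):

    -- what the Flink job computes, expressed as a plain MySQL query
    SELECT emp.id, emp.dept_id, emp.name, dept.name AS dept_name, salary.money
    FROM employee emp
    JOIN dept   ON dept.id = emp.dept_id
    JOIN salary ON salary.emp_id = emp.id;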

c. Read and write data with Flink SQL (MySQL -> HBase)

    [root@localhost bin]$ ./sql-client.sh embedded
    ...
    Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

    -- dbz2.test2.employee: the field types must match the schema in the registry, otherwise the data cannot be parsed
    Flink SQL> CREATE TABLE topic_test2_employee (
        id int,
        dept_id int,
        name string
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbz2.test2.employee',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
    );
    CREATE TABLE topic_test2_dept (
        id int,
        name string
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbz2.test2.dept',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
    );
    CREATE TABLE topic_test2_salary (
        id int,
        emp_id int,
        money double
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'dbz2.test2.salary',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081'
    );
    CREATE TABLE hbase_www (
        emp_id string,
        f ROW<emp_dept_id string, name string, deptname string, salary string>,
        PRIMARY KEY (emp_id) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-1.4',
        'table-name' = 'www',
        'zookeeper.quorum' = '192.168.56.117:2181'
    );

    -- join the three topics and write the result into HBase
    insert into hbase_www
    select cast(emp.id as string) emp_id,
           ROW(cast(emp.dept_id as string), emp.name, dept.name, cast(salary.money as string))
    from topic_test2_employee emp
    inner join topic_test2_dept dept on dept.id = emp.dept_id
    inner join topic_test2_salary salary on salary.emp_id = emp.id;

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] Table update statement has been successfully submitted to the cluster:
    Job ID: 137c51550417ab7003c00c95d74f842b
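One prerequisite the transcript does not show: the HBase table behind the hbase_www sink (table www, column family f) must already exist before the INSERT job starts writing. A minimal sketch in the HBase shell:

    hbase(main):001:0> create 'www', 'f'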

Log in to the Flink web UI to check the running job (screenshot omitted). Then check whether the HBase data changes in real time, i.e. whether inserts/updates/deletes on the MySQL side are propagated:

    hbase(main):016:0> scan 'xx_t1'
    ROW    COLUMN+CELL
     1     column=f:dept_id, timestamp=1627454335726, value=1
     1     column=f:name, timestamp=1627454335726, value=Li si
     2     column=f:dept_id, timestamp=1627454335726, value=2
     2     column=f:name, timestamp=1627454335726, value=Test 1
    2 row(s) in 0.0120 seconds
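To exercise the update and delete paths end to end, change rows on the MySQL side and re-run the scan; a small sketch (the values are made up, the column names come from the employee table above):

    -- run on MySQL; the corresponding HBase cells should change within a few seconds
    UPDATE employee SET name = 'Wang wu' WHERE id = 1;
    DELETE FROM employee WHERE id = 2;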

2. Configure the debezium-oracle Kafka connector

https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-deploying-a-connector
Data returned by the Oracle join query (screenshot omitted).

a. Enable archive log in Oracle and grant user privileges

    SQL> shutdown immediate
    startup mount
    alter database archivelog;
    alter database open;
    -- verify with: archive log list  (should now report "Database log mode: Archive Mode")

    ALTER TABLE user1.table1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

    SQL> CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    CREATE USER dbz IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
    GRANT CREATE SESSION TO dbz;
    GRANT SELECT ON V_$DATABASE TO dbz;
    GRANT FLASHBACK ANY TABLE TO dbz;
    GRANT SELECT ANY TABLE TO dbz;
    GRANT SELECT_CATALOG_ROLE TO dbz;
    GRANT EXECUTE_CATALOG_ROLE TO dbz;
    GRANT SELECT ANY TRANSACTION TO dbz;
    -- GRANT LOGMINING TO dbz;  -- fails on 11g: replace it with the following two grants
    GRANT EXECUTE ON SYS.DBMS_LOGMNR TO dbz;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbz;
    GRANT CREATE TABLE TO dbz;
    GRANT LOCK ANY TABLE TO dbz;
    GRANT ALTER ANY TABLE TO dbz;
    GRANT CREATE SEQUENCE TO dbz;
    GRANT EXECUTE ON DBMS_LOGMNR TO dbz;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO dbz;
    GRANT SELECT ON V_$LOG TO dbz;
    GRANT SELECT ON V_$LOG_HISTORY TO dbz;
    GRANT SELECT ON V_$LOGMNR_LOGS TO dbz;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbz;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO dbz;
    GRANT SELECT ON V_$LOGFILE TO dbz;
    GRANT SELECT ON V_$ARCHIVED_LOG TO dbz;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbz;
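Before registering the connector it is worth double-checking the archive log mode and the supplemental logging flag; a quick check run as a privileged user:

    SELECT log_mode FROM v$database;                   -- expect ARCHIVELOG
    SELECT supplemental_log_data_min FROM v$database;  -- expect YES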

b. Create the Kafka connector (Debezium) with ksql

  • The Debezium Oracle connector by default ingests changes using native Oracle LogMiner.

    ksql> CREATE SOURCE CONNECTOR oracle_cdc4 WITH(
        "connector.class"='io.debezium.connector.oracle.OracleConnector',
        "tasks.max"='1',
        "database.server.name"='helowin_cdc4',
        "database.hostname"='localhost',
        "database.port"='15211',
        "database.user"='dbz',
        "database.password"='dbz',
        "schema.include.list"='SCOTT',
        "database.dbname"='helowin',
        "decimal.handling.mode"='string',
        "database.history.kafka.bootstrap.servers"='localhost:9092',
        "database.history.kafka.topic"='schema-changes.oracle_cdc4',
        "database.history.skip.unparseable.ddl"=true
    );

    ksql> show connectors;
     Connector Name | Type   | Class                                        | Status
     oracle_cdc4    | SOURCE | io.debezium.connector.oracle.OracleConnector | RUNNING (1/1 tasks RUNNING)

    ksql> show topics;
     Kafka Topic                 | Partitions | Partition Replicas
     helowin_cdc4                | 1          | 1
     helowin_cdc4.SCOTT.DEPT     | 1          | 1
     helowin_cdc4.SCOTT.EMP      | 1          | 1
     helowin_cdc4.SCOTT.SALGRADE | 1          | 1
     helowin_cdc4.SCOTT.TEST     | 1          | 1

    ksql> set 'auto.offset.reset'='earliest';
    Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
    ksql> print helowin_cdc4.SCOTT.TEST;
    Key format: SESSION(AVRO) or HOPPING(AVRO) or TUMBLING(AVRO) or AVRO or SESSION(PROTOBUF) or HOPPING(PROTOBUF) or TUMBLING(PROTOBUF) or PROTOBUF or SESSION(JSON) or HOPPING(JSON) or TUMBLING(JSON) or JSON or SESSION(JSON_SR) or HOPPING(JSON_SR) or TUMBLING(JSON_SR) or JSON_SR or SESSION(KAFKA_INT) or HOPPING(KAFKA_INT) or TUMBLING(KAFKA_INT) or KAFKA_INT or SESSION(KAFKA_BIGINT) or HOPPING(KAFKA_BIGINT) or TUMBLING(KAFKA_BIGINT) or KAFKA_BIGINT or SESSION(KAFKA_DOUBLE) or HOPPING(KAFKA_DOUBLE) or TUMBLING(KAFKA_DOUBLE) or KAFKA_DOUBLE or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
    Value format: AVRO
    rowtime: 7/29/21 11:33:09 AM CST, key: , value: {"before": null, "after": {"ID": "2", "NAME": "b", "MARK": "b"}, "source": {"version": "1.6.1.Final", "connector": "oracle", "name": "helowin_cdc4", "ts_ms": 1627529588336, "snapshot": "last", "db": "HELOWIN", "sequence": null, "schema": "SCOTT", "table": "TEST", "txId": null, "scn": "1103657", "commit_scn": null, "lcr_position": null}, "op": "r", "ts_ms": 1627529588336, "transaction": null}
    rowtime: 7/29/21 12:11:18 PM CST, key: , value: {"before": null, "after": {"ID": "3", "NAME": "c", "MARK": "c"}, "source": {"version": "1.6.1.Final", "connector": "oracle", "name": "helowin_cdc4", "ts_ms": 1627511996000, "snapshot": "false", "db": "HELOWIN", "sequence": null, "schema": "SCOTT", "table": "TEST", "txId": "02001f0051030000", "scn": "1145746", "commit_scn": "1145747", "lcr_position": null}, "op": "c", "ts_ms": 1627531878417, "transaction": null}
    rowtime: 7/29/21 12:12:07 PM CST, key: , value: {"before": {"ID": "3", "NAME": "c", "MARK": "c"}, "after": {"ID": "3", "NAME": "c2", "MARK": "c2"}, "source": {"version": "1.6.1.Final", "connector": "oracle", "name": "helowin_cdc4", "ts_ms": 1627512679000, "snapshot": "false", "db": "HELOWIN", "sequence": null, "schema": "SCOTT", "table": "TEST", "txId": "0200080068030000", "scn": "1145746", "commit_scn": "1174275", "lcr_position": null}, "op": "u", "ts_ms": 1627531927799, "transaction": null}

c. Read and write data with Flink SQL (Oracle -> HBase)

    Flink SQL> CREATE TABLE emp (
        EMPNO int, ENAME string, JOB string, MGR int, HIREDATE BIGINT, SAL string, COMM string, DEPTNO int
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'localhost:9092',
        'topic' = 'helowin_cdc4.SCOTT.EMP',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081',
        'scan.startup.mode' = 'earliest-offset'
    );
    CREATE TABLE dept (
        DEPTNO int, DNAME string, LOC string
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'localhost:9092',
        'topic' = 'helowin_cdc4.SCOTT.DEPT',
        'format' = 'debezium-avro-confluent',
        'debezium-avro-confluent.schema-registry.url' = 'http://localhost:9081',
        'scan.startup.mode' = 'earliest-offset'
    );
    CREATE TABLE hbase_www2 (
        EMPNO string,
        f ROW<ENAME string, JOB string, MGR string,
              HIREDATE string, SAL string, COMM string,
              DEPTNO string, DNAME string, LOC string>,
        PRIMARY KEY (EMPNO) NOT ENFORCED
    ) WITH (
        'connector' = 'hbase-1.4',
        'table-name' = 'www',
        'zookeeper.quorum' = 'localhost:2181'
    );

    -- Flink view that merges the two tables
    create view dept_emp_v as
    select cast(EMPNO as string) EMPNO, ENAME, JOB, cast(MGR as string) MGR, cast(HIREDATE as string) HIREDATE, SAL, COMM, cast(emp.DEPTNO as string) DEPTNO, dept.DNAME, dept.LOC
    from emp
    INNER JOIN dept
    ON emp.DEPTNO = dept.DEPTNO;

    -- read from the view and insert into HBase
    insert into hbase_www2
    select EMPNO, ROW(ENAME, JOB, MGR, HIREDATE, SAL, COMM, DEPTNO, DNAME, LOC)
    from dept_emp_v;

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] Table update statement has been successfully submitted to the cluster:
    Job ID: e73a8f4c8860a3bc83879e1a50543350
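A note on HIREDATE: it is declared BIGINT above because Debezium (with the default time handling) emits Oracle DATE values as epoch milliseconds, so the plain cast writes raw numbers into HBase. If a readable date string is wanted, Flink's built-in FROM_UNIXTIME can convert it; a minimal sketch under that assumption (dept_emp_v2 is a hypothetical name):

    -- hypothetical variant of the view: FROM_UNIXTIME expects seconds,
    -- HIREDATE arrives as milliseconds, hence the division by 1000
    create view dept_emp_v2 as
    select cast(EMPNO as string) EMPNO, ENAME, JOB, cast(MGR as string) MGR,
           FROM_UNIXTIME(HIREDATE / 1000) HIREDATE,  -- default format 'yyyy-MM-dd HH:mm:ss'
           SAL, COMM, cast(emp.DEPTNO as string) DEPTNO, dept.DNAME, dept.LOC
    from emp
    INNER JOIN dept ON emp.DEPTNO = dept.DEPTNO;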

3. Submit Flink SQL jobs from the web UI

https://gitee.com/zhuhuipei/flink-streaming-platform-web/blob/master/docs/deploy.md
Log in to the web UI at http://${ip or hostname}:9084/ with admin / 123456 to edit SQL jobs.

  • Note: when creating a job, it is best to leave the "third-party jar URL" field empty and download the required jars manually into flink_home/lib; exclude http://ccblog.cn/jars/flink-streaming-udf.jar to avoid conflicts with Flink's built-in functions.
  • How the platform works: it essentially calls the Flink client and submits a jar-based job:

        flink run -d -p 2 \
        -c com.flink.streaming.core.JobApplication \
        flink-streaming-platform-web/lib/flink-streaming-core_flink_1.12.0-1.2.0.RELEASE.jar \
        -sql flink-streaming-platform-web/sql/job_sql_8.sql

    Commonly used jars:

    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html
    https://repo1.maven.org/maven2/com/alibaba/ververica/flink-sql-connector-mysql-cdc/1.1.0/flink-sql-connector-mysql-cdc-1.1.0.jar
    https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hbase-1.4_2.11/1.12.3/flink-sql-connector-hbase-1.4_2.11-1.12.3.jar
    http://ccblog.cn/jars/flink-connector-jdbc_2.11-1.12.0.jar
    http://ccblog.cn/jars/flink-sql-connector-kafka_2.11-1.12.0.jar


4. Read Confluent Avro topic data with Flink


    ksql> CREATE STREAM t1 (id int, name VARCHAR) WITH (KAFKA_TOPIC='t1', PARTITIONS='2', VALUE_FORMAT='avro');
    ksql> insert into t1(id, name) values (1, 'a');
    ksql> select * from t1 emit changes;
    +------------------+------------------+------------------+------------------+
    |ROWTIME           |ROWKEY            |ID                |NAME              |
    +------------------+------------------+------------------+------------------+
    |1627631954797     |null              |1                 |a                 |

    Flink SQL> CREATE TABLE t1 (
        ID int,
        NAME string
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'localhost:9092',
        'topic' = 't1',
        'format' = 'avro-confluent',
        'avro-confluent.schema-registry.url' = 'http://localhost:9081',
        'scan.startup.mode' = 'earliest-offset'
    );
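With the table registered, the topic can be queried directly from the SQL client. Note that the format here is avro-confluent (plain Avro records appended by ksqlDB), not debezium-avro-confluent as in the CDC examples above; a minimal check:

    Flink SQL> select * from t1;
    -- should show the row inserted through ksqlDB above, i.e. ID=1, NAME='a'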
