Building a Real-Time Data Warehouse with Flink


Reposted from the Cnblogs author 鼬手牵佐手.

A real-time data warehouse mainly addresses the poor data freshness of a traditional warehouse, and is typically used for real-time OLAP analysis, real-time dashboards, and real-time monitoring and alerting. Although its architecture and technology choices differ from a traditional offline warehouse, the basic methodology of warehouse modeling is the same. This article walks through a demo that builds a real-time data warehouse from scratch with Flink SQL, covering the whole pipeline of data ingestion, storage, computation, and visualization. From this article you will learn:

  • The basic architecture of a real-time data warehouse
  • The data processing flow of a real-time data warehouse
  • New SQL features in Flink 1.11
  • A known bug in Flink 1.11
  • A complete, hands-on example

Case Overview

This article uses an e-commerce business as the example to show the data processing flow of a real-time data warehouse. The goal is to illustrate how such a warehouse is built, so no complex computations are involved. To keep the case reproducible and complete, detailed steps are given for every operation, and for ease of demonstration everything is done in the Flink SQL CLI.

Architecture

The architecture is shown in the figure below: canal first parses the MySQL binlog and writes the change data to Kafka. Flink SQL then cleanses and joins the raw data and writes the resulting detail-level wide table back to Kafka. Dimension data is stored in MySQL; Flink SQL joins the wide table with the dimension tables, writes the aggregated results to MySQL, and finally FineBI visualizes them.

[Figure: overall architecture — MySQL binlog → canal → Kafka → Flink SQL → MySQL → FineBI]

Preparing the Business Data

  • Order table (order_info)

    CREATE TABLE `order_info` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
    `consignee` varchar(100) DEFAULT NULL COMMENT '收货人',
    `consignee_tel` varchar(20) DEFAULT NULL COMMENT '收件人电话',
    `total_amount` decimal(10,2) DEFAULT NULL COMMENT '总金额',
    `order_status` varchar(20) DEFAULT NULL COMMENT '订单状态',
    `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
    `payment_way` varchar(20) DEFAULT NULL COMMENT '付款方式',
    `delivery_address` varchar(1000) DEFAULT NULL COMMENT '送货地址',
    `order_comment` varchar(200) DEFAULT NULL COMMENT '订单备注',
    `out_trade_no` varchar(50) DEFAULT NULL COMMENT '订单交易编号(第三方支付用)',
    `trade_body` varchar(200) DEFAULT NULL COMMENT '订单描述(第三方支付用)',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    `operate_time` datetime DEFAULT NULL COMMENT '操作时间',
    `expire_time` datetime DEFAULT NULL COMMENT '失效时间',
    `tracking_no` varchar(100) DEFAULT NULL COMMENT '物流单编号',
    `parent_order_id` bigint(20) DEFAULT NULL COMMENT '父订单编号',
    `img_url` varchar(200) DEFAULT NULL COMMENT '图片路径',
    `province_id` int(20) DEFAULT NULL COMMENT '地区',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单表';

  • Order detail table (order_detail)

    CREATE TABLE `order_detail` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
    `order_id` bigint(20) DEFAULT NULL COMMENT '订单编号',
    `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku_id',
    `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称(冗余)',
    `img_url` varchar(200) DEFAULT NULL COMMENT '图片名称(冗余)',
    `order_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
    `sku_num` varchar(200) DEFAULT NULL COMMENT '购买个数',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='订单详情表';

  • Product (SKU) table (sku_info)

    CREATE TABLE `sku_info` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'skuid(itemID)',
    `spu_id` bigint(20) DEFAULT NULL COMMENT 'spuid',
    `price` decimal(10,0) DEFAULT NULL COMMENT '价格',
    `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku名称',
    `sku_desc` varchar(2000) DEFAULT NULL COMMENT '商品规格描述',
    `weight` decimal(10,2) DEFAULT NULL COMMENT '重量',
    `tm_id` bigint(20) DEFAULT NULL COMMENT '品牌(冗余)',
    `category3_id` bigint(20) DEFAULT NULL COMMENT '三级分类id(冗余)',
    `sku_default_img` varchar(200) DEFAULT NULL COMMENT '默认显示图片(冗余)',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='商品表';

  • Level-1 category table (base_category1)

    CREATE TABLE `base_category1` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
    `name` varchar(10) NOT NULL COMMENT '分类名称',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='一级分类表';
  • Level-2 category table (base_category2)

    CREATE TABLE `base_category2` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
    `name` varchar(200) NOT NULL COMMENT '二级分类名称',
    `category1_id` bigint(20) DEFAULT NULL COMMENT '一级分类编号',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='二级分类表';

  • Level-3 category table (base_category3)

    CREATE TABLE `base_category3` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '编号',
    `name` varchar(200) NOT NULL COMMENT '三级分类名称',
    `category2_id` bigint(20) DEFAULT NULL COMMENT '二级分类编号',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='三级分类表';

  • Province table (base_province)

    CREATE TABLE `base_province` (
    `id` int(20) DEFAULT NULL COMMENT 'id',
    `name` varchar(20) DEFAULT NULL COMMENT '省名称',
    `region_id` int(20) DEFAULT NULL COMMENT '大区id',
    `area_code` varchar(20) DEFAULT NULL COMMENT '行政区位码'
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

  • Region table (base_region)

    CREATE TABLE `base_region` (
    `id` int(20) NOT NULL COMMENT '大区id',
    `region_name` varchar(20) DEFAULT NULL COMMENT '大区名称',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
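To drive the demo end to end, the business tables need some rows for canal to capture. The values below are made up purely for illustration (the IDs assume empty tables with AUTO_INCREMENT starting at 1); in practice you would use your own business data or a data generator, and the category and sku_info tables would need matching rows as well:

    -- Hypothetical seed data (executed in the MySQL business database)
    INSERT INTO base_region (id, region_name) VALUES (1, '华北');
    INSERT INTO base_province (id, name, region_id, area_code) VALUES (1, '北京', 1, '110000');
    INSERT INTO order_info (consignee, total_amount, order_status, user_id, province_id, create_time, operate_time)
    VALUES ('张三', 199.00, '2', 1001, 1, NOW(), NOW());  -- order_status = '2' means paid
    INSERT INTO order_detail (order_id, sku_id, sku_name, order_price, sku_num, create_time)
    VALUES (1, 10, '测试商品', 99.50, '2', NOW());         -- assumes the order above received id = 1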

Data Processing Pipeline

ODS Layer Data Synchronization

For the ODS-layer synchronization, see my other article, 基于Canal与Flink实现数据实时增量同步(一) (real-time incremental synchronization with Canal and Flink). In short, canal parses the MySQL binlog and writes the changes to the corresponding Kafka topics; the details are omitted here for brevity. The result after synchronization is shown below:
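For reference, a hand-written, abridged example of what a canal-json record on the mydw.base_province topic looks like is sketched below (canal emits column values as JSON strings); Flink's canal-json format interprets the data, old and type fields to reconstruct the changelog:

    {
      "data": [
        { "id": "1", "name": "北京", "region_id": "1", "area_code": "110000" }
      ],
      "database": "mydw",
      "table": "base_province",
      "isDdl": false,
      "type": "INSERT",
      "old": null,
      "es": 1596505200000,
      "ts": 1596505200123
    }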

[Figure: Kafka topics after ODS synchronization]

Preparing the DIM Layer

In this case the dimension tables are stored in MySQL; in production, HBase is commonly used instead. Two dimension tables are needed: a region dimension and a product (SKU) dimension. They are prepared as follows:

  • Region dimension table
    First, extract the data from the mydw.base_province and mydw.base_region topics into MySQL, using Flink SQL's Kafka source with the canal-json format. Note: before running the load, the target tables must already exist in MySQL. This article uses a MySQL database named dim to hold the dimension data (a sketch of those MySQL DDL statements is given right after the Flink SQL below). The statements are as follows:

    -- -------------------------
    -- Province
    -- Kafka source
    -- -------------------------
    DROP TABLE IF EXISTS `ods_base_province`;
    CREATE TABLE `ods_base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT,
    `area_code` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'mydw.base_province',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset'
    );
    -- -------------------------
    -- Province
    -- MySQL sink
    -- -------------------------
    DROP TABLE IF EXISTS `base_province`;
    CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT,
    `area_code` STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_province', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
    );
    -- -------------------------
    -- Province
    -- load data into the MySQL sink
    -- -------------------------
    INSERT INTO base_province
    SELECT *
    FROM ods_base_province;
    -- -------------------------
    -- Region
    -- Kafka source
    -- -------------------------
    DROP TABLE IF EXISTS `ods_base_region`;
    CREATE TABLE `ods_base_region` (
    `id` INT,
    `region_name` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'mydw.base_region',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset'
    );
    -- -------------------------
    -- Region
    -- MySQL sink
    -- -------------------------
    DROP TABLE IF EXISTS `base_region`;
    CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_region', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
    );
    -- -------------------------
    -- Region
    -- load data into the MySQL sink
    -- -------------------------
    INSERT INTO base_region
    SELECT *
    FROM ods_base_region;
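    As noted above, the JDBC sink only writes into existing tables, so the target tables have to be created in the dim database beforehand. A minimal sketch of those MySQL DDL statements (the column types are chosen to match the Flink schemas and are an assumption; adjust as needed). The same applies to the base_category1/2/3 and sku_info tables used below:

    -- Executed in MySQL, database `dim` (sketch)
    CREATE TABLE IF NOT EXISTS `base_province` (
      `id` int NOT NULL,
      `name` varchar(20),
      `region_id` int,
      `area_code` varchar(20),
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    CREATE TABLE IF NOT EXISTS `base_region` (
      `id` int NOT NULL,
      `region_name` varchar(20),
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;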

    After the steps above, the raw data needed for the dimension table is now in MySQL. Next, create the dimension table itself in MySQL: using the two tables above, create a view dim_province to serve as the dimension table:

    -- ---------------------------------
    -- DIM layer: region dimension table
    -- (view created in MySQL)
    -- ---------------------------------
    DROP VIEW IF EXISTS dim_province;
    CREATE VIEW dim_province AS
    SELECT
    bp.id AS province_id,
    bp.name AS province_name,
    br.id AS region_id,
    br.region_name AS region_name,
    bp.area_code AS area_code
    FROM base_region br
    JOIN base_province bp ON br.id = bp.region_id
    ;

    The dimension table dim_province is now ready; to use it in a dimension join, we only need to declare a JDBC source over it in Flink SQL. The product (SKU) dimension table is built in the same way as the region one.
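    Before moving on, here is a minimal sketch of how such a JDBC dimension table is used in a lookup join (fact_table is a hypothetical placeholder; the complete, runnable statements appear in the ADS section later):

    -- dim_province is declared in Flink as a JDBC table over the MySQL view (see the ADS section),
    -- and joined against a processing-time attribute of the fact stream:
    SELECT f.*, d.province_name, d.region_name
    FROM fact_table f   -- any stream table with `proctime AS PROCTIME()` in its schema
    JOIN dim_province FOR SYSTEM_TIME AS OF f.proctime AS d
      ON d.province_id = f.province_id;

    With that pattern in mind, the statements for the product (SKU) dimension follow: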

    -- -------------------------
    -- Level-1 category
    -- Kafka source
    -- -------------------------
    DROP TABLE IF EXISTS `ods_base_category1`;
    CREATE TABLE `ods_base_category1` (
    `id` BIGINT,
    `name` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'mydw.base_category1',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset'
    );
    -- -------------------------
    -- Level-1 category
    -- MySQL sink
    -- -------------------------
    DROP TABLE IF EXISTS `base_category1`;
    CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category1', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
    );
    -- -------------------------
    -- Level-1 category
    -- load data into the MySQL sink
    -- -------------------------
    INSERT INTO base_category1
    SELECT *
    FROM ods_base_category1;
    -- -------------------------
    -- Level-2 category
    -- Kafka source
    -- -------------------------
    DROP TABLE IF EXISTS `ods_base_category2`;
    CREATE TABLE `ods_base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'mydw.base_category2',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset'
    );
    -- -------------------------
    -- Level-2 category
    -- MySQL sink
    -- -------------------------
    DROP TABLE IF EXISTS `base_category2`;
    CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category2', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
    );
    -- -------------------------
    -- Level-2 category
    -- load data into the MySQL sink
    -- -------------------------
    INSERT INTO base_category2
    SELECT *
    FROM ods_base_category2;
    -- -------------------------
    -- Level-3 category
    -- Kafka source
    -- -------------------------
    DROP TABLE IF EXISTS `ods_base_category3`;
    CREATE TABLE `ods_base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'mydw.base_category3',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset'
    );
    -- -------------------------
    -- Level-3 category
    -- MySQL sink
    -- -------------------------
    DROP TABLE IF EXISTS `base_category3`;
    CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category3', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
    );
    -- -------------------------
    -- Level-3 category
    -- load data into the MySQL sink
    -- -------------------------
    INSERT INTO base_category3
    SELECT *
    FROM ods_base_category3;
    -- -------------------------
    -- Product (SKU) table
    -- Kafka source
    -- -------------------------
    DROP TABLE IF EXISTS `ods_sku_info`;
    CREATE TABLE `ods_sku_info` (
    `id` BIGINT,
    `spu_id` BIGINT,
    `price` DECIMAL(10,0),
    `sku_name` STRING,
    `sku_desc` STRING,
    `weight` DECIMAL(10,2),
    `tm_id` BIGINT,
    `category3_id` BIGINT,
    `sku_default_img` STRING,
    `create_time` TIMESTAMP(0)
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'mydw.sku_info',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'properties.group.id' = 'testGroup',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset'
    );
    -- -------------------------
    -- Product (SKU) table
    -- MySQL sink
    -- -------------------------
    DROP TABLE IF EXISTS `sku_info`;
    CREATE TABLE `sku_info` (
    `id` BIGINT,
    `spu_id` BIGINT,
    `price` DECIMAL(10,0),
    `sku_name` STRING,
    `sku_desc` STRING,
    `weight` DECIMAL(10,2),
    `tm_id` BIGINT,
    `category3_id` BIGINT,
    `sku_default_img` STRING,
    `create_time` TIMESTAMP(0),
    PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'sku_info', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
    );
    -- -------------------------
    -- Product (SKU) table
    -- load data into the MySQL sink
    -- -------------------------
    INSERT INTO sku_info
    SELECT *
    FROM ods_sku_info;

    With the steps above, the base tables needed for the product dimension are synchronized into MySQL (again, the corresponding MySQL tables must be created in advance). Next, use these base tables to create a view dim_sku_info in the dim database, which will serve as the product dimension table:

    -- ---------------------------------
    -- DIM layer: product (SKU) dimension table
    -- (view created in MySQL)
    -- ---------------------------------
    DROP VIEW IF EXISTS dim_sku_info;
    CREATE VIEW dim_sku_info AS
    SELECT
    si.id AS id,
    si.sku_name AS sku_name,
    si.category3_id AS c3_id,
    si.weight AS weight,
    si.tm_id AS tm_id,
    si.price AS price,
    si.spu_id AS spu_id,
    c3.name AS c3_name,
    c2.id AS c2_id,
    c2.name AS c2_name,
    c1.id AS c1_id,
    c1.name AS c1_name
    FROM
    (
    sku_info si
    JOIN base_category3 c3 ON si.category3_id = c3.id
    JOIN base_category2 c2 ON c3.category2_id = c2.id
    JOIN base_category1 c1 ON c2.category1_id = c1.id
    );

    At this point all the dimension data we need is ready; next we move on to the DWD layer.

DWD Layer Processing

With the dimension tables in place, we now process the raw ODS data into the DWD-layer detail wide table. The steps are as follows:

  

  -- -------------------------
  -- Order detail
  -- Kafka source
  -- -------------------------
  DROP TABLE IF EXISTS `ods_order_detail`;
  CREATE TABLE `ods_order_detail`(
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_detail',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
  );
  -- -------------------------
  -- Order info
  -- Kafka source
  -- -------------------------
  DROP TABLE IF EXISTS `ods_order_info`;
  CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0),
  `operate_time` TIMESTAMP(0),
  `expire_time` TIMESTAMP(0),
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'mydw.order_info',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
  );
  -- ---------------------------------
  -- DWD layer: paid-order detail table dwd_paid_order_detail
  -- ---------------------------------
  DROP TABLE IF EXISTS dwd_paid_order_detail;
  CREATE TABLE dwd_paid_order_detail
  (
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
  );
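  -- NOTE (environment assumption): the 'changelog-json' format is not bundled with Flink 1.11.
  -- It comes from the flink-cdc-connectors project (the flink-format-changelog-json artifact),
  -- whose jar must be on the SQL Client classpath; otherwise the DDL above fails because the format cannot be found.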
  -- ---------------------------------
  -- DWD layer: paid-order detail table
  -- load data into dwd_paid_order_detail
  -- ---------------------------------
  INSERT INTO dwd_paid_order_detail
  SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
  FROM
  (
  SELECT *
  FROM ods_order_info
  WHERE order_status = '2' -- paid
  ) oi JOIN
  (
  SELECT *
  FROM ods_order_detail
  ) od
  ON oi.id = od.order_id;

ADS Layer

The steps above produced the dwd_paid_order_detail wide table, stored in Kafka. Next we JOIN this wide table with the dimension tables to produce the ADS (application) layer data.

  • ads_province_index

First, create the corresponding ADS target table in MySQL: ads_province_index

  

  CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt)
  );

Then load data into the MySQL ADS target table:

  -- Run in the Flink SQL CLI
  -- ---------------------------------
  -- Create the ADS-layer MySQL table with DDL
  -- Metrics: 1. daily order count per province
  --          2. daily order amount per province
  -- ---------------------------------
  CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
  ) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/ads',
  'table-name' = 'ads_province_index',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe'
  );
  -- ---------------------------------
  -- dwd_paid_order_detail: paid-order detail wide table
  -- ---------------------------------
  CREATE TABLE dwd_paid_order_detail
  (
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
  );
  -- ---------------------------------
  -- tmp_province_index
  -- temporary order aggregation table
  -- ---------------------------------
  CREATE TABLE tmp_province_index(
  province_id INT,
  order_count BIGINT, -- order count
  order_amount DECIMAL(10,2), -- order amount
  pay_date DATE
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_province_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
  );
  -- ---------------------------------
  -- tmp_province_index
  -- load data into the temporary aggregation table
  -- ---------------------------------
  INSERT INTO tmp_province_index
  SELECT
  province_id,
  count(distinct order_id) order_count, -- order count
  sum(order_price * sku_num) order_amount, -- order amount
  TO_DATE(pay_time,'yyyy-MM-dd') pay_date
  FROM dwd_paid_order_detail
  GROUP BY province_id,TO_DATE(pay_time,'yyyy-MM-dd')
  ;
  -- ---------------------------------
  -- tmp_province_index_source
  -- read the temporary aggregation table back as a source
  -- ---------------------------------
  CREATE TABLE tmp_province_index_source(
  province_id INT,
  order_count BIGINT, -- order count
  order_amount DECIMAL(10,2), -- order amount
  pay_date DATE,
  proctime as PROCTIME() -- computed column providing a processing-time attribute
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_province_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
  );
  -- ---------------------------------
  -- DIM layer: region dimension table
  -- declare the dimension table source
  -- ---------------------------------
  DROP TABLE IF EXISTS `dim_province`;
  CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
  ) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'dim_province',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'scan.fetch-size' = '100'
  );
  -- ---------------------------------
  -- load data into ads_province_index
  -- dimension table join
  -- ---------------------------------
  INSERT INTO ads_province_index
  SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
  FROM
  tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp
  ON dp.province_id = pc.province_id;

After submitting the job, check the Flink Web UI:

[Figure: Flink Web UI showing the running ads_province_index job]

And inspect the data in the ADS-layer table ads_province_index:

[Figure: sample rows in ads_province_index]
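One way to spot-check the result is a query against the target table in MySQL (a sketch; the actual rows depend on your test data):

  SELECT province_id, province_name, region_name, order_count, order_amount, dt
  FROM ads.ads_province_index
  ORDER BY dt DESC, order_amount DESC
  LIMIT 10;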

  • ads_sku_index

First, create the corresponding ADS target table in MySQL: ads_sku_index

  CREATE TABLE ads_sku_index
  (
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100),
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
  );

Then load data into the MySQL ADS target table:

  -- ---------------------------------
  -- Create the ADS-layer MySQL table with DDL
  -- Metrics: 1. daily order count per SKU
  --          2. daily order amount per SKU
  --          3. daily quantity sold per SKU
  -- ---------------------------------
  CREATE TABLE ads_sku_index
  (
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
  ) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/ads',
  'table-name' = 'ads_sku_index',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe'
  );
  -- ---------------------------------
  -- dwd_paid_order_detail: paid-order detail wide table
  -- ---------------------------------
  CREATE TABLE dwd_paid_order_detail
  (
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_paid_order_detail',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
  );
  -- ---------------------------------
  -- tmp_sku_index
  -- SKU metrics aggregation
  -- ---------------------------------
  CREATE TABLE tmp_sku_index(
  sku_id BIGINT,
  order_count BIGINT, -- order count
  order_amount DECIMAL(10,2), -- order amount
  order_sku_num BIGINT,
  pay_date DATE
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_sku_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
  );
  -- ---------------------------------
  -- tmp_sku_index
  -- load data
  -- ---------------------------------
  INSERT INTO tmp_sku_index
  SELECT
  sku_id,
  count(distinct order_id) order_count, -- order count
  sum(order_price * sku_num) order_amount, -- order amount
  sum(sku_num) order_sku_num,
  TO_DATE(pay_time,'yyyy-MM-dd') pay_date
  FROM dwd_paid_order_detail
  GROUP BY sku_id,TO_DATE(pay_time,'yyyy-MM-dd')
  ;
  -- ---------------------------------
  -- tmp_sku_index_source
  -- read the temporary aggregation table back as a source
  -- ---------------------------------
  CREATE TABLE tmp_sku_index_source(
  sku_id BIGINT,
  order_count BIGINT, -- order count
  order_amount DECIMAL(10,2), -- order amount
  order_sku_num BIGINT,
  pay_date DATE,
  proctime as PROCTIME() -- computed column providing a processing-time attribute
  ) WITH (
  'connector' = 'kafka',
  'topic' = 'tmp_sku_index',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kms-3:9092',
  'format' = 'changelog-json'
  );
  -- ---------------------------------
  -- DIM layer: product (SKU) dimension table
  -- declare the dimension table source
  -- ---------------------------------
  DROP TABLE IF EXISTS `dim_sku_info`;
  CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://kms-1:3306/dim',
  'table-name' = 'dim_sku_info',
  'driver' = 'com.mysql.jdbc.Driver',
  'username' = 'root',
  'password' = '123qwe',
  'scan.fetch-size' = '100'
  );
  -- ---------------------------------
  -- load data into ads_sku_index
  -- dimension table join
  -- ---------------------------------
  INSERT INTO ads_sku_index
  SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  cast(sc.pay_date as VARCHAR)
  FROM
  tmp_sku_index_source sc
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id
  ;

After submitting the job, check the Flink Web UI:

[Figure: Flink Web UI showing the running ads_sku_index job]

And inspect the data in the ADS-layer table ads_sku_index:

[Figure: sample rows in ads_sku_index]

Visualization with FineBI

[Figure: FineBI real-time dashboard built on the ADS tables]

Other Notes

A Bug in Flink 1.11.0

With Flink 1.11.0, inserting from a changelog source into an upsert sink raises the following exception:

  [ERROR] Could not execute SQL statement. Reason:
  org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
  Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])

This bug has since been fixed; the fix is available starting from Flink 1.11.1.
