ClickHouse系列--消费kafka数据

青旅半醒 2023-01-21 03:26 73阅读 0赞

1.使用方式

主要是使用ClickHouse的表引擎。

  1. CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
  2. (
  3. name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
  4. name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
  5. ...
  6. ) ENGINE = Kafka()
  7. SETTINGS
  8. kafka_broker_list = 'host:port',
  9. kafka_topic_list = 'topic1,topic2,...',
  10. kafka_group_name = 'group_name',
  11. kafka_format = 'data_format'[,]
  12. [kafka_row_delimiter = 'delimiter_symbol',]
  13. [kafka_schema = '',]
  14. [kafka_num_consumers = N,]
  15. [kafka_max_block_size = 0,]
  16. [kafka_skip_broken_messages = N,]
  17. [kafka_commit_every_batch = 0,]
  18. [kafka_thread_per_consumer = 0]

kafka_broker_list :逗号分隔的brokers地址 (localhost:9092).
kafka_topic_list :Kafka 主题列表,多个主题用逗号分隔.
kafka_group_name :消费者组.
kafka_format – Message format. 比如JSONEachRow、JSON、CSV等等

2.示例

2.1在kafka中创建user_behavior主题,并向该主题写入数据,数据示例为:

  1. { "user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
  2. { "user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
  3. { "user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}

在ClickHouse中创建表,选择表引擎为Kafka(),如下:

  1. CREATE TABLE kafka_user_behavior (
  2. user_id UInt64 COMMENT '用户id',
  3. item_id UInt64 COMMENT '商品id',
  4. cat_id UInt16 COMMENT '品类id',
  5. action String COMMENT '行为',
  6. province UInt8 COMMENT '省份id',
  7. ts UInt64 COMMENT '时间戳'
  8. ) ENGINE = Kafka()
  9. SETTINGS
  10. kafka_broker_list = 'cdh04:9092',
  11. kafka_topic_list = 'user_behavior',
  12. kafka_group_name = 'group1',
  13. kafka_format = 'JSONEachRow'
  14. ;
  15. -- 查询
  16. cdh04 :) select * from kafka_user_behavior ;
  17. -- 再次查看数据,发现数据为空
  18. cdh04 :) select count(*) from kafka_user_behavior;
  19. SELECT count(*)
  20. FROM kafka_user_behavior
  21. ┌─count()─┐
  22. 0
  23. └─────────┘

2.2通过物化视图将kafka数据导入ClickHouse

当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。

  • 首先创建一张Kafka表引擎的表,用于从Kafka中读取数据
  • 然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用
  • 最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中

    — 创建Kafka引擎表
    CREATE TABLE kafka_user_behavior_src (

    1. user_id UInt64 COMMENT '用户id',
    2. item_id UInt64 COMMENT '商品id',
    3. cat_id UInt16 COMMENT '品类id',
    4. action String COMMENT '行为',
    5. province UInt8 COMMENT '省份id',
    6. ts UInt64 COMMENT '时间戳'

    ) ENGINE = Kafka()

    1. SETTINGS
    2. kafka_broker_list = 'cdh04:9092',
    3. kafka_topic_list = 'user_behavior',
    4. kafka_group_name = 'group1',
    5. kafka_format = 'JSONEachRow'

    ;

    — 创建一张终端用户使用的表
    CREATE TABLE kafka_user_behavior (

    1. user_id UInt64 COMMENT '用户id',
    2. item_id UInt64 COMMENT '商品id',
    3. cat_id UInt16 COMMENT '品类id',
    4. action String COMMENT '行为',
    5. province UInt8 COMMENT '省份id',
    6. ts UInt64 COMMENT '时间戳'

    ) ENGINE = MergeTree()

    1. ORDER BY user_id

    ;
    — 创建物化视图,同步数据
    CREATE MATERIALIZED VIEW user_behavior_consumer TO kafka_user_behavior

    1. AS SELECT * FROM kafka_user_behavior_src ;

    — 查询,多次查询,已经被查询的数据依然会被输出
    cdh04 :) select * from kafka_user_behavior;

Note:
Kafka消费表不能直接作为结果表使用。Kafka消费表只是用来消费Kafka数据,没有真正的存储所有数据。

这里还有一个疑问:
在众多资料中,kafka示例消息都是最简单的json格式,如果消息格式是复杂类型呢?是否支持?

发表评论

表情:
评论列表 (有 0 条评论,73人围观)

还没有评论,来说两句吧...

相关阅读

    相关 查看kafka消费数据

    storm jar接收程序,如果指定了forceFromStart=false,则从最新的数据开始读,最新是指多长时间的,有具体的参数设置 如果指定了为true,则从最老的

    相关 Kafka重复消费数据问题

      kafka重复消费的问题,主要的原因还是在指定的时间内,没有进行kafka的位移提交,导致根据上一次的位移重新poll出新的数据,而这个数据就是上一次没有消费处理完全的(

    相关 Flink 二 消费kafka数据

    前言: Flink的DataSoures模块中,定义了DataStream API的数据输入操作,Flink将数据源主要分为内置和第三方数据源,内置数据源包含文件、Soc...