Canal + Kafka in Practice: Real-Time ETL

忘是亡心i · 2022-05-14 07:21

Canal parses the MySQL binlog, reshapes each change event into a structured record, and publishes it to Kafka; this pipeline can serve as the backbone of a real-time ETL process.
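
For this to work, the source MySQL server must write a ROW-format binlog, and canal needs an account with replication privileges. The settings below follow canal's QuickStart defaults (user name, password, and `server-id` are placeholders to adapt):

```ini
# my.cnf on the source MySQL server
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
```

```sql
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
```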

application.yml:

```yaml
spring:
  application:
    name: canal
  canal:
    topic-prefix: etl_timely.
    destination:
      example: 0
    username:
    password:
    delay-limit: 2000
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      acks: 1
      batch-size: 100
      client-id: canal
      retries: 3
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      enable-auto-commit: true
      group-id: etl
    template:
      default-topic: etl_canal
```
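
A note on the keys: everything under `spring.canal` is a custom property block that is bound onto the `CanalClient` bean below via `@ConfigurationProperties`. Each entry under `destination` names a canal instance (here canal's default `example` instance), and the client starts one fetch thread per entry. `topic-prefix` is prepended to the table name when producing, so a change to a table `user` lands on the Kafka topic `etl_timely.user`.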

pom.xml:

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.0.25</version>
</dependency>
<!-- fastjson is used below to serialize messages; declared explicitly
     in case the canal client does not pull it in transitively -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcprov-jdk15on</artifactId>
    <version>RELEASE</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.0</version>
    <scope>provided</scope>
</dependency>
```

CanalClient:

```java
package com.kexin.canal.client;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.kexin.canal.service.KafkaService;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.Map;

/**
 * @Author KeXin
 * @Date 2018/7/20 5:16 PM
 **/
@Component
@ConfigurationProperties(prefix = "spring.canal")
public class CanalClient implements CommandLineRunner {

    @Getter
    @Setter
    private String username;

    @Getter
    @Setter
    private String password;

    // canal destination name -> partition, bound from spring.canal.destination
    @Getter
    @Setter
    private Map<String, String> destination;

    @Autowired
    KafkaService kafkaService;

    @Override
    public void run(String... args) {
        int port = 11111; // default canal server port
        startClient(port, destination, username, password);
    }

    /**
     * Watch for database changes: one client thread per canal destination.
     * (A Disruptor-based hand-off variant is sketched after this class.)
     */
    public void startClient(int port, Map<String, String> canalDestination, String canalUsername, String canalPassword) {
        canalDestination.forEach((dest, partition) -> new Thread(() -> {
            // connect to the canal server; the partition value is currently unused
            CanalConnector connector = CanalConnectors.newSingleConnector(
                    new InetSocketAddress(AddressUtils.getHostIp(), port),
                    dest, canalUsername, canalPassword);
            int batchSize = 100;
            try {
                connector.connect();
                connector.subscribe();
                connector.rollback();
                while (true) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        try {
                            Thread.sleep(1000); // nothing to do, back off briefly
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        // forward to Kafka
                        kafkaService.sendMessage(message.getEntries());
                    }
                    connector.ack(batchId); // confirm the batch
                    // connector.rollback(batchId); // on failure, roll the batch back instead
                }
            } finally {
                connector.disconnect();
            }
        }).start());
    }
}
```
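
The class above produces to Kafka directly from the fetch thread. To decouple fetching from producing — the intent behind the Disruptor imports in the original post — the batch could be handed off through an LMAX Disruptor ring buffer. A minimal sketch, assuming `com.lmax:disruptor` 3.x on the classpath and a hypothetical `BinlogEvent` holder (the original `DisruptorConfig.Element` class is not shown in the post):

```java
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.kexin.canal.service.KafkaService;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.List;
import java.util.concurrent.Executors;

public class DisruptorHandoff {

    // hypothetical mutable event holder, standing in for DisruptorConfig.Element
    static class BinlogEvent {
        List<CanalEntry.Entry> entries;
    }

    public static RingBuffer<BinlogEvent> start(KafkaService kafkaService) {
        Disruptor<BinlogEvent> disruptor = new Disruptor<>(
                BinlogEvent::new,                 // event factory
                1024,                             // ring buffer size (must be a power of two)
                Executors.defaultThreadFactory(),
                ProducerType.SINGLE,              // one publisher thread per destination
                new BlockingWaitStrategy());
        // consumer side: drain events and produce to Kafka off the fetch thread
        disruptor.handleEventsWith((event, sequence, endOfBatch) ->
                kafkaService.sendMessage(event.entries));
        return disruptor.start();
    }

    // producer side: instead of calling kafkaService.sendMessage(...) directly,
    // the fetch loop would publish each batch into the ring buffer
    public static void publish(RingBuffer<BinlogEvent> ringBuffer, List<CanalEntry.Entry> entries) {
        long seq = ringBuffer.next();
        try {
            ringBuffer.get(seq).entries = entries;
        } finally {
            ringBuffer.publish(seq);
        }
    }
}
```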

Parsing the binlog into the format we need:

```java
package com.kexin.canal.service;

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @Author KeXin
 * @Date 2018/7/20 4:56 PM
 **/
@Component
public class KafkaService {

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.canal.topic-prefix}")
    private String canalTopicPrefix;

    /**
     * Convert binlog entries into flat maps and publish them to Kafka.
     * @param entries the raw entries fetched from canal
     */
    public void sendMessage(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            String tableName = entry.getHeader().getTableName();
            String schemaName = entry.getHeader().getSchemaName();
            long executeTime = entry.getHeader().getExecuteTime();

            // locate the event by binlog file name and offset
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    schemaName, tableName, eventType));

            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                Map<String, Object> map = new HashMap<>();
                map.put("event_timestamp", executeTime);
                map.put("table_name", tableName);
                map.put("database_name", schemaName);
                Map<String, Object> mapInfo = new HashMap<>();
                if (eventType == CanalEntry.EventType.DELETE) {
                    // for deletes, only the before-image is available
                    map.put("event_op_type", "delete");
                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                        if (column.getValue() != null && !column.getValue().equals("")) {
                            mapInfo.put(column.getName(), column.getValue());
                        }
                    }
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    map.put("event_op_type", "insert");
                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                        mapInfo.put(column.getName(), column.getValue());
                    }
                } else {
                    // updates carry both the new values and the before-image
                    map.put("event_op_type", "update");
                    for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                        mapInfo.put(column.getName(), column.getValue());
                    }
                    Map<String, Object> beforeMap = new HashMap<>();
                    for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                        if (column.getValue() != null && !column.getValue().equals("")) {
                            beforeMap.put(column.getName(), column.getValue());
                        }
                    }
                    map.put("beforeColumns", beforeMap);
                }
                map.put("map_info", mapInfo);
                System.out.println(map);
                // topic = prefix + table name, e.g. etl_timely.user
                kafkaTemplate.send(canalTopicPrefix + tableName, JSON.toJSONString(map));
            }
        }
    }
}
```
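
Assuming an UPDATE of a single row in a table `user` of database `test`, the document published to topic `etl_timely.user` would look roughly like this (values illustrative):

```json
{
  "event_timestamp": 1532055600000,
  "database_name": "test",
  "table_name": "user",
  "event_op_type": "update",
  "map_info": { "id": "1", "name": "new-name" },
  "beforeColumns": { "id": "1", "name": "old-name" }
}
```

On the consuming side, the `spring.kafka.consumer` settings in application.yml are enough for a plain listener; a minimal sketch (the topic name assumes the `user` table from the example above):

```java
@KafkaListener(topics = "etl_timely.user", groupId = "etl")
public void consume(String message) {
    // hand the JSON change record to the downstream ETL step
    System.out.println(message);
}
```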

