canal--基于mysql数据库binlog实践

小咪咪 2023-01-23 11:57 234阅读 0赞

canal是阿里巴巴开源的中间件,通过解析MySQL的binlog来订阅并同步数据库的增量变更,本文演示其基本使用。
一、项目搭建
新建一个Maven项目,引入依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.alibaba.otter</groupId>
  4. <artifactId>canal.client</artifactId>
  5. <version>1.1.0</version>
  6. </dependency>
  7. </dependencies>

二、数据库准备
新建数据库mytest,新建表student,确保binlog设置开启且格式是ROW:

  1. show VARIABLES like '%log_bin%';
  2. show VARIABLES like 'binlog_format';

三、Canal准备
①下载canal.deployer-1.1.5.tar.gz,解压
②修改conf/example/目录下的instance.properties配置文件:

  1. #################################################
  2. canal.instance.gtidon=false
  3. # position info
  4. canal.instance.master.address=127.0.0.1:3306
  5. canal.instance.master.journal.name=
  6. canal.instance.master.position=
  7. canal.instance.master.timestamp=
  8. canal.instance.master.gtid=
  9. # rds oss binlog
  10. canal.instance.rds.accesskey=
  11. canal.instance.rds.secretkey=
  12. canal.instance.rds.instanceId=
  13. # table meta tsdb info
  14. canal.instance.tsdb.enable=true
  15. # username/password
  16. canal.instance.dbUsername=root
  17. canal.instance.dbPassword=root
  18. canal.instance.connectionCharset = UTF-8
  19. # enable druid Decrypt database password
  20. canal.instance.enableDruid=false
  21. #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
  22. # table regex
  23. canal.instance.filter.regex=mytest.student
  24. # table black regex
  25. canal.instance.filter.black.regex=mysql\\.slave_.*
  26. # mq config
  27. canal.mq.topic=example
  28. canal.mq.partition=0
  29. #################################################

③运行bin目录下的startup.bat启动canal服务(Linux/macOS下使用startup.sh)
四、编写测试类:

  1. import java.net.InetSocketAddress;
  2. import java.util.List;
  3. import com.alibaba.otter.canal.client.CanalConnectors;
  4. import com.alibaba.otter.canal.client.CanalConnector;
  5. import com.alibaba.otter.canal.common.utils.AddressUtils;
  6. import com.alibaba.otter.canal.protocol.Message;
  7. import com.alibaba.otter.canal.protocol.CanalEntry.Column;
  8. import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
  9. import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
  10. import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
  11. import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
  12. import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
  13. /**
  14. * @Author ludl
  15. * @Classname CanalDemo
  16. * @Date 2021/7/14 20:51
  17. */
  18. public class CanalDemo {
  19. public static void main(String args[]) {
  20. // 创建链接
  21. CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
  22. 11111), "example", "", "");
  23. int batchSize = 1000;
  24. int emptyCount = 0;
  25. try {
  26. connector.connect();
  27. connector.subscribe("mytest.student");
  28. connector.rollback();
  29. int totalEmptyCount = 120;
  30. while (emptyCount < totalEmptyCount) {
  31. // 获取指定数量的数据
  32. Message message = connector.getWithoutAck(batchSize);
  33. long batchId = message.getId();
  34. int size = message.getEntries().size();
  35. if (batchId == -1 || size == 0) {
  36. emptyCount++;
  37. try {
  38. Thread.sleep(1000);
  39. } catch (InterruptedException e) {
  40. }
  41. } else {
  42. emptyCount = 0;
  43. printEntry(message.getEntries());
  44. }
  45. // 提交确认
  46. connector.ack(batchId);
  47. // 处理失败, 回滚数据
  48. // connector.rollback(batchId);
  49. }
  50. System.out.println("empty too many times, exit");
  51. } finally {
  52. connector.disconnect();
  53. }
  54. }
  55. private static void printEntry(List<Entry> entrys) {
  56. for (Entry entry : entrys) {
  57. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  58. continue;
  59. }
  60. RowChange rowChage = null;
  61. try {
  62. rowChage = RowChange.parseFrom(entry.getStoreValue());
  63. } catch (Exception e) {
  64. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
  65. e);
  66. }
  67. EventType eventType = rowChage.getEventType();
  68. System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
  69. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  70. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  71. eventType));
  72. for (RowData rowData : rowChage.getRowDatasList()) {
  73. if (eventType == EventType.DELETE) {
  74. printColumn(rowData.getBeforeColumnsList());
  75. rowData.getAfterColumnsList();
  76. } else if (eventType == EventType.INSERT) {
  77. printColumn(rowData.getAfterColumnsList());
  78. } else {
  79. System.out.println("-------> before");
  80. printColumn(rowData.getBeforeColumnsList());
  81. System.out.println("-------> after");
  82. printColumn(rowData.getAfterColumnsList());
  83. }
  84. }
  85. }
  86. }
  87. private static void printColumn(List<Column> columns) {
  88. for (Column column : columns) {
  89. System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  90. }
  91. }
  92. }

五、测试验证
在数据库新增一条数据,控制台输出:

(此处原为控制台输出截图,图片未能保留)

测试验证OK。

发表评论

表情:
评论列表 (有 0 条评论,234人围观)

还没有评论,来说两句吧...

相关阅读

    相关 canal监听mysql实践

    canal监听mysql实践  canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析