基于canal的实时数据同步

傷城~ 2022-04-12 05:01 376阅读 0赞

适用场景

使用canal做数据备份而不用mysql自带的主从备份的场景主要为:

  1. 跨数据库的数据备份,例如mysql => oracle
  2. 数据异构,即对同一份数据做不同的分库分表查询。例如卖家和买家各自分库索引

maven

  1. <dependency>
  2. <groupId>com.alibaba.otter</groupId>
  3. <artifactId>canal.client</artifactId>
  4. <version>1.1.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>com.alibaba.otter</groupId>
  8. <artifactId>canal.protocol</artifactId>
  9. <version>1.1.2</version>
  10. </dependency>

java

  1. import java.net.InetSocketAddress;
  2. import java.util.List;
  3. import com.alibaba.otter.canal.client.CanalConnector;
  4. import com.alibaba.otter.canal.client.CanalConnectors;
  5. import com.alibaba.otter.canal.protocol.CanalEntry;
  6. import com.alibaba.otter.canal.protocol.CanalEntry.Column;
  7. import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
  8. import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
  9. import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
  10. import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
  11. import com.alibaba.otter.canal.protocol.Message;
  12. import org.apache.commons.lang.StringUtils;
  13. public class SimpleCanalClient {
  14. public static void main(String[] args) throws Exception {
  15. String destination = "example";
  16. CanalConnector connector = CanalConnectors.newSingleConnector(
  17. new InetSocketAddress("127.0.0.1", 11111), destination, "", "");
  18. connector.connect();
  19. connector.subscribe(".*\\..*");
  20. connector.rollback();
  21. int batchSize = 5 * 1024;
  22. while (true) {
  23. Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
  24. long batchId = message.getId();
  25. int size = message.getEntries().size();
  26. if (batchId == -1 || size == 0) {
  27. // try {
  28. // Thread.sleep(1000);
  29. // } catch (InterruptedException e) {
  30. // }
  31. } else {
  32. synchronizedData(message.getEntries());
  33. }
  34. connector.ack(batchId); // 提交确认
  35. // connector.rollback(batchId); // 处理失败, 回滚数据
  36. }
  37. }
  38. /** * 同步数据 * @param entries * @throws Exception */
  39. private static void synchronizedData(List<Entry> entries) throws Exception {
  40. for (Entry entry : entries) {
  41. if (entry.getEntryType() != EntryType.ROWDATA) {
  42. continue;
  43. }
  44. RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
  45. String tableName = entry.getHeader().getTableName();
  46. for (RowData rowData : rowChange.getRowDatasList()) {
  47. String sql = getSql(rowChange.getEventType(),tableName,rowData);
  48. System.out.println(sql);
  49. // TODO 执行sql语句
  50. }
  51. }
  52. }
  53. /** * 获取增删改的sql * @param eventType * @param tableName * @param rowData * @return */
  54. private static String getSql(CanalEntry.EventType eventType,String tableName,RowData rowData){
  55. String sql = null;
  56. switch (eventType) {
  57. case INSERT:
  58. sql = getInsertSql(tableName,rowData.getAfterColumnsList());
  59. break;
  60. case UPDATE:
  61. sql = getUpdateSql(tableName,rowData.getAfterColumnsList());
  62. break;
  63. case DELETE:
  64. sql = getDeleteSql(tableName,rowData.getBeforeColumnsList());
  65. break;
  66. default:
  67. break;
  68. }
  69. return sql;
  70. }
  71. private static String getInsertSql(String tableName,List<Column> columns){
  72. if(columns.size() == 0 || StringUtils.isBlank(tableName)){
  73. return null;
  74. }
  75. String keys = "";
  76. String values = "";
  77. for(int i=0;i<columns.size();i++){
  78. if(i != 0) {
  79. keys += ",";
  80. values += ",";
  81. }
  82. keys += columns.get(i).getName();
  83. values += getValue(columns.get(i));
  84. }
  85. String format = "INSERT INTO %s (%s) VALUES (%s)";
  86. return String.format(format,tableName,keys,values);
  87. }
  88. private static String getUpdateSql(String tableName,List<Column> columns){
  89. if(columns.size() == 0 || StringUtils.isBlank(tableName)){
  90. return null;
  91. }
  92. String sets = "";
  93. String where = "";
  94. for(Column column : columns){
  95. if(column.getIsKey()){
  96. where = column.getName() + "=" + getValue(column);
  97. continue;
  98. }
  99. if(!StringUtils.isBlank(sets)) {
  100. sets += ",";
  101. }
  102. sets += column.getName() + "=" + getValue(column);
  103. }
  104. String format = "UPDATE %s SET %s WHERE %s";
  105. return String.format(format,tableName,sets,where);
  106. }
  107. private static String getDeleteSql(String tableName,List<Column> columns){
  108. if(columns.size() == 0 || StringUtils.isBlank(tableName)){
  109. return null;
  110. }
  111. String where = "";
  112. for(Column column : columns){
  113. if(column.getIsKey()){
  114. where = column.getName() + "=" + getValue(column);
  115. continue;
  116. }
  117. }
  118. String format = "DELETE FROM %s WHERE %s";
  119. return String.format(format,tableName,where);
  120. }
  121. private static String getValue(Column column){
  122. if(column.getIsNull()){
  123. return "null";
  124. }
  125. return String.format("'%s'",column.getValue());
  126. }
  127. }

数据一致性

单机单点消费mysql的log-bin后直接更新到备份数据库中,数据一致性没有问题。但是如果变成分布式环境以及消费mysql的log-bin后将更新数据推到MQ中由多节点消费更新到多个备份数据库中,则会出现数据更新时序和数据一致性的问题。

而以上代码在update sql中除了获取值变化了的字段,也反查数据库获取了未变化的字段。因此每次update的sql实际上是该条记录的全量数据。

通过在表中加上时间戳字段作为记录的版本号,用逻辑删除取代物理删除delete,修改以上代码的sql拼接,insert操作时忽略主键冲突、update操作时仅更新版本号(时间戳)旧的记录,可以极大避免数据不一致的现象,也解决了MQ重复消费的问题。

  1. `last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP

再通过定时任务,每天一次增量数据更新,每周一次全量数据更新,保证数据的最终一致性。

发表评论

表情:
评论列表 (有 0 条评论,376人围观)

还没有评论,来说两句吧...

相关阅读