Spring Boot 集成canal、RocketMq同步异构数据

待我称王封你为后i 2022-09-12 11:44 478阅读 0赞

一、canal环境搭建

1、下载canal服务至指定目录,解压压缩文件

  1. wget -P ./ https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
  2. tar -zxf canal.deployer-1.1.5.tar.gz

2、修改配置文件

  1. 修改canal.properties
  2. vim canal.properties
  3. canal.id = 40
  4. canal.ip = 192.168.81.200

修改instance.properties

  1. vim example/instance.properties
  2. canal.instance.master.address=192.168.81.200:3306

3、mysql 数据库配置修改

  1. vim my.cnf
  2. port = 3306
  3. server-id =1
  4. log-bin = mysql-bin
  5. binlog_format=ROW

4、分别启动mysql、canal、rocketmq服务

二、canal 整合 Spring Boot

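1、加入依赖(下面的依赖提供 canal 客户端;后文代码还用到 RocketMQTemplate 和 JSONObject,需另行引入 rocketmq-spring-boot-starter 以及 JSON 库,例如 fastjson)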

  1. <dependency>
  2. <groupId>top.javatool</groupId>
  3. <artifactId>canal-spring-boot-starter</artifactId>
  4. <version>1.2.1-RELEASE</version>
  5. </dependency>

2、解析canal增量更新的数据

  1. @Component
  2. public class CanalHelper implements CommandLineRunner {
  3. private static volatile CanalConnector connector;
  4. private static final Logger LOGGER = LoggerFactory.getLogger(CanalHelper.class);
  5. @Autowired
  6. private RocketMQTemplate rocketMqTemplate;
  7. @Override
  8. public void run(String... args) throws Exception {
  9. readChange();
  10. }
  11. /***
  12. * 遍历侦听数据变化
  13. * @throws Exception
  14. */
  15. private void readChange() throws Exception {
  16. LOGGER.info("CanalHelper---------------------开始监听数据----------------");
  17. int batchSize = 1000;
  18. while (true) {
  19. Thread.sleep(5000);
  20. Message message = getCanalConnector().getWithoutAck(batchSize);
  21. ///LOGGER.info("监控数据变化数据 {} ",message.toString());
  22. long batchId = message.getId();
  23. int size = message.getEntries().size();
  24. if (batchId == -1 || size == 0) {
  25. Thread.sleep(1000);
  26. } else {
  27. ///dataHandle(message.getEntries());
  28. List<CanalEntry.Entry> entries = message.getEntries();
  29. printEntry(entries);
  30. }
  31. getCanalConnector().ack(batchId);
  32. ///当队列里面堆积的sql大于一定数值的时候就模拟执行
  33. // /executeQueueSql();
  34. }
  35. }
  36. /**
  37. * 解析变化数据
  38. * @param entries
  39. */
  40. private void printEntry(List<CanalEntry.Entry> entries) {
  41. for (CanalEntry.Entry entry : entries) {
  42. if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
  43. continue;
  44. }
  45. CanalEntry.RowChange rowChange;
  46. try {
  47. rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
  48. } catch (Exception e) {
  49. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
  50. }
  51. CanalEntry.EventType eventType = rowChange.getEventType();
  52. LOGGER.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
  53. entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
  54. entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
  55. eventType));
  56. for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
  57. if (eventType == CanalEntry.EventType.DELETE) {
  58. printColumn(rowData.getBeforeColumnsList());
  59. } else if (eventType == CanalEntry.EventType.INSERT) {
  60. printColumn(rowData.getAfterColumnsList());
  61. } else {
  62. LOGGER.info("变化之前数据-------> before");
  63. printColumn(rowData.getBeforeColumnsList());
  64. LOGGER.info("变化之后数据-------> after");
  65. printColumn(rowData.getAfterColumnsList());
  66. }
  67. }
  68. }
  69. }
  70. private void printColumn(List<CanalEntry.Column> columns) {
  71. JSONObject jsonObject = new JSONObject();
  72. for (CanalEntry.Column column : columns) {
  73. ///LOGGER.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
  74. jsonObject.put(column.getName(),column.getValue());
  75. }
  76. LOGGER.info("canal监听数据发送至mq {} ",jsonObject.toString());
  77. String msgBody =jsonObject.toJSONString();
  78. rocketMqTemplate.sendOneWay(RocketMqConstant.TOPIC_CANAL, MessageBuilder.withPayload(msgBody).build());
  79. }
  80. private static CanalConnector getCanalConnector() {
  81. if (connector == null) {
  82. synchronized (CanalHelper.class) {
  83. if (connector == null) {
  84. connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.81.200", 11111), "example", "canal", "canal");
  85. connector.connect();
  86. connector.subscribe(".*\\..*");
  87. connector.rollback();
  88. }
  89. }
  90. }
  91. // 创建链接
  92. return connector;
  93. }
  94. private void disConnect() {
  95. if (connector != null) {
  96. connector.disconnect();
  97. }
  98. }
  99. }

3、mq消费增量数据

  1. @Slf4j
  2. @Component
  3. @RocketMQMessageListener(
  4. topic = RocketMqConstant.TOPIC_CANAL,
  5. selectorExpression="*",
  6. consumerGroup = RocketMqConstant.CONSUMER_CANAL_GROUP)
  7. public class CanalMsgListener implements RocketMQListener<MessageExt> {
  8. @Override
  9. public void onMessage(MessageExt message) {
  10. byte[] body = message.getBody();
  11. String msg = new String(body);
  12. log.info("主题:{} 消费组:{} 接收到消息:{}",RocketMqConstant.TOPIC_CANAL,RocketMqConstant.CONSUMER_CANAL_GROUP, msg);
  13. }
  14. }
  /**
   * RocketMQ names shared by the canal publisher ({@code CanalHelper}) and the
   * consumer ({@code CanalMsgListener}).
   */
  public interface RocketMqConstant {

      /** Topic that row-change messages are sent to and consumed from. */
      String TOPIC_CANAL = "canal_topic";

      /** Consumer group used by the listener of {@link #TOPIC_CANAL}. */
      String CONSUMER_CANAL_GROUP = "canal_consumer";
  }

发表评论

表情:
评论列表 (有 0 条评论,478人围观)

还没有评论,来说两句吧...

相关阅读