Spring Boot 集成canal、RocketMq同步异构数据 待我称王封你为后i 2022-09-12 11:44 346阅读 0赞 **一、canal环境搭建** 1、下载canal服务至指定目录,解压压缩文件 wget -P ./ https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz tar -zxf canal.deployer-1.1.5.tar.gz 2、修改配置文件 修改canal.properties vim canal.properties canal.id = 40 canal.ip = 192.168.81.200 修改instance.properties vim example/instance.properties canal.instance.master.address=192.168.81.200:3306 3、mysql 数据库配置修改 vim my.cnf port = 3306 server-id =1 log-bin = mysql-bin binlog_format=ROW 4、分别启动mysql、canal、rocketmq服务 ![watermark_type_ZHJvaWRzYW5zZmFsbGJhY2s_shadow_50_text_Q1NETiBAZmVuZ2NoZW5nd3UyMDEy_size_18_color_FFFFFF_t_70_g_se_x_16][] **二、canal 整合 Spring Boot** 1、加入依赖 <dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency> 2、解析canal增量更新的数据 @Component public class CanalHelper implements CommandLineRunner { private static volatile CanalConnector connector; private static final Logger LOGGER = LoggerFactory.getLogger(CanalHelper.class); @Autowired private RocketMQTemplate rocketMqTemplate; @Override public void run(String... args) throws Exception { readChange(); } /*** * 遍历侦听数据变化 * @throws Exception */ private void readChange() throws Exception { LOGGER.info("CanalHelper---------------------开始监听数据----------------"); int batchSize = 1000; while (true) { Thread.sleep(5000); Message message = getCanalConnector().getWithoutAck(batchSize); ///LOGGER.info("监控数据变化数据 {} ",message.toString()); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { Thread.sleep(1000); } else { ///dataHandle(message.getEntries()); List<CanalEntry.Entry> entries = message.getEntries(); printEntry(entries); } getCanalConnector().ack(batchId); ///当队列里面堆积的sql大于一定数值的时候就模拟执行 // /executeQueueSql(); } } /** * 解析变化数据 * @param entries */ private void printEntry(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { continue; } CanalEntry.RowChange rowChange; try { rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } CanalEntry.EventType eventType = rowChange.getEventType(); LOGGER.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (eventType == CanalEntry.EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == CanalEntry.EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { LOGGER.info("变化之前数据-------> before"); printColumn(rowData.getBeforeColumnsList()); LOGGER.info("变化之后数据-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private void printColumn(List<CanalEntry.Column> columns) { JSONObject jsonObject = new JSONObject(); for (CanalEntry.Column column : columns) { ///LOGGER.info(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); jsonObject.put(column.getName(),column.getValue()); } LOGGER.info("canal监听数据发送至mq {} ",jsonObject.toString()); String msgBody =jsonObject.toJSONString(); rocketMqTemplate.sendOneWay(RocketMqConstant.TOPIC_CANAL, MessageBuilder.withPayload(msgBody).build()); } private static CanalConnector getCanalConnector() { if (connector == null) { synchronized (CanalHelper.class) { if (connector == null) { connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.81.200", 11111), "example", "canal", "canal"); connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); } } } // 创建链接 return connector; } private void disConnect() { if (connector != null) { connector.disconnect(); } } } 3、mq消费增量数据 @Slf4j @Component @RocketMQMessageListener( topic = RocketMqConstant.TOPIC_CANAL, selectorExpression="*", consumerGroup = RocketMqConstant.CONSUMER_CANAL_GROUP) public class CanalMsgListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { byte[] body = message.getBody(); String msg = new String(body); log.info("主题:{} 消费组:{} 接收到消息:{}",RocketMqConstant.TOPIC_CANAL,RocketMqConstant.CONSUMER_CANAL_GROUP, msg); } } public interface RocketMqConstant { String TOPIC_CANAL ="canal_topic"; String CONSUMER_CANAL_GROUP ="canal_consumer"; } [watermark_type_ZHJvaWRzYW5zZmFsbGJhY2s_shadow_50_text_Q1NETiBAZmVuZ2NoZW5nd3UyMDEy_size_18_color_FFFFFF_t_70_g_se_x_16]: /images/20220828/ac671a7ab84f4f6c9d7192164d86b1da.png
还没有评论,来说两句吧...