Integrating canal and RocketMQ with Spring Boot to Sync Heterogeneous Data
I. Setting up the canal environment
1. Download the canal server (deployer) package to the target directory and extract the archive
wget -P ./ https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxf canal.deployer-1.1.5.tar.gz
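2. Modify the configuration files
Edit canal.properties (the commands below assume the current directory is canal's conf directory)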
vim canal.properties
canal.id = 40
canal.ip = 192.168.81.200
Edit instance.properties for the example instance
vim example/instance.properties
canal.instance.master.address=192.168.81.200:3306
3. Modify the MySQL configuration
vim my.cnf
port = 3306
server-id =1
log-bin = mysql-bin
binlog_format=ROW
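canal reads the binlog as a replication client, so it relies on the settings above: a server-id, an enabled binlog, and binlog_format=ROW. As a minimal sketch, assuming the file is simple enough to be read with java.util.Properties, the check below reports which of those three settings are absent from a my.cnf-style text. The class name BinlogConfigCheck and its check method are hypothetical and not part of canal or MySQL. Some MySQL versions enable part of this by default, so a reported problem is a prompt to look closer rather than proof of a misconfiguration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper: checks my.cnf-style text for the binlog settings listed above. */
final class BinlogConfigCheck {

    /** Returns a list of problems; an empty list means all three settings were found. */
    static List<String> check(String myCnfText) {
        // Assumption: plain "key = value" lines. Properties also tolerates a
        // "[mysqld]" header, which it reads as a key with an empty value.
        Properties props = new Properties();
        try {
            props.load(new StringReader(myCnfText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        // MySQL accepts both "log-bin" and "log_bin", so normalize dashes.
        Map<String, String> settings = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            String key = name.trim().replace('-', '_').toLowerCase(Locale.ROOT);
            settings.put(key, props.getProperty(name).trim());
        }

        List<String> problems = new ArrayList<>();
        if (!settings.containsKey("server_id")) {
            problems.add("server-id is not set in this file");
        }
        if (!settings.containsKey("log_bin")) {
            problems.add("log-bin is not set in this file");
        }
        if (!"ROW".equalsIgnoreCase(settings.get("binlog_format"))) {
            problems.add("binlog_format is not set to ROW in this file");
        }
        return problems;
    }

    public static void main(String[] args) {
        String sample = "port = 3306\nserver-id =1\nlog-bin = mysql-bin\nbinlog_format=ROW\n";
        System.out.println(check(sample)); // prints [] for this sample
    }
}
```

Running a check like this before starting canal can catch a missing line early, instead of finding out later that no change events arrive.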
4. Start the MySQL, canal, and RocketMQ services
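II. Integrating canal with Spring Boot
1. Add the dependency (the RocketMQ classes used later come from rocketmq-spring-boot-starter, which is not shown here)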
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
2. Parse the incremental changes delivered by canal
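import com.alibaba.fastjson.JSONObject; // assumed: fastjson, inferred from the toJSONString() call below
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CanalHelper implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(CanalHelper.class);

    /** Shared connector, created lazily in getCanalConnector(). */
    private static volatile CanalConnector connector;

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    @Override
    public void run(String... args) {
        // Poll on a separate thread: readChange() loops forever and would
        // otherwise keep Spring Boot's startup from completing.
        new Thread(() -> {
            try {
                readChange();
            } catch (Exception e) {
                LOGGER.error("canal polling loop stopped", e);
            }
        }, "canal-poller").start();
    }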
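
    /**
     * Polls canal in a loop and hands each non-empty batch to printEntry.
     *
     * @throws Exception if the thread is interrupted or the connector fails
     */
    private void readChange() throws Exception {
        LOGGER.info("CanalHelper: start listening for data changes");
        int batchSize = 1000;
        while (true) {
            // Throttle: wait 5 s between polls (lower this to reduce sync latency).
            Thread.sleep(5000);
            // Fetch up to batchSize entries without acknowledging them yet.
            Message message = getCanalConnector().getWithoutAck(batchSize);
            // LOGGER.info("change data received: {}", message);
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) {
                // Nothing new: back off briefly before polling again.
                Thread.sleep(1000);
            } else {
                // dataHandle(message.getEntries());
                List<CanalEntry.Entry> entries = message.getEntries();
                printEntry(entries);
            }
            // Acknowledge the batch so canal can advance its position.
            getCanalConnector().ack(batchId);
            // When the queued SQL exceeds a threshold, simulate executing it:
            // executeQueueSql();
        }
    }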

    /**
     * Parses the row changes in each entry and forwards them.
     *
     * @param entries binlog entries from one canal batch
     */
    private void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; they carry no row data.
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("Failed to parse RowChange, data: " + entry, e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            LOGGER.info("binlog[{}:{}], name[{},{}], eventType: {}",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType);
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    // A deleted row only has a "before" image.
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    // An inserted row only has an "after" image.
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    // Other events (such as UPDATE) send both images as two messages.
                    LOGGER.info("row before the change:");
                    printColumn(rowData.getBeforeColumnsList());
                    LOGGER.info("row after the change:");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    /** Converts one row image to a flat JSON object and sends it to RocketMQ. */
    private void printColumn(List<CanalEntry.Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns) {
            // LOGGER.info("{} : {} update={}", column.getName(), column.getValue(), column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        String msgBody = jsonObject.toJSONString();
        LOGGER.info("sending canal change data to MQ: {}", msgBody);
        // sendOneWay is fire-and-forget: it does not wait for a broker response.
        rocketMqTemplate.sendOneWay(RocketMqConstant.TOPIC_CANAL, MessageBuilder.withPayload(msgBody).build());
    }
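
    /** Lazily creates the single shared connector (double-checked locking). */
    private static CanalConnector getCanalConnector() {
        if (connector == null) {
            synchronized (CanalHelper.class) {
                if (connector == null) {
                    // Create the connection to the canal server: port 11111,
                    // destination "example", then username and password.
                    CanalConnector created = CanalConnectors.newSingleConnector(
                            new InetSocketAddress("192.168.81.200", 11111), "example", "canal", "canal");
                    created.connect();
                    // Subscribe to every table in every schema.
                    created.subscribe(".*\\..*");
                    // Roll back to the last acknowledged position so un-acked data is redelivered.
                    created.rollback();
                    // Publish the connector only after it is fully set up.
                    connector = created;
                }
            }
        }
        return connector;
    }

    /** Closes the connection. Nothing in this class calls it yet; hook it into application shutdown if needed. */
    private void disConnect() {
        if (connector != null) {
            connector.disconnect();
        }
    }
}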
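The loop above acknowledges every batch and has no path for a batch that fails during processing. canal's client also offers rollback(batchId), which asks the server to redeliver from the last acknowledged position. The sketch below shows the fetch, process, then ack-or-rollback cycle in isolation. To stay self-contained it uses a hypothetical BatchSource interface and an in-memory fake in place of CanalConnector; only the method names getWithoutAck, ack, and rollback mirror the real client, and everything else (AckLoopSketch, Batch, InMemorySource, pollOnce) is made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Sketch of the fetch, process, then ack-or-rollback cycle. */
final class AckLoopSketch {

    /** Hypothetical stand-in for the three CanalConnector calls used by the loop. */
    interface BatchSource {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Hypothetical batch holder; id -1 means "nothing available", as in canal. */
    static final class Batch {
        final long id;
        final List<String> entries;

        Batch(long id, List<String> entries) {
            this.id = id;
            this.entries = entries;
        }
    }

    /** Handles at most one batch; returns true only if it was processed and acked. */
    static boolean pollOnce(BatchSource source, int batchSize, Consumer<List<String>> handler) {
        Batch batch = source.getWithoutAck(batchSize);
        if (batch.id == -1) {
            return false; // nothing to do; the caller decides how long to sleep
        }
        try {
            if (!batch.entries.isEmpty()) {
                handler.accept(batch.entries);
            }
            source.ack(batch.id);      // success: let the source advance
            return true;
        } catch (RuntimeException e) {
            source.rollback(batch.id); // failure: ask for the batch to be redelivered
            return false;
        }
    }

    /** In-memory fake used only to exercise pollOnce. */
    static final class InMemorySource implements BatchSource {
        private final List<String> pending;
        private int inFlight = 0; // entries handed out but not yet acked
        private long nextId = 1;
        final List<Long> acked = new ArrayList<>();
        final List<Long> rolledBack = new ArrayList<>();

        InMemorySource(List<String> entries) {
            this.pending = new ArrayList<>(entries);
        }

        @Override
        public Batch getWithoutAck(int batchSize) {
            if (pending.isEmpty()) {
                return new Batch(-1, new ArrayList<>());
            }
            inFlight = Math.min(batchSize, pending.size());
            return new Batch(nextId++, new ArrayList<>(pending.subList(0, inFlight)));
        }

        @Override
        public void ack(long batchId) {
            pending.subList(0, inFlight).clear(); // drop what was delivered
            inFlight = 0;
            acked.add(batchId);
        }

        @Override
        public void rollback(long batchId) {
            inFlight = 0; // entries stay pending and will be handed out again
            rolledBack.add(batchId);
        }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(Arrays.asList("row-1", "row-2"));
        boolean handled = pollOnce(source, 1000, entries -> System.out.println("handling " + entries));
        System.out.println("handled=" + handled + " acked=" + source.acked);
    }
}
```

With this shape a handler failure leads to redelivery instead of a silently skipped batch, at the cost that the downstream consumer may see the same rows more than once.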
3. Consume the incremental data from MQ
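import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqConstant.TOPIC_CANAL,
        selectorExpression = "*",
        consumerGroup = RocketMqConstant.CONSUMER_CANAL_GROUP)
public class CanalMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        byte[] body = message.getBody();
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String msg = new String(body, StandardCharsets.UTF_8);
        log.info("topic: {} consumer group: {} received message: {}",
                RocketMqConstant.TOPIC_CANAL, RocketMqConstant.CONSUMER_CANAL_GROUP, msg);
    }
}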

/** Topic and consumer group shared by the producer and the listener. */
public interface RocketMqConstant {
    String TOPIC_CANAL = "canal_topic";
    String CONSUMER_CANAL_GROUP = "canal_consumer";
}
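The listener above only logs each message, and the payload built by the producer is a flat JSON object of column names and values, so it carries neither the table name nor the event type. One possible direction, sketched below under stated assumptions, is to wrap each row in a small envelope and apply it to the target store as an upsert or a delete. Everything here is hypothetical: the ChangeEvent envelope, the Type enum, the apply method, the nested map standing in for the target store, and the assumption that every row has a single primary-key column whose name the caller knows.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch: applying change events to an in-memory target keyed by primary key. */
final class ChangeApplySketch {

    /** The three row-level event types the producer distinguishes. */
    enum Type { INSERT, UPDATE, DELETE }

    /** Hypothetical envelope: which table changed, how, and the row's columns. */
    static final class ChangeEvent {
        final String table;
        final Type type;
        final Map<String, String> row;

        ChangeEvent(String table, Type type, Map<String, String> row) {
            this.table = table;
            this.type = type;
            this.row = row;
        }
    }

    /**
     * Applies one event to the target (table name -> primary key -> row).
     * INSERT and UPDATE are both written as an upsert, so replaying the same
     * event leaves the target unchanged.
     */
    static void apply(ChangeEvent event, String keyColumn,
                      Map<String, Map<String, Map<String, String>>> target) {
        String key = event.row.get(keyColumn);
        if (key == null) {
            throw new IllegalArgumentException("row has no '" + keyColumn + "' column");
        }
        Map<String, Map<String, String>> rows =
                target.computeIfAbsent(event.table, t -> new HashMap<>());
        if (event.type == Type.DELETE) {
            rows.remove(key);
        } else {
            rows.put(key, new HashMap<>(event.row)); // copy so later edits to the event do not leak in
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, Map<String, String>>> target = new HashMap<>();
        Map<String, String> row = new HashMap<>();
        row.put("id", "1");
        row.put("name", "alice");
        apply(new ChangeEvent("user", Type.INSERT, row), "id", target);
        System.out.println(target.get("user").get("1").get("name")); // prints alice
    }
}
```

Because inserts and updates both overwrite by key, a message that is delivered twice produces the same final state, which helps when a batch is redelivered.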