canal mq数据同步
canal mq数据同步
官网:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
*******************
简 介
tcp mode:客户端直连canal server
![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzkzMTYyNQ_size_16_color_FFFFFF_t_70][]
mq mode:canal server将binlog数据发送到mq,1.1.5支持kafka、rocketmq、rabbitmq
![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzkzMTYyNQ_size_16_color_FFFFFF_t_70 1][]
*******************
canal server mq配置
canal.properties
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
example/instance.properties
# 静态topic:消息发送的topic为example
canal.mq.topic=example
# 动态topic:根据数据库、表动态设置发送的topic
canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
# 静态分区:数据发送的分区
canal.mq.partition=0
# 动态分区:根据数据库、表设置发送的分区
canal.mq.partitionsNum=3
canal.mq.partitionHash=test.table:id^name,.*\\..*
# 为不同的topic动态设置分区数
# test.*:4 表示test开头的topic分区数为4
# mycanal:6 表示topic mycanal分区数为6
# (properties文件不支持行尾注释,说明需单独成行)
canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
# 动态topic:canal.mq.dynamicTopic
test:test库中的所有数据都发送到test topic上
.*:数据库的数据都发到对应数据库名的topic上
mytest1.user:发送到mytest1_user topic上
mytest2\\..*:发送到mytest2_tableName topic上
topicName:.*:所有数据库的数据都发到topicName上
topicName:test\\..*:test下的所有表都发送到topicName上
test,test1\\..*:数据库test1中的表发送到test1_tableName topic上
数据库test中的所有表发送到test topic上
其余所有数据发送到canal.mq.topic指定的topic上
# 动态分区:
.*\\..*:id:hash字段为id
.*\\..*:id^name:hash字段为id、name
.*\\..*:$pk$:hash字段为主键(自动查找)
.*\\..*:根据tableName hash
partitionHash为空:发送到默认分区 0
test.test,test.test2:id:test.test根据表名test hash
test.test2根据id hash
其余发送到对应topic的0分区
CanalKafkaProducer:**kafka消息发送类**
@SuppressWarnings({ "rawtypes", "unchecked" })
@SPI("kafka")
public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQProducer {
private static final Logger logger = LoggerFactory.getLogger(CanalKafkaProducer.class);
private static final String PREFIX_KAFKA_CONFIG = "kafka.";
private Producer<String, byte[]> producer;
/**
 * Initializes this producer: installs a fresh {@link KafkaProducerConfig} as the MQ
 * properties, lets the parent class and {@code loadKafkaProperties} populate it, and then
 * creates the underlying {@link KafkaProducer}.
 *
 * <p>The client properties are copied from the loaded config, after which
 * {@code max.in.flight.requests.per.connection} is set to 1 and the key/value serializers
 * are installed, overriding any configured values for those keys.
 *
 * @param properties canal configuration passed to the parent initializer and the loader
 * @throws RuntimeException if kerberos is enabled but the krb5 or jaas file does not exist
 */
@Override
public void init(Properties properties) {
    KafkaProducerConfig config = new KafkaProducerConfig();
    this.mqProperties = config;
    super.init(properties);
    // load properties
    this.loadKafkaProperties(properties);

    Properties clientProps = new Properties();
    clientProps.putAll(config.getKafkaProperties());
    clientProps.put("max.in.flight.requests.per.connection", 1);
    clientProps.put("key.serializer", StringSerializer.class);

    if (config.isKerberosEnabled()) {
        File krb5Conf = new File(config.getKrb5File());
        File jaasConf = new File(config.getJaasFile());
        if (!krb5Conf.exists() || !jaasConf.exists()) {
            String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it";
            logger.error(errorMsg);
            throw new RuntimeException(errorMsg);
        }
        // Kerberos authentication setup; the files must be referenced by absolute path.
        System.setProperty("java.security.krb5.conf", krb5Conf.getAbsolutePath());
        System.setProperty("java.security.auth.login.config", jaasConf.getAbsolutePath());
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        clientProps.put("security.protocol", "SASL_PLAINTEXT");
        clientProps.put("sasl.kerberos.service.name", "kafka");
    }

    clientProps.put("value.serializer", KafkaMessageSerializer.class);
    producer = new KafkaProducer<>(clientProps);
}
private void loadKafkaProperties(Properties properties) {
//加载相关配置
KafkaProducerConfig kafkaProducerConfig = (KafkaProducerConfig) this.mqProperties;
Map<String, Object> kafkaProperties = kafkaProducerConfig.getKafkaProperties();
// 兼容下<=1.1.4的mq配置
doMoreCompatibleConvert("canal.mq.servers", "kafka.bootstrap.servers", properties);
doMoreCompatibleConvert("canal.mq.acks", "kafka.acks", properties);
doMoreCompatibleConvert("canal.mq.compressionType", "kafka.compression.type", properties);
doMoreCompatibleConvert("canal.mq.retries", "kafka.retries", properties);
doMoreCompatibleConvert("canal.mq.batchSize", "kafka.batch.size", properties);
doMoreCompatibleConvert("canal.mq.lingerMs", "kafka.linger.ms", properties);
doMoreCompatibleConvert("canal.mq.maxRequestSize", "kafka.max.request.size", properties);
doMoreCompatibleConvert("canal.mq.bufferMemory", "kafka.buffer.memory", properties);
doMoreCompatibleConvert("canal.mq.kafka.kerberos.enable", "kafka.kerberos.enable", properties);
doMoreCompatibleConvert("canal.mq.kafka.kerberos.krb5.file", "kafka.kerberos.krb5.file", properties);
doMoreCompatibleConvert("canal.mq.kafka.kerberos.jaas.file", "kafka.kerberos.jaas.file", properties);
RocketMQConstants:rocketmq producer属性配置
/**
 * Names of the RocketMQ-related configuration properties. Every key is the
 * {@link #ROOT} prefix followed by a dot-separated suffix.
 */
public class RocketMQConstants {

    /** Prefix shared by all keys declared in this class. */
    public static final String ROOT = "rocketmq";

    public static final String ROCKETMQ_PRODUCER_GROUP = ROOT + ".producer.group";

    public static final String ROCKETMQ_ENABLE_MESSAGE_TRACE = ROOT + ".enable.message.trace";

    public static final String ROCKETMQ_CUSTOMIZED_TRACE_TOPIC = ROOT + ".customized.trace.topic";

    public static final String ROCKETMQ_NAMESPACE = ROOT + ".namespace";

    public static final String ROCKETMQ_NAMESRV_ADDR = ROOT + ".namesrv.addr";

    public static final String ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED = ROOT + ".retry.times.when.send.failed";

    public static final String ROCKETMQ_VIP_CHANNEL_ENABLED = ROOT + ".vip.channel.enabled";

    public static final String ROCKETMQ_TAG = ROOT + ".tag";

    public static final String ROCKETMQ_ACCESS_CHANNEL = ROOT + ".access.channel";

    public static final String ROCKETMQ_BATCH_SIZE = ROOT + ".batch.size";

    public static final String ROCKETMQ_SUBSCRIBE_FILTER = ROOT + ".subscribe.filter";
}
RabbitMQConstants:rabbitmq producer属性配置
/**
 * Names of the RabbitMQ-related configuration properties. Every key is the
 * {@link #ROOT} prefix followed by a dot-separated suffix.
 */
public class RabbitMQConstants {

    /** Prefix shared by all keys declared in this class. */
    public static final String ROOT = "rabbitmq";

    public static final String RABBITMQ_HOST = ROOT + "." + "host";
    public static final String RABBITMQ_EXCHANGE = ROOT + "." + "exchange";
    public static final String RABBITMQ_VIRTUAL_HOST = ROOT + "." + "virtual.host";
    public static final String RABBITMQ_USERNAME = ROOT + "." + "username";
    public static final String RABBITMQ_PASSWORD = ROOT + "." + "password";

    // The suffix must not repeat the "rabbitmq." prefix: ROOT already supplies it, and a
    // doubled prefix ("rabbitmq.rabbitmq.resource.ownerId") never matches the documented
    // "rabbitmq.resource.ownerId" property.
    public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "resource.ownerId";
}
*******************
canal adapter mq属性配置
application.yml:启动器配置
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
# srcDataSources:
# defaultDS:
# url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
# username: root
# password: 121212
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
CanalKafkaConsumer:kafka. 开头的属性均可读取
@SPI("kafka")
public class CanalKafkaConsumer implements CanalMsgConsumer {
private static final String PREFIX_KAFKA_CONFIG = "kafka.";
private KafkaConsumer<String, ?> kafkaConsumer;
private boolean flatMessage = true;
private String topic;
private Map<Integer, Long> currentOffsets = new ConcurrentHashMap<>();
private Properties kafkaProperties = new Properties();
/**
 * Records the topic and prepares the Kafka client properties for this consumer.
 *
 * <p>Every entry of {@code properties} whose key starts with {@code "kafka."} is copied
 * into the client properties with that prefix removed. {@code group.id},
 * {@code key.deserializer} and a random six-character {@code client.id} are then set,
 * replacing any values copied for those keys.
 *
 * @param properties consumer configuration; the flat-message flag, when present, replaces
 *                   the current setting
 * @param topic      topic to subscribe to later in {@code connect()}
 * @param groupId    value used for the Kafka {@code group.id}
 */
@Override
public void init(Properties properties, String topic, String groupId) {
    this.topic = topic;

    Object flatFlag = properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
    if (flatFlag != null) {
        this.flatMessage = (Boolean) flatFlag;
    }

    // Pick up every property that starts with "kafka." and store it without the prefix.
    final int prefixLength = PREFIX_KAFKA_CONFIG.length();
    for (Map.Entry<Object, Object> property : properties.entrySet()) {
        String name = (String) property.getKey();
        Object value = property.getValue();
        if (!name.startsWith(PREFIX_KAFKA_CONFIG) || value == null) {
            continue;
        }
        kafkaProperties.put(name.substring(prefixLength), value);
    }

    String clientId = UUID.randomUUID().toString().substring(0, 6);
    kafkaProperties.put("group.id", groupId);
    kafkaProperties.put("key.deserializer", StringDeserializer.class);
    kafkaProperties.put("client.id", clientId);
}
/**
 * Creates the Kafka consumer and subscribes it to the configured topic.
 *
 * <p>In flat-message mode values are read with {@code StringDeserializer}; otherwise
 * {@code KafkaMessageDeserializer} is used.
 */
@Override
public void connect() {
    final boolean flat = this.flatMessage;
    kafkaProperties.put("value.deserializer",
        flat ? StringDeserializer.class : KafkaMessageDeserializer.class);

    if (flat) {
        this.kafkaConsumer = new KafkaConsumer<String, String>(kafkaProperties);
    } else {
        this.kafkaConsumer = new KafkaConsumer<String, Message>(kafkaProperties);
    }

    kafkaConsumer.subscribe(Collections.singletonList(topic));
}
CanalRocketMQConsumer:rocketmq 消费相关属性
@SPI("rocketmq")
public class CanalRocketMQConsumer implements CanalMsgConsumer {
private static final Logger logger = LoggerFactory.getLogger(CanalRocketMQConsumer.class);
private static final String CLOUD_ACCESS_CHANNEL = "cloud";
private String nameServer;
private String topic;
private String groupName;
private DefaultMQPushConsumer rocketMQConsumer;
private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;
private int batchSize = -1;
private long batchProcessTimeout = 60 * 1000;
private boolean flatMessage;
private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;
private String accessKey;
private String secretKey;
private String customizedTraceTopic;
private boolean enableMessageTrace = false;
private String accessChannel;
private String namespace;
private String filter = "*";
/**
 * Reads the RocketMQ consumer settings from {@code properties} and stores them on this
 * instance; also allocates the bounded queue (capacity 1024) that holds fetched batches.
 *
 * <p>Optional settings (flat-message flag, message trace flag, batch size, subscribe
 * filter) only replace the current field value when the property is present and, for
 * string properties, non-empty.
 *
 * @param properties consumer configuration
 * @param topic      topic this consumer will read
 * @param groupName  consumer group name
 * @throws NumberFormatException if the batch size property is set but is not an integer
 */
@Override
public void init(Properties properties, String topic, String groupName) {
    this.topic = topic;
    this.groupName = groupName;

    // Null-check before assigning: unboxing a missing (null) Boolean straight into the
    // primitive field would throw NullPointerException.
    Boolean flatMessageProp = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
    if (flatMessageProp != null) {
        this.flatMessage = flatMessageProp;
    }

    this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
    this.accessKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
    this.secretKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_SECRET_KEY);

    String traceFlag = properties.getProperty(RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
    if (StringUtils.isNotEmpty(traceFlag)) {
        this.enableMessageTrace = Boolean.parseBoolean(traceFlag);
    }

    this.customizedTraceTopic = properties.getProperty(RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
    this.accessChannel = properties.getProperty(RocketMQConstants.ROCKETMQ_ACCESS_CHANNEL);
    this.namespace = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESPACE);
    this.nameServer = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);

    String batchSizeProp = properties.getProperty(RocketMQConstants.ROCKETMQ_BATCH_SIZE);
    if (StringUtils.isNotEmpty(batchSizeProp)) {
        this.batchSize = Integer.parseInt(batchSizeProp);
    }

    String subscribeFilter = properties.getProperty(RocketMQConstants.ROCKETMQ_SUBSCRIBE_FILTER);
    if (StringUtils.isNotEmpty(subscribeFilter)) {
        this.filter = subscribeFilter;
    }
}
CanalRabbitMQConsumer:rabbitmq 消费相关属性
@SPI("rabbitmq")
public class CanalRabbitMQConsumer implements CanalMsgConsumer {
private static final Logger logger = LoggerFactory.getLogger(CanalRabbitMQConsumer.class);
// 链接地址
private String nameServer;
// 主机名
private String vhost;
private String queueName;
// 一些鉴权信息
private String accessKey;
private String secretKey;
private Long resourceOwnerId;
private String username;
private String password;
private boolean flatMessage;
private Connection connect;
private Channel channel;
private long batchProcessTimeout = 60 * 1000;
private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;
private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;
@Override
public void init(Properties properties, String topic, String groupId) {
this.nameServer = PropertiesUtils.getProperty(properties, "rabbitmq.host");
this.vhost = PropertiesUtils.getProperty(properties, "rabbitmq.virtual.host");
this.queueName = topic;
this.accessKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
this.secretKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRET_KEY);
this.username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);
this.password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);
Long resourceOwnerIdPro = (Long) properties.get(RabbitMQConstants.RABBITMQ_RESOURCE_OWNERID);
*******************
示例
创建 mysql实例
docker run -it -d --net fixed --ip 172.18.0.2 -p 3306:3306 --privileged=true \
--name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql
# 创建用户、并授权
mysql> create user canal identified with mysql_native_password by "123456";
Query OK, 0 rows affected (0.01 sec)
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)
mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)
# 创建数据库、表
mysql> create database lihu;
Query OK, 1 row affected (0.35 sec)
mysql> use lihu;
Database changed
mysql> create table test(id int not null auto_increment primary key, value int not null);
Query OK, 0 rows affected (0.67 sec)
mysql> create table test2(id int not null auto_increment primary key, value int not null);
Query OK, 0 rows affected (0.27 sec)
创建 canal server
docker run -it -d --net fixed --ip 172.18.0.3 -p 11111:11111 --name canal-server \
-v /usr/canal/mq/conf:/home/admin/canal-server/conf canal/canal-server
# canal.properties
canal.serverMode = rocketmq
rocketmq.namesrv.addr = 172.18.0.4:9876
# instance.properties
canal.instance.master.address=172.18.0.2:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.mq.topic=example
canal.mq.dynamicTopic=.*
canal.mq.partition=0
canal.mq.partitionsNum=3
canal.mq.partitionHash=.*\\..*
创建 rocketmq(namesrv、broker)
docker run -it -d --net fixed --ip 172.18.0.4 -p 9876:9876 \
-e JAVA_OPT="-server -Xms256m -Xmx256m -Xmn128m" \
--name namesrv apacherocketmq/rocketmq:4.9.0-alpine sh mqnamesrv
docker run -it -d --net fixed --ip 172.18.0.5 -p 10911:10911 -p 10909:10909 \
-e NAMESRV_ADDR="172.18.0.4:9876" \
-e JAVA_OPT="-server -Xms2g -Xmx2g -Xmn1g" \
-v /usr/rocketmq/test/broker.conf:/home/rocketmq/rocketmq-4.9.0/conf/broker.conf \
--name broker apacherocketmq/rocketmq:4.9.0-alpine \
sh mqbroker autoCreateTopicEnable=true -c /home/rocketmq/rocketmq-4.9.0/conf/broker.conf
创建 canal adapter
docker run -it -d --net fixed --ip 172.18.0.6 -p 8081:8081 \
-v /usr/canal/mq/adapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
-v /usr/canal/mq/adapter/conf/rdb:/opt/canal-adapter/conf/rdb \
--name canal-adapter slpcat/canal-adapter:v1.1.5-jdk8
****************
application.yml:启动器配置
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
rocketmq.namespace:
rocketmq.namesrv.addr: 172.18.0.4:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
srcDataSources:
defaultDS:
url: jdbc:mysql://172.18.0.2:3306/lihu?useUnicode=true
username: root
password: 123456
canalAdapters:
- instance: lihu # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: rdb
key: mysql1
properties:
jdbc.driverClassName: com.mysql.jdbc.Driver
jdbc.url: jdbc:mysql://172.18.0.7:3306/lihu?useUnicode=true
jdbc.username: root
jdbc.password: 123456
****************
rdb/mytest_user.yml:适配器配置
dataSourceKey: defaultDS
destination: lihu
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
mirrorDb: true
database: lihu
创建 mysql2实例(目标数据库)
docker run -it -d --net fixed --ip 172.18.0.7 -p 3307:3306 --privileged=true \
--name mysql2 -e MYSQL_ROOT_PASSWORD=123456 mysql
# 创建数据库(不需要创建表)
mysql> create database lihu;
Query OK, 1 row affected (0.00 sec)
*******************
使用测试
源数据库 mysql:插入数据
mysql> insert into test(id,value) values(1,2);
Query OK, 1 row affected (0.11 sec)
mysql> insert into test(id,value) values(2,2);
Query OK, 1 row affected (0.15 sec)
mysql> insert into test2(id,value) values(1,2);
Query OK, 1 row affected (0.03 sec)
mysql> insert into test2(id,value) values(2,2);
Query OK, 1 row affected (0.11 sec)
canal adapter 日志
2021-07-14 22:36:09.564 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626273340000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test(id int not null auto_increment primary key, value int not null)","table":"test","ts":1626273345726,"type":"CREATE"}
2021-07-14 22:42:00.785 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"INSERT"}
2021-07-14 22:44:48.353 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626273887000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test2(id int not null auto_increment primary key, value int not null)","table":"test2","ts":1626273888331,"type":"CREATE"}
2021-07-14 22:44:54.400 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test2","type":"INSERT"}
2021-07-14 22:44:57.226 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test2","type":"INSERT"}
2021-07-14 22:57:26.640 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626274646000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test3(id int not null auto_increment primary key, value int not null)","table":"test3","ts":1626274646540,"type":"CREATE"}
2021-07-14 22:57:34.374 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test3","type":"INSERT"}
2021-07-14 22:59:08.462 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"DELETE"}
2021-07-14 22:59:16.007 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"INSERT"}
目标数据库 mysql2:查看数据
mysql> select * from test;
+----+-------+
| id | value |
+----+-------+
| 1 | 2 |
| 2 | 2 |
+----+-------+
2 rows in set (0.00 sec)
mysql> select * from test2;
+----+-------+
| id | value |
+----+-------+
| 1 | 2 |
| 2 | 2 |
+----+-------+
mysql2 可以同步 mysql中的数据
还没有评论,来说两句吧...