canal MQ data synchronization



Official docs: https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

*******************

Overview

tcp mode: the client connects directly to the canal server


mq mode: the canal server sends the binlog data to an MQ; as of 1.1.5, Kafka, RocketMQ, and RabbitMQ are supported

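For context, a minimal tcp-mode client looks roughly like the sketch below (based on canal's client API; the server address, the destination name `example`, and the batch size are assumptions):

```java
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleTcpClient {

    public static void main(String[] args) throws InterruptedException {
        // tcp mode: the client talks to the canal server directly
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");               // all databases and tables
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);                    // nothing new yet
                } else {
                    message.getEntries()
                           .forEach(e -> System.out.println(e.getHeader().getTableName()));
                }
                connector.ack(batchId);                    // confirm the batch
            }
        } finally {
            connector.disconnect();
        }
    }
}
```

In mq mode this polling loop disappears from the application: the canal server pushes the batches into Kafka/RocketMQ/RabbitMQ, and downstream consumers read from the MQ instead.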

*******************

canal server MQ configuration

canal.properties

```properties
##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
```

example/instance.properties

```properties
# static topic: all messages are sent to the topic "example"
canal.mq.topic=example
# dynamic topic: route to topics derived from the database/table name
canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
# static partition: the partition messages are written to
canal.mq.partition=0
# dynamic partitioning: partition count plus a per-database/table hash rule
canal.mq.partitionsNum=3
canal.mq.partitionHash=test.table:id^name,.*\\..*
# per-topic partition counts: topics matching test.* get 4 partitions,
# topic mycanal gets 6
canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
```

Dynamic topic rules (`canal.mq.dynamicTopic`):

* `test`: all data in database `test` is sent to topic `test`
* `.*`: each database's data is sent to a topic named after the database
* `mytest1.user`: table `mytest1.user` is sent to topic `mytest1_user`
* `mytest2\\..*`: every table in `mytest2` is sent to topic `mytest2_tableName`
* `topicName:.*`: data from every database is sent to `topicName`
* `topicName:test\\..*`: every table in `test` is sent to `topicName`
* `test,test1\\..*`: tables in `test1` go to topic `test1_tableName`, all tables in database `test` go to topic `test`, and everything else goes to the topic set by `canal.mq.topic`

Partition hash rules (`canal.mq.partitionHash`; see the hashing sketch after this list):

* `.*\\..*:id`: hash on column `id`
* `.*\\..*:id^name`: hash on columns `id` and `name`
* `.*\\..*:$pk$`: hash on the primary key (resolved automatically)
* `.*\\..*`: hash on the table name
* empty `partitionHash`: everything is sent to the default partition 0
* `test.test,test.test2:id`: `test.test` hashes on the table name, `test.test2` hashes on column `id`; everything else goes to partition 0 of its topic
CanalKafkaProducer: the **Kafka message producer class**

```java
@SuppressWarnings({ "rawtypes", "unchecked" })
@SPI("kafka")
public class CanalKafkaProducer extends AbstractMQProducer implements CanalMQProducer {

    private static final Logger logger              = LoggerFactory.getLogger(CanalKafkaProducer.class);
    private static final String PREFIX_KAFKA_CONFIG = "kafka.";

    private Producer<String, byte[]> producer;

    @Override
    public void init(Properties properties) {
        KafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig();
        this.mqProperties = kafkaProducerConfig;
        super.init(properties);
        // load properties
        this.loadKafkaProperties(properties);

        Properties kafkaProperties = new Properties();
        kafkaProperties.putAll(kafkaProducerConfig.getKafkaProperties());
        kafkaProperties.put("max.in.flight.requests.per.connection", 1);
        kafkaProperties.put("key.serializer", StringSerializer.class);
        if (kafkaProducerConfig.isKerberosEnabled()) {
            File krb5File = new File(kafkaProducerConfig.getKrb5File());
            File jaasFile = new File(kafkaProducerConfig.getJaasFile());
            if (krb5File.exists() && jaasFile.exists()) {
                // configure kerberos authentication; absolute paths are required
                System.setProperty("java.security.krb5.conf", krb5File.getAbsolutePath());
                System.setProperty("java.security.auth.login.config", jaasFile.getAbsolutePath());
                System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
                kafkaProperties.put("security.protocol", "SASL_PLAINTEXT");
                kafkaProperties.put("sasl.kerberos.service.name", "kafka");
            } else {
                String errorMsg = "ERROR # The kafka kerberos configuration file does not exist! please check it";
                logger.error(errorMsg);
                throw new RuntimeException(errorMsg);
            }
        }
        kafkaProperties.put("value.serializer", KafkaMessageSerializer.class);
        producer = new KafkaProducer<>(kafkaProperties);
    }

    private void loadKafkaProperties(Properties properties) {
        // load the kafka-related configuration
        KafkaProducerConfig kafkaProducerConfig = (KafkaProducerConfig) this.mqProperties;
        Map<String, Object> kafkaProperties = kafkaProducerConfig.getKafkaProperties();
        // compatibility with the <=1.1.4 mq property names
        doMoreCompatibleConvert("canal.mq.servers", "kafka.bootstrap.servers", properties);
        doMoreCompatibleConvert("canal.mq.acks", "kafka.acks", properties);
        doMoreCompatibleConvert("canal.mq.compressionType", "kafka.compression.type", properties);
        doMoreCompatibleConvert("canal.mq.retries", "kafka.retries", properties);
        doMoreCompatibleConvert("canal.mq.batchSize", "kafka.batch.size", properties);
        doMoreCompatibleConvert("canal.mq.lingerMs", "kafka.linger.ms", properties);
        doMoreCompatibleConvert("canal.mq.maxRequestSize", "kafka.max.request.size", properties);
        doMoreCompatibleConvert("canal.mq.bufferMemory", "kafka.buffer.memory", properties);
        doMoreCompatibleConvert("canal.mq.kafka.kerberos.enable", "kafka.kerberos.enable", properties);
        doMoreCompatibleConvert("canal.mq.kafka.kerberos.krb5.file", "kafka.kerberos.krb5.file", properties);
        doMoreCompatibleConvert("canal.mq.kafka.kerberos.jaas.file", "kafka.kerberos.jaas.file", properties);
        // ... (excerpt truncated)
    }
}
```
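Two details worth calling out: `max.in.flight.requests.per.connection` is pinned to 1 so sends stay ordered, and the `doMoreCompatibleConvert` calls keep pre-1.1.5 property names working. The helper itself is not shown in the excerpt; a plausible sketch of its behavior (an assumption based on its name and call sites, not the verified canal source):

```java
// Assumed behavior: if the old (<=1.1.4) key is set, copy its value to the new kafka.* key.
static void doMoreCompatibleConvert(String oldKey, String newKey, Properties properties) {
    String value = properties.getProperty(oldKey);
    if (value != null && !value.isEmpty()) {
        properties.setProperty(newKey, value);
    }
}
```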

RocketMQConstants: RocketMQ producer property keys

```java
public class RocketMQConstants {

    public static final String ROOT                                  = "rocketmq";

    public static final String ROCKETMQ_PRODUCER_GROUP               = ROOT + "." + "producer.group";
    public static final String ROCKETMQ_ENABLE_MESSAGE_TRACE         = ROOT + "." + "enable.message.trace";
    public static final String ROCKETMQ_CUSTOMIZED_TRACE_TOPIC       = ROOT + "." + "customized.trace.topic";
    public static final String ROCKETMQ_NAMESPACE                    = ROOT + "." + "namespace";
    public static final String ROCKETMQ_NAMESRV_ADDR                 = ROOT + "." + "namesrv.addr";
    public static final String ROCKETMQ_RETRY_TIMES_WHEN_SEND_FAILED = ROOT + "." + "retry.times.when.send.failed";
    public static final String ROCKETMQ_VIP_CHANNEL_ENABLED          = ROOT + "." + "vip.channel.enabled";
    public static final String ROCKETMQ_TAG                          = ROOT + "." + "tag";
    public static final String ROCKETMQ_ACCESS_CHANNEL               = ROOT + "." + "access.channel";
    public static final String ROCKETMQ_BATCH_SIZE                   = ROOT + "." + "batch.size";
    public static final String ROCKETMQ_SUBSCRIBE_FILTER             = ROOT + "." + "subscribe.filter";
}
```
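These keys ultimately configure a standard RocketMQ producer. A minimal sketch of the equivalent plain-client setup (group, nameserver, and topic values are taken from the configs above; this is an illustration, not canal's CanalRocketMQProducer):

```java
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class RocketMQProducerSketch {

    public static void main(String[] args) throws Exception {
        // values that rocketmq.producer.group / rocketmq.namesrv.addr would supply
        DefaultMQProducer producer = new DefaultMQProducer("test");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setRetryTimesWhenSendFailed(0);   // rocketmq.retry.times.when.send.failed
        producer.setVipChannelEnabled(false);      // rocketmq.vip.channel.enabled
        producer.start();

        Message msg = new Message("example", "hello".getBytes());
        SendResult result = producer.send(msg);
        System.out.println(result.getSendStatus());

        producer.shutdown();
    }
}
```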

RabbitMQConstants: RabbitMQ producer property keys

```java
public class RabbitMQConstants {

    public static final String ROOT                      = "rabbitmq";

    public static final String RABBITMQ_HOST             = ROOT + "." + "host";
    public static final String RABBITMQ_EXCHANGE         = ROOT + "." + "exchange";
    public static final String RABBITMQ_VIRTUAL_HOST     = ROOT + "." + "virtual.host";
    public static final String RABBITMQ_USERNAME         = ROOT + "." + "username";
    public static final String RABBITMQ_PASSWORD         = ROOT + "." + "password";
    public static final String RABBITMQ_RESOURCE_OWNERID = ROOT + "." + "rabbitmq.resource.ownerId";
}
```
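Likewise, these keys map onto an ordinary RabbitMQ client connection. A rough publish-side sketch (the exchange name and routing key are assumptions; canal routes by topic name, and this is not its CanalRabbitMQProducer):

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQProducerSketch {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");          // rabbitmq.host
        factory.setVirtualHost("/");           // rabbitmq.virtual.host
        factory.setUsername("guest");          // rabbitmq.username
        factory.setPassword("guest");          // rabbitmq.password
        try (Connection conn = factory.newConnection();
             Channel channel = conn.createChannel()) {
            // rabbitmq.exchange; the routing key is typically the canal topic
            channel.basicPublish("canal.exchange", "example", null, "{}".getBytes());
        }
    }
}
```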

*******************

canal adapter MQ configuration

application.yml: launcher configuration

```yaml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  # srcDataSources:
  #   defaultDS:
  #     url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
  #     username: root
  #     password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
```

CanalKafkaConsumer: every property prefixed with `kafka.` is picked up

```java
@SPI("kafka")
public class CanalKafkaConsumer implements CanalMsgConsumer {

    private static final String PREFIX_KAFKA_CONFIG = "kafka.";

    private KafkaConsumer<String, ?> kafkaConsumer;
    private boolean                  flatMessage     = true;
    private String                   topic;
    private Map<Integer, Long>       currentOffsets  = new ConcurrentHashMap<>();
    private Properties               kafkaProperties = new Properties();

    @Override
    public void init(Properties properties, String topic, String groupId) {
        this.topic = topic;

        Boolean flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
        if (flatMessage != null) {
            this.flatMessage = flatMessage;
        }
        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            String k = (String) entry.getKey();
            Object v = entry.getValue();
            if (k.startsWith(PREFIX_KAFKA_CONFIG) && v != null) {
                // pick up every property that starts with "kafka."
                kafkaProperties.put(k.substring(PREFIX_KAFKA_CONFIG.length()), v);
            }
        }
        kafkaProperties.put("group.id", groupId);
        kafkaProperties.put("key.deserializer", StringDeserializer.class);
        kafkaProperties.put("client.id", UUID.randomUUID().toString().substring(0, 6));
    }

    @Override
    public void connect() {
        if (this.flatMessage) {
            // flat messages arrive as JSON strings
            kafkaProperties.put("value.deserializer", StringDeserializer.class);
            this.kafkaConsumer = new KafkaConsumer<String, String>(kafkaProperties);
        } else {
            // binary protobuf messages use canal's own deserializer
            kafkaProperties.put("value.deserializer", KafkaMessageDeserializer.class);
            this.kafkaConsumer = new KafkaConsumer<String, Message>(kafkaProperties);
        }
        kafkaConsumer.subscribe(Collections.singletonList(topic));
    }
}
```
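The excerpt stops before the fetch path. For flat messages, consuming the topic with the plain Kafka API looks roughly like this (topic and group are assumptions; canal's real `getMessage()` additionally records offsets so batches can be acked or rolled back):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class FlatMessagePollSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "g1");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // each value is one flat-message JSON document
                    System.out.println(record.value());
                }
                consumer.commitSync();   // manual commit, mirroring enable.auto.commit: false
            }
        }
    }
}
```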

CanalRocketMQConsumer: RocketMQ consumer properties

```java
@SPI("rocketmq")
public class CanalRocketMQConsumer implements CanalMsgConsumer {

    private static final Logger logger               = LoggerFactory.getLogger(CanalRocketMQConsumer.class);
    private static final String CLOUD_ACCESS_CHANNEL = "cloud";

    private String                nameServer;
    private String                topic;
    private String                groupName;
    private DefaultMQPushConsumer rocketMQConsumer;
    private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;
    private int                   batchSize           = -1;
    private long                  batchProcessTimeout = 60 * 1000;
    private boolean               flatMessage;
    private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;
    private String                accessKey;
    private String                secretKey;
    private String                customizedTraceTopic;
    private boolean               enableMessageTrace  = false;
    private String                accessChannel;
    private String                namespace;
    private String                filter              = "*";

    @Override
    public void init(Properties properties, String topic, String groupName) {
        this.topic = topic;
        this.groupName = groupName;
        this.flatMessage = (Boolean) properties.get(CanalConstants.CANAL_MQ_FLAT_MESSAGE);
        this.messageBlockingQueue = new LinkedBlockingQueue<>(1024);
        this.accessKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
        this.secretKey = properties.getProperty(CanalConstants.CANAL_ALIYUN_SECRET_KEY);
        String enableMessageTrace = properties.getProperty(RocketMQConstants.ROCKETMQ_ENABLE_MESSAGE_TRACE);
        if (StringUtils.isNotEmpty(enableMessageTrace)) {
            this.enableMessageTrace = Boolean.parseBoolean(enableMessageTrace);
        }
        this.customizedTraceTopic = properties.getProperty(RocketMQConstants.ROCKETMQ_CUSTOMIZED_TRACE_TOPIC);
        this.accessChannel = properties.getProperty(RocketMQConstants.ROCKETMQ_ACCESS_CHANNEL);
        this.namespace = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESPACE);
        this.nameServer = properties.getProperty(RocketMQConstants.ROCKETMQ_NAMESRV_ADDR);
        String batchSize = properties.getProperty(RocketMQConstants.ROCKETMQ_BATCH_SIZE);
        if (StringUtils.isNotEmpty(batchSize)) {
            this.batchSize = Integer.parseInt(batchSize);
        }
        String subscribeFilter = properties.getProperty(RocketMQConstants.ROCKETMQ_SUBSCRIBE_FILTER);
        if (StringUtils.isNotEmpty(subscribeFilter)) {
            this.filter = subscribeFilter;
        }
    }
}
```
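The excerpt ends before `connect()`. A rough sketch of how these fields would wire into a push consumer (plain RocketMQ client API; an illustration, not canal's exact connect logic):

```java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;

public class RocketMQConsumerSketch {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example-group");
        consumer.setNamesrvAddr("127.0.0.1:9876");   // rocketmq.namesrv.addr
        consumer.subscribe("example", "*");          // topic + rocketmq.subscribe.filter
        // an orderly listener keeps per-queue ordering, which matters for binlog replay
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            msgs.forEach(msg -> System.out.println(new String(msg.getBody())));
            return ConsumeOrderlyStatus.SUCCESS;
        });
        consumer.start();
    }
}
```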

CanalRabbitMQConsumer: RabbitMQ consumer properties

```java
@SPI("rabbitmq")
public class CanalRabbitMQConsumer implements CanalMsgConsumer {

    private static final Logger logger = LoggerFactory.getLogger(CanalRabbitMQConsumer.class);

    // connection address
    private String  nameServer;
    // virtual host
    private String  vhost;
    private String  queueName;
    // authentication credentials
    private String  accessKey;
    private String  secretKey;
    private Long    resourceOwnerId;
    private String  username;
    private String  password;
    private boolean flatMessage;

    private Connection connect;
    private Channel    channel;

    private long batchProcessTimeout = 60 * 1000;
    private BlockingQueue<ConsumerBatchMessage<CommonMessage>> messageBlockingQueue;
    private volatile ConsumerBatchMessage<CommonMessage> lastGetBatchMessage = null;

    @Override
    public void init(Properties properties, String topic, String groupId) {
        this.nameServer = PropertiesUtils.getProperty(properties, "rabbitmq.host");
        this.vhost = PropertiesUtils.getProperty(properties, "rabbitmq.virtual.host");
        this.queueName = topic;
        this.accessKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_ACCESS_KEY);
        this.secretKey = PropertiesUtils.getProperty(properties, CanalConstants.CANAL_ALIYUN_SECRET_KEY);
        this.username = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_USERNAME);
        this.password = PropertiesUtils.getProperty(properties, RabbitMQConstants.RABBITMQ_PASSWORD);
        Long resourceOwnerIdPro = (Long) properties.get(RabbitMQConstants.RABBITMQ_RESOURCE_OWNERID);
        // ... (excerpt truncated)
    }
}
```
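For comparison, the consume-side wiring with the plain RabbitMQ client looks roughly like this (queue name and credentials are assumptions; canal's real `connect()` also supports Aliyun account credentials via accessKey/secretKey):

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class RabbitMQConsumerSketch {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection conn = factory.newConnection();
        Channel channel = conn.createChannel();
        DeliverCallback onDeliver = (consumerTag, delivery) -> {
            // each delivery body is one canal message (flat JSON here)
            System.out.println(new String(delivery.getBody()));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        // manual ack, matching canal's batch-confirm style
        channel.basicConsume("example", false, onDeliver, consumerTag -> { });
    }
}
```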

*******************

Example

Create the source MySQL instance

```
docker run -it -d --net fixed --ip 172.18.0.2 -p 3306:3306 --privileged=true \
       --name mysql -e MYSQL_ROOT_PASSWORD=123456 mysql

# create the canal user and grant replication privileges
mysql> create user canal identified with mysql_native_password by "123456";
Query OK, 0 rows affected (0.01 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
Query OK, 0 rows affected (0.00 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.00 sec)

# create the database and tables
mysql> create database lihu;
Query OK, 1 row affected (0.35 sec)

mysql> use lihu;
Database changed

mysql> create table test(id int not null auto_increment primary key, value int not null);
Query OK, 0 rows affected (0.67 sec)

mysql> create table test2(id int not null auto_increment primary key, value int not null);
Query OK, 0 rows affected (0.27 sec)
```

Create the canal server

```
docker run -it -d --net fixed --ip 172.18.0.3 -p 11111:11111 --name canal-server \
       -v /usr/canal/mq/conf:/home/admin/canal-server/conf canal/canal-server

# canal.properties
canal.serverMode = rocketmq
rocketmq.namesrv.addr = 172.18.0.4:9876

# instance.properties
canal.instance.master.address=172.18.0.2:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

canal.mq.topic=example
canal.mq.dynamicTopic=.*
canal.mq.partition=0
canal.mq.partitionsNum=3
canal.mq.partitionHash=.*\\..*
```

Create RocketMQ (namesrv and broker)

```
docker run -it -d --net fixed --ip 172.18.0.4 -p 9876:9876 \
       -e JAVA_OPT="-server -Xms256m -Xmx256m -Xmn128m" \
       --name namesrv apacherocketmq/rocketmq:4.9.0-alpine sh mqnamesrv

docker run -it -d --net fixed --ip 172.18.0.5 -p 10911:10911 -p 10909:10909 \
       -e NAMESRV_ADDR="172.18.0.4:9876" \
       -e JAVA_OPT="-server -Xms2g -Xmx2g -Xmn1g" \
       -v /usr/rocketmq/test/broker.conf:/home/rocketmq/rocketmq-4.9.0/conf/broker.conf \
       --name broker apacherocketmq/rocketmq:4.9.0-alpine \
       sh mqbroker autoCreateTopicEnable=true -c /home/rocketmq/rocketmq-4.9.0/conf/broker.conf
```

Create the canal adapter

```
docker run -it -d --net fixed --ip 172.18.0.6 -p 8081:8081 \
       -v /usr/canal/mq/adapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
       -v /usr/canal/mq/adapter/conf/rdb:/opt/canal-adapter/conf/rdb \
       --name canal-adapter slpcat/canal-adapter:v1.1.5-jdk8
```

application.yml: launcher configuration

```yaml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: rocketMQ #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    rocketmq.namespace:
    rocketmq.namesrv.addr: 172.18.0.4:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.18.0.2:3306/lihu?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: lihu # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://172.18.0.7:3306/lihu?useUnicode=true
          jdbc.username: root
          jdbc.password: 123456
```

rdb/mytest_user.yml: adapter configuration

```yaml
dataSourceKey: defaultDS
destination: lihu
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  mirrorDb: true
  database: lihu
```

Create the mysql2 instance (target database)

```
docker run -it -d --net fixed --ip 172.18.0.7 -p 3307:3306 --privileged=true \
       --name mysql2 -e MYSQL_ROOT_PASSWORD=123456 mysql

# create the database (no need to create the tables)
mysql> create database lihu;
Query OK, 1 row affected (0.00 sec)
```

*******************

Testing

Source database (mysql): insert data

```
mysql> insert into test(id,value) values(1,2);
Query OK, 1 row affected (0.11 sec)

mysql> insert into test(id,value) values(2,2);
Query OK, 1 row affected (0.15 sec)

mysql> insert into test2(id,value) values(1,2);
Query OK, 1 row affected (0.03 sec)

mysql> insert into test2(id,value) values(2,2);
Query OK, 1 row affected (0.11 sec)
```

canal adapter logs

```
2021-07-14 22:36:09.564 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626273340000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test(id int not null auto_increment primary key, value int not null)","table":"test","ts":1626273345726,"type":"CREATE"}
2021-07-14 22:42:00.785 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"INSERT"}
2021-07-14 22:44:48.353 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626273887000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test2(id int not null auto_increment primary key, value int not null)","table":"test2","ts":1626273888331,"type":"CREATE"}
2021-07-14 22:44:54.400 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test2","type":"INSERT"}
2021-07-14 22:44:57.226 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test2","type":"INSERT"}
2021-07-14 22:57:26.640 [pool-7-thread-1] DEBUG c.a.o.c.c.adapter.rdb.service.RdbMirrorDbSyncService - DDL: {"data":null,"database":"lihu","destination":"lihu","es":1626274646000,"groupId":"g1","isDdl":true,"old":null,"pkNames":null,"sql":"create table test3(id int not null auto_increment primary key, value int not null)","table":"test3","ts":1626274646540,"type":"CREATE"}
2021-07-14 22:57:34.374 [pool-6-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"2","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test3","type":"INSERT"}
2021-07-14 22:59:08.462 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"DELETE"}
2021-07-14 22:59:16.007 [pool-5-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"id":"1","value":"2"},"database":"lihu","destination":"lihu","old":null,"table":"test","type":"INSERT"}
```

Target database (mysql2): check the data

```
mysql> select * from test;
+----+-------+
| id | value |
+----+-------+
|  1 |     2 |
|  2 |     2 |
+----+-------+
2 rows in set (0.00 sec)

mysql> select * from test2;
+----+-------+
| id | value |
+----+-------+
|  1 |     2 |
|  2 |     2 |
+----+-------+
```

mysql2 successfully receives the data synchronized from mysql.
