云原生丨超详细,教你基于Debezium与Kafka构建数据同步迁移(建议收藏)

末蓝、 2023-10-04 21:57 70阅读 0赞

文章目录

  • 前言
  • 一、安装部署
    • Debezium架构
    • 部署示意图
    • 安装部署
  • 二、数据迁移
    • Postgres迁移到Postgres
    • MySQL迁移到PostgresSQL

前言

在项目中,我们遇到已有数据库现存有大量数据,但需要将全部现存数据同步迁移到新的数据库中,我们应该如何处理呢?

本期我们就基于Debezium与Kafka构建数据同步。


一、安装部署

Debezium架构

在这里插入图片描述
Debezium 是一个基于不同数据库中提供的变更数据捕获功能(例如,PostgreSQL中的逻辑解码)构建的分布式平台。 Debezium是通过Apache Kafka连接部署的。

Kafka Connect是一个用于实现和操作的框架运行时。

源连接器,如Debezium,它将数据摄取到Kafka中(在我们的接下来实际的例子中,Debezium将Mysql数据摄取到Kafka中);

接收连接器,它将数据从Kafka主题写入到其他到系统,这个系统可以有多种,在我们例子中,会将Kafka主题写入到PostgreSQL数据库中。

部署示意图

在这里插入图片描述

  • Zookeeper:Zookeeper容器,用于构建Kafka环境;
  • Kafka:Kafka容器,数据库的变更信息以topic的形式保存在kafka中;
  • Kafka-ui:kafka的UI页面容器,可以直观的查看kafka中的Brokers,Topics,Consumers等信息;
  • Connect:Debezium的Connect容器,对接Kafka的Connect,通过Source Connector将数据同步到Kafka中,通过Sink Connect消费Kafka的topic消息;
  • Debezium Connector:Source Connector插件,以Jar包的形式部署在Connect中,Debezium自带有MongoDB,MySQL,PostgreSQL,SQL Server,Oracle,Db2连接器;
  • DBC connector:Sink Connector插件,以Jar包的形式部署在Connect中,本次部署安装的是JDBC连接器,将Kafka上的数据同步到数据库中;
  • Debezium-ui:Debezium connect的ui页面容器。用于创建和显示Source Connector
  • Source Database:数据迁移来源方数据库。本次部署中使用的是MySQL和Postgres(10+版本);
  • Target Database:数据库迁移目标数据库。本次部署中使用的是Postgres。

安装部署

本次部署需要先安装Docker。

Debezium使用Docker安装部署,如下⬇

docker-compose.yaml

  1. version: '2'
  2. services:
  3. zookeeper:
  4. image: quay.io/debezium/zookeeper:2.0
  5. ports:
  6. - 2181:2181
  7. - 2888:2888
  8. - 3888:3888
  9. kafka:
  10. image: quay.io/debezium/kafka:2.0
  11. ports:
  12. - 9092:9092
  13. links:
  14. - zookeeper
  15. environment:
  16. - ZOOKEEPER_CONNECT=zookeeper:2181
  17. connect:
  18. image: quay.io/debezium/connect:2.0
  19. ports:
  20. - 8083:8083
  21. - 5005:5005
  22. links:
  23. - kafka
  24. environment:
  25. - BOOTSTRAP_SERVERS=kafka:9092
  26. - GROUP_ID=1
  27. - CONFIG_STORAGE_TOPIC=my_connect_configs
  28. - OFFSET_STORAGE_TOPIC=my_connect_offsets
  29. - STATUS_STORAGE_TOPIC=my_source_connect_statuses
  30. kafka-ui:
  31. image: provectuslabs/kafka-ui:latest
  32. ports:
  33. - "9093:8080"
  34. environment:
  35. - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
  36. links:
  37. - kafka
  38. debezium-ui:
  39. image: debezium/debezium-ui:2.0
  40. ports:
  41. - "8080:8080"
  42. environment:
  43. - KAFKA_CONNECT_URIS=http://connect:8083
  44. links:
  45. - connect

部署命令:

  1. docker-compose -f docker-compose.yaml -p debezium up -d

部署完成后,Docker容器列表,如下:

在这里插入图片描述

  • Kafka-ui访问地址:http://localhost:9093
  • Debezium-ui访问地址:http://localhost:8080

Source Connector和Sink Connector都是以JAR包的方式,存在于Connect容器的/kafka/connect目录下。

Connect容器自带有Debezium的官方Source Connector:

  • debezium-connector-db2
  • debezium-connector-mysql
  • debezium-connector-postgres
  • debezium-connector-vitess
  • debezium-connector-mongodb
  • debezium-connector-oracle
  • debezium-connector-sqlserver

需要自行注册Sink Connector:Kafka-Connect-JDBC(新建Kafka-Connect-JDBC目录,下载JAR包放入此目录,重启Conenct)。

注册Sink Connector

  1. # docker容器中新建kafka-connect-jdbc目录
  2. docker exec 容器id mkdir /kafka/connect/kafka-connect-jdbc
  3. # 下载jar包到本地
  4. wget https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/5.3.2/kafka-connect-jdbc-5.3.2.jar
  5. # 拷贝jar包到docker容器
  6. docker cp kafka-connect-jdbc-5.3.2.jar 容器id:/kafka/connect/kafka-connect-jdbc
  7. # 重启connect容器
  8. docker restart 容器id

二、数据迁移

在这里插入图片描述

数据迁移经历以下几个步骤:

1)启动源数据库;

2)注册Source Connector,Source Connector监听Source Database的数据变动,发布数据到Kafka的Topic中,一个表对应一个Topic,Topic中包含对表中某条记录的某个操作(新增,修改,删除等);

3)启动目标数据库;

4)注册Sink Connector,Sink Connector消费Kafka中的Topic,通过JDBC连接到Target Database,根据Topic中的信息,对表记录执行对应操作。

Postgres迁移到Postgres

  • 1.启动源数据库-Postgres

本次部署通过容器的方式启动:

  1. docker run -d --name source-postgres -p 15432:5432 -e POSTGRES_PASSWORD=123456 -e POSTGRES_USER=debe postgres:12.6
  • 2.注册Source Connecto

通过Debezium UI页面进行注册。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
需要注意的有以下几点:

Debezium Postgres类型的Source Connector支持的Postgres需要将wal_level修改为logical;修改Postgres中的Postgresql.conf文件中的配置(wal_level = logical)并重启Postgres;
Postgres需要支持解码插件,Debezium官方一共提供了两个解码插件:

Decoderbufs:Debezium默认配置,由Debezium维护;
Pgoutput:Postgres 10+版本自带;使用此插件时,需要配置plugin.name=pgoutput

  • 3.启动目标数据库-Postgre

    docker run -d —name target-postgres -p 25432:5432 -e POSTGRES_PASSWORD=123456 -e POSTGRES_USER=debe postgres:12.6

  • 4.注册Sink Connector

通过Connect提供的API进行注册

新增Connector

  1. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d \
  2. '{
  3. "name": "sink-connector-postgres",
  4. "config": {
  5. "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  6. "tasks.max": "1",
  7. "topics": "postgres.public.test_source",
  8. "connection.url": "jdbc:postgresql://10.3.73.160:25432/postgres?user=debe&password=123456",
  9. "transforms": "unwrap",
  10. "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  11. "transforms.unwrap.drop.tombstones": "false",
  12. "auto.create": "true",
  13. "insert.mode": "upsert",
  14. "delete.enabled": "true",
  15. "pk.fields": "id",
  16. "pk.mode": "record_key"
  17. }
  18. }'
  • 5.验证数据迁移过程

源数据库中的表数据迁移到Kafka

新建表test_source和test_source1

test_source&test_source1.sql

  1. -- test_source
  2. create table if not exists public.test_source
  3. (
  4. id integer not null
  5. constraint test_source_pk
  6. primary key,
  7. name varchar(64)
  8. );
  9. alter table public.test_source
  10. owner to debe;
  11. insert into public.test_source (id, name) values (1, 'a');
  12. -- test_source1
  13. create table if not exists public.test_source1
  14. (
  15. id integer not null
  16. constraint test_source1_pk
  17. primary key,
  18. name varchar(64)
  19. );
  20. alter table public.test_source1
  21. owner to debe;
  22. insert into public.test_source1 (id, name) values (1, 'a1');

Kafka新建数据前 ⬇

在这里插入图片描述
Kafka新建数据后 ⬇

在这里插入图片描述
在这里插入图片描述

源数据库中新建表test_source和表test_source1后,Kafka中出现了两个Topic:

postgres.public.test_source和postgres.public.test_source1,与这两个表一一对应,topic中的message对应着对表中记录的操作(新增1条记录)。

监听的表可通过连接器配置进行过滤,比如配置”table.include.list”: “public.test_source”,就只会出现一个Topic:postgres.public.test_source

Kafka中的数据迁移到目标数据库

在这里插入图片描述
在这里插入图片描述

注册Sink Connector后,Kafka中会新增一个Customer,对postgres.public.test_source进行消费(sink connector配置中的”topics”: “postgres.public.test_source”指定);

对应的源数据库(sink connector配置中的”connection.url”: “jdbc:postgresql://10.3.73.160:25432/postgres?user=debe&password=123456”指定)会新增一个表public.test_source,该表中的数据和源数据库中的public.test_source始终保持同步。

MySQL迁移到PostgresSQL

  • 1.启动源数据库-mysql

本次部署通过docker启动:

  1. docker run -d --name source-mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:2.0
  • 2.注册Source Connector

启动MySQL数据源连接注册

注册MySQL数据源有两种方式:

1、在Debezium UI中直接添加
2、调用Kafka API 注册

在Debezium UI中直接添加
在这里插入图片描述
选择MySQL数据源

在这里插入图片描述
调用Kafka API注册

新增Connector

  1. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d \
  2. '{
  3. "name": "inventory-connector",
  4. "config": {
  5. "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  6. "tasks.max": "1",
  7. "topic.prefix": "dbserver1",
  8. "database.hostname": "mysql",
  9. "database.port": "3306",
  10. "database.user": "debezium", //数据库用户名
  11. "database.password": "dbz", //数据库密码
  12. "database.server.id": "184054",
  13. "database.include.list": "inventory", //数据源覆盖范围
  14. "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
  15. "schema.history.internal.kafka.topic": "schema-changes.inventory",
  16. "transforms": "route",
  17. "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  18. "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  19. "transforms.route.replacement": "$3"
  20. }
  21. }'

在这里插入图片描述

验证Source Connector注册结果

注册连接前:

在这里插入图片描述
注册连接后:

在这里插入图片描述
多出来的Topics信息是MySQL source表信息,连接MySQL数据库可见表:

在这里插入图片描述
在这里插入图片描述
UI for Apache Kafka中可以看到Messages同步信息。

在这里插入图片描述
访问Debezium UI(http://localhost:8080/ )可以看到MySQL的连接。

在这里插入图片描述

  • 3.启动目标数据库-Postgres

本次部署采用Docker方式启动:

  1. docker run -d --name target-postgres -p 5432:5432 -e POSTGRES_USER=postgresuser -e POSTGRES_PASSWORD=postgrespw -e POSTGRES_DB=inventory debezium/postgres:9.6
  • 4.注册Sink Connector (通过API接口)

新增Connector

  1. curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d \
  2. '{
  3. "name": "jdbc-sink",
  4. "config": {
  5. "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  6. "tasks.max": "1",
  7. "topics": "customers", //迁移目标主题(这里是按照表来订阅的)
  8. "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
  9. "transforms": "unwrap",
  10. "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  11. "transforms.unwrap.drop.tombstones": "false",
  12. "auto.create": "true",
  13. "insert.mode": "upsert",
  14. "delete.enabled": "true",
  15. "pk.fields": "id",
  16. "pk.mode": "record_key"
  17. }
  18. }'

在这里插入图片描述

注册PostgreSQL connector后,不会在Debezium中显示Connector client 信息,但可以在UI for Apache Kafka中看到:

在这里插入图片描述

  • 5.验证数据迁移过程

完成安装步骤后,以Customers表为例,做CUD操作语句,实现MySQL数据库同步数据到PostgreSQL 。

Mysql 数据库现有数据:

在这里插入图片描述
在这里插入图片描述
手动在MySQL数据库Customers表中添加一条数据 ⬇

customers.sql

  1. insert into customers(id,first_name,last_name,email) values(1005,'test','one','123456@qq.com');

在这里插入图片描述
在PostgreSQL数据库中Customers多出一条数据:

在这里插入图片描述

Kafka中Messages新增一条数据,完成数据同步:

在这里插入图片描述
可以看到消费如下信息:

topics-customers.json

  1. {
  2. "schema": {
  3. "type": "struct",
  4. "fields": [
  5. {
  6. "type": "struct",
  7. "fields": [
  8. {
  9. "type": "int32",
  10. "optional": false,
  11. "field": "id"
  12. },
  13. {
  14. "type": "string",
  15. "optional": false,
  16. "field": "first_name"
  17. },
  18. {
  19. "type": "string",
  20. "optional": false,
  21. "field": "last_name"
  22. },
  23. {
  24. "type": "string",
  25. "optional": false,
  26. "field": "email"
  27. }
  28. ],
  29. "optional": true,
  30. "name": "dbserver1.inventory.customers.Value",
  31. "field": "before"
  32. },
  33. {
  34. "type": "struct",
  35. "fields": [
  36. {
  37. "type": "int32",
  38. "optional": false,
  39. "field": "id"
  40. },
  41. {
  42. "type": "string",
  43. "optional": false,
  44. "field": "first_name"
  45. },
  46. {
  47. "type": "string",
  48. "optional": false,
  49. "field": "last_name"
  50. },
  51. {
  52. "type": "string",
  53. "optional": false,
  54. "field": "email"
  55. }
  56. ],
  57. "optional": true,
  58. "name": "dbserver1.inventory.customers.Value",
  59. "field": "after"
  60. },
  61. {
  62. "type": "struct",
  63. "fields": [
  64. {
  65. "type": "string",
  66. "optional": false,
  67. "field": "version"
  68. },
  69. {
  70. "type": "string",
  71. "optional": false,
  72. "field": "connector"
  73. },
  74. {
  75. "type": "string",
  76. "optional": false,
  77. "field": "name"
  78. },
  79. {
  80. "type": "int64",
  81. "optional": false,
  82. "field": "ts_ms"
  83. },
  84. {
  85. "type": "string",
  86. "optional": true,
  87. "name": "io.debezium.data.Enum",
  88. "version": 1,
  89. "parameters": {
  90. "allowed": "true,last,false,incremental"
  91. },
  92. "default": "false",
  93. "field": "snapshot"
  94. },
  95. {
  96. "type": "string",
  97. "optional": false,
  98. "field": "db"
  99. },
  100. {
  101. "type": "string",
  102. "optional": true,
  103. "field": "sequence"
  104. },
  105. {
  106. "type": "string",
  107. "optional": true,
  108. "field": "table"
  109. },
  110. {
  111. "type": "int64",
  112. "optional": false,
  113. "field": "server_id"
  114. },
  115. {
  116. "type": "string",
  117. "optional": true,
  118. "field": "gtid"
  119. },
  120. {
  121. "type": "string",
  122. "optional": false,
  123. "field": "file"
  124. },
  125. {
  126. "type": "int64",
  127. "optional": false,
  128. "field": "pos"
  129. },
  130. {
  131. "type": "int32",
  132. "optional": false,
  133. "field": "row"
  134. },
  135. {
  136. "type": "int64",
  137. "optional": true,
  138. "field": "thread"
  139. },
  140. {
  141. "type": "string",
  142. "optional": true,
  143. "field": "query"
  144. }
  145. ],
  146. "optional": false,
  147. "name": "io.debezium.connector.mysql.Source",
  148. "field": "source"
  149. },
  150. {
  151. "type": "string",
  152. "optional": false,
  153. "field": "op"
  154. },
  155. {
  156. "type": "int64",
  157. "optional": true,
  158. "field": "ts_ms"
  159. },
  160. {
  161. "type": "struct",
  162. "fields": [
  163. {
  164. "type": "string",
  165. "optional": false,
  166. "field": "id"
  167. },
  168. {
  169. "type": "int64",
  170. "optional": false,
  171. "field": "total_order"
  172. },
  173. {
  174. "type": "int64",
  175. "optional": false,
  176. "field": "data_collection_order"
  177. }
  178. ],
  179. "optional": true,
  180. "name": "event.block",
  181. "version": 1,
  182. "field": "transaction"
  183. }
  184. ],
  185. "optional": false,
  186. "name": "dbserver1.inventory.customers.Envelope",
  187. "version": 1
  188. },
  189. "payload": {
  190. "before": null,
  191. "after": {
  192. "id": 1005,
  193. "first_name": "test",
  194. "last_name": "one",
  195. "email": "123456@qq.com"
  196. },
  197. "source": {
  198. "version": "2.0.1.Final",
  199. "connector": "mysql",
  200. "name": "dbserver1",
  201. "ts_ms": 1672024796000,
  202. "snapshot": "false",
  203. "db": "inventory",
  204. "sequence": null,
  205. "table": "customers",
  206. "server_id": 223344,
  207. "gtid": null,
  208. "file": "mysql-bin.000003",
  209. "pos": 392,
  210. "row": 0,
  211. "thread": 16,
  212. "query": null
  213. },
  214. "op": "c",
  215. "ts_ms": 1672024796396,
  216. "transaction": null
  217. }
  218. }

重要的部分是 “payload” json 中信息:

  • source 中会展示“版本”,“数据源”等信息;
  • after 代表变动信息;
  • “op” 操作信息,例如“c” 代表创建;

需要注意的是,结果的json格式是Debezium定义好的格式。

Debezium json格式通常前面定义Schema信息,最后才是实际的载荷(payload)信息。

详细格式定义可以查看:https://debezium.io/documentation/reference/1.6/connectors/mysql.html

通过以上步骤,我们在Docker环境上使用Debezium实现了数据同步到kafaka。本期关于数据同步迁移的内容就到这里了,建议大家收藏学习!~

版权申明:文章由神州数码武汉云基地团队实践整理输出,转载请注明出处。
微信公众号后台回复“技术合集”,可获取更多干货内容!

发表评论

表情:
评论列表 (有 0 条评论,70人围观)

还没有评论,来说两句吧...

相关阅读