CDC Series (2): Using Maxwell v1.27.1 to Sync MySQL Operation Logs to Kafka in Real Time

Myth丶恋晨 · 2022-12-09 14:54

Contents

Foreword

Installation

Configuration

Using Maxwell to Monitor MySQL Operations and Write to Kafka

Closing Thoughts


CDC series:

CDC Series (1): Canal Cluster Deployment and Usage (with WebUI)


Foreword

In the previous post we introduced CDC tools and walked through the cluster installation and usage of Canal. In this post we cover another CDC tool: Maxwell. Like Canal, Maxwell disguises itself as a MySQL slave and tails MySQL's binlog, pushing the data-change log into Kafka or another message queue for heterogeneous downstream consumers.

This post covers installing and using Maxwell.

Installation

As with Canal, you need at least one MySQL database to serve as Maxwell's management store for its state information, plus the MySQL database to be monitored. The captured binlog events are exported to Kafka, so Kafka and ZooKeeper must also be available. Maxwell is also written in Java, so JDK 1.8 is required. Installation of all of these was covered in earlier posts and is not repeated here.

Download Maxwell v1.27.1: https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
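For example, fetching the release straight to the server (a quick sketch, assuming wget is available):

    # Download the Maxwell 1.27.1 release tarball
    wget https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz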

Extract the downloaded tarball:

    tar -zxvf maxwell-1.27.1.tar.gz -C /opt/app/

Configuration

As with Canal, the monitored MySQL instance must have its binlog enabled; for the detailed steps, see the previous post's section on enabling the MySQL binlog.
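For reference, the relevant my.cnf settings look roughly like this (a minimal sketch; the server-id value is illustrative, and Maxwell requires row-based binlog):

    # /etc/my.cnf on the monitored MySQL instance (restart MySQL afterwards)
    [mysqld]
    server-id     = 1          # any ID unique within the replication topology
    log-bin       = mysql-bin  # enable the binlog
    binlog_format = row        # Maxwell requires row-based replication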

Next, create a maxwell user on Maxwell's management database and grant it privileges. Because this demo keeps the monitored database and Maxwell's management database in the same MySQL instance, the same maxwell user is also used to read the binlog; in production the two are usually separated, which the config_csdn.properties configuration below also reflects.

    -- Create the account for connecting to Maxwell's management database
    CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell_pwd';
    GRANT ALL ON maxwell.* TO 'maxwell'@'%';
    GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
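A quick check that the grants took effect:

    -- List the privileges of the maxwell user
    SHOW GRANTS FOR 'maxwell'@'%';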

Edit the configuration file; for the full set of options see: http://maxwells-daemon.io/config/

    cd /opt/app/maxwell-1.27.1
    cp config.properties.example config_csdn.properties
    vim config_csdn.properties

    # Change the settings below; the other options can keep their defaults or be set as needed.
    # Reference: http://maxwells-daemon.io/config/
    -------------------------------------------
    log_level=info
    # This ID must differ from the server-id in MySQL's my.cnf and from the IDs configured in any other CDC tools monitoring the same database
    replica_server_id=9
    # Export the captured log to Kafka
    producer=kafka
    kafka.bootstrap.servers=wyk01:9092,wyk02:9092,wyk03:9092
    # DML (insert/update/delete) and DDL events are written to the following two topics, respectively
    kafka_topic=wyk_csdn_maxwell
    ddl_kafka_topic=wyk_csdn_maxwell
    #kinesis_stream=maxwell
    #include_dbs=test_csdn
    kafka_version=0.11.0
    kafka.compression.type=snappy
    kafka.retries=0
    kafka.acks=-1
    # Connection info for Maxwell's management database
    # mysql login info
    host=wyk01
    port=3306
    user=maxwell
    password=maxwell_pwd
    schema_database=maxwell
    jdbc_options=autoReconnect=true
    # Connection info for the MySQL instance whose binlog is monitored. If this is not configured,
    # the monitored MySQL defaults to the same instance as the management database configured above.
    # The replication user needs binlog replication privileges on the monitored instance.
    # maxwell can optionally replicate from a different server than where it stores
    # schema and binlog position info. Specify that different server here:
    replication_host=wyk01
    replication_port=3306
    replication_user=maxwell
    replication_password=maxwell_pwd
    # include/exclude filters for the captured log; see http://maxwells-daemon.io/filtering
    # The filter below monitors all tables in test_csdn and ignores changes in the maxwell database
    filter=include: test_csdn.*,exclude: maxwell.*
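If topic auto-creation is disabled on your brokers, create the target topic up front. A sketch (the partition and replication counts are illustrative, and older Kafka releases take --zookeeper instead of --bootstrap-server):

    # Create the Maxwell target topic before starting the daemon
    bin/kafka-topics.sh --create --bootstrap-server wyk01:9092 \
      --topic wyk_csdn_maxwell --partitions 3 --replication-factor 2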

Using Maxwell to Monitor MySQL Operations and Write to Kafka

Once the configuration file above is in place, we can start Maxwell to monitor MySQL and write the captured operation log to Kafka. The relevant commands:

    # Start the Maxwell process
    bin/maxwell --config config_csdn.properties
    # Start the Maxwell process in the background
    bin/maxwell --config config_csdn.properties --daemon
    # Open a Kafka console consumer to verify the output
    bin/kafka-console-consumer.sh --bootstrap-server wyk01:9092,wyk02:9092,wyk03:9092 --topic wyk_csdn_maxwell
    # Push a full snapshot of a given table (test_csdn.wyk_csdn) into Kafka
    bin/maxwell-bootstrap --config config_csdn.properties --database test_csdn --table wyk_csdn --client_id=csdn_full --bootstrapper=sync
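After a daemon-mode start, you can confirm the process is alive, for instance:

    # Check for the running Maxwell daemon (the brackets keep grep from matching itself)
    ps -ef | grep [m]axwell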

Notes:

1. When running multiple Maxwell instances, give each one a distinct client_id (a string) so that each stores its own binlog position, as in the maxwell-bootstrap command above:

    bin/maxwell-bootstrap --config config_csdn.properties --database test_csdn --table wyk_csdn --client_id=csdn_full --bootstrapper=sync

2. Maxwell supports a full sync of a specified table. With --bootstrapper=sync, normal binlog parsing is blocked while the bootstrap is processed; with --bootstrapper=async it is not (see the sketch below).
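For example, the non-blocking variant mirrors the bootstrap command above with the flag flipped:

    # async bootstrap: the full sync runs without blocking normal binlog parsing
    bin/maxwell-bootstrap --config config_csdn.properties --database test_csdn --table wyk_csdn --client_id=csdn_full --bootstrapper=async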

A full sync can also be triggered by inserting a row directly into the maxwell database:

    insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');

Or with a specific client_id:

    insert into maxwell.bootstrap (database_name, table_name, client_id) values ('fooDB', 'barTable', 'custom_maxwell_client_id');

During a full sync, the message stream opens with a bootstrap-start record, continues with bootstrap-insert records carrying the table's data, and closes with a bootstrap-complete record.

If a failure occurs partway through a full sync, the sync restarts from the beginning on the next startup; if restarts keep failing, you can go to the maxwell management database and set the is_completed flag of the corresponding bootstrap row to true.
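That manual fix is a one-line update, along these lines (a sketch; in recent Maxwell versions the column is named is_complete, so verify against your own maxwell.bootstrap schema):

    -- Mark the stuck bootstrap task as finished so Maxwell does not rerun it
    update maxwell.bootstrap set is_complete = 1
     where database_name = 'test_csdn' and table_name = 'wyk_csdn';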

For more detail on Maxwell bootstrap, see: http://maxwells-daemon.io/bootstrapping/

With Maxwell and the Kafka consumer started, run the following statements in MySQL and check whether Maxwell forwards them to Kafka:

    create database test_csdn;
    use test_csdn;
    -- Create a table
    create table wyk_csdn(id int,name varchar(20),ins_ts timestamp);
    -- Insert 2 rows
    insert into wyk_csdn values(1,'wyk1',current_timestamp());
    insert into wyk_csdn values(2,'wyk2',current_timestamp());
    -- Update the row with id=2
    update wyk_csdn set name='wyk2_new' where id=2;
    -- Delete the row with id=1
    delete from wyk_csdn where id=1;
    -- Truncate the table
    truncate table wyk_csdn;
    -- Drop the table
    drop table wyk_csdn;
    -- Alter a table's structure
    create table wyk_csdn2(id int,name varchar(20),ins_ts timestamp);
    alter table wyk_csdn2 add column(add_c text);
    -- Rename a table
    rename table wyk_csdn2 to wyk_csdn2_new;
    drop table wyk_csdn2_new;

(Screenshots: the Kafka console consumer printing Maxwell's messages for the operations above.)

The JSON messages Maxwell emits for the monitored binlog events:

    {
      "type": "table-create",
      "database": "test_csdn",
      "table": "wyk_csdn",
      "def": {
        "database": "test_csdn",
        "charset": "latin1",
        "table": "wyk_csdn",
        "columns": [
          { "type": "int", "name": "id", "signed": true },
          { "type": "varchar", "name": "name", "charset": "latin1" },
          { "type": "timestamp", "name": "ins_ts", "column-length": 0 }
        ],
        "primary-key": []
      },
      "ts": 1600769161000,
      "sql": "create table wyk_csdn(id int,name varchar(20),ins_ts timestamp)"
    }
    {
      "database": "test_csdn",
      "table": "wyk_csdn",
      "type": "insert",
      "ts": 1600769172,
      "xid": 74698,
      "commit": true,
      "data": { "id": 1, "name": "wyk1", "ins_ts": "2020-09-22 10:06:12" }
    }
    {
      "database": "test_csdn",
      "table": "wyk_csdn",
      "type": "insert",
      "ts": 1600769175,
      "xid": 74713,
      "commit": true,
      "data": { "id": 2, "name": "wyk2", "ins_ts": "2020-09-22 10:06:15" }
    }
    {
      "database": "test_csdn",
      "table": "wyk_csdn",
      "type": "update",
      "ts": 1600769181,
      "xid": 74739,
      "commit": true,
      "data": { "id": 2, "name": "wyk2_new", "ins_ts": "2020-09-22 10:06:21" },
      "old": { "name": "wyk2", "ins_ts": "2020-09-22 10:06:15" }
    }
    {
      "database": "test_csdn",
      "table": "wyk_csdn",
      "type": "delete",
      "ts": 1600769184,
      "xid": 74756,
      "commit": true,
      "data": { "id": 1, "name": "wyk1", "ins_ts": "2020-09-22 10:06:12" }
    }
    {
      "type": "table-drop",
      "database": "test_csdn",
      "table": "wyk_csdn",
      "ts": 1600769190000,
      "sql": "DROP TABLE `wyk_csdn` /* generated by server */"
    }
    {
      "type": "table-create",
      "database": "test_csdn",
      "table": "wyk_csdn2",
      "def": {
        "database": "test_csdn",
        "charset": "latin1",
        "table": "wyk_csdn2",
        "columns": [
          { "type": "int", "name": "id", "signed": true },
          { "type": "varchar", "name": "name", "charset": "latin1" },
          { "type": "timestamp", "name": "ins_ts", "column-length": 0 }
        ],
        "primary-key": []
      },
      "ts": 1600769193000,
      "sql": "create table wyk_csdn2(id int,name varchar(20),ins_ts timestamp)"
    }
    {
      "type": "table-alter",
      "database": "test_csdn",
      "table": "wyk_csdn2",
      "old": {
        "database": "test_csdn",
        "charset": "latin1",
        "table": "wyk_csdn2",
        "columns": [
          { "type": "int", "name": "id", "signed": true },
          { "type": "varchar", "name": "name", "charset": "latin1" },
          { "type": "timestamp", "name": "ins_ts", "column-length": 0 }
        ],
        "primary-key": []
      },
      "def": {
        "database": "test_csdn",
        "charset": "latin1",
        "table": "wyk_csdn2",
        "columns": [
          { "type": "int", "name": "id", "signed": true },
          { "type": "varchar", "name": "name", "charset": "latin1" },
          { "type": "timestamp", "name": "ins_ts", "column-length": 0 },
          { "type": "text", "name": "add_c", "charset": "latin1" }
        ],
        "primary-key": []
      },
      "ts": 1600769196000,
      "sql": "alter table wyk_csdn2 add column(add_c text)"
    }
    {
      "type": "table-alter",
      "database": "test_csdn",
      "table": "wyk_csdn2",
      "old": {
        "database": "test_csdn",
        "charset": "latin1",
        "table": "wyk_csdn2",
        "columns": [
          { "type": "int", "name": "id", "signed": true },
          { "type": "varchar", "name": "name", "charset": "latin1" },
          { "type": "timestamp", "name": "ins_ts", "column-length": 0 },
          { "type": "text", "name": "add_c", "charset": "latin1" }
        ],
        "primary-key": []
      },
      "def": {
        "database": "test_csdn",
        "charset": "latin1",
        "table": "wyk_csdn2_new",
        "columns": [
          { "type": "int", "name": "id", "signed": true },
          { "type": "varchar", "name": "name", "charset": "latin1" },
          { "type": "timestamp", "name": "ins_ts", "column-length": 0 },
          { "type": "text", "name": "add_c", "charset": "latin1" }
        ],
        "primary-key": []
      },
      "ts": 1600769199000,
      "sql": "rename table wyk_csdn2 to wyk_csdn2_new"
    }
    {
      "type": "table-drop",
      "database": "test_csdn",
      "table": "wyk_csdn2_new",
      "ts": 1600769202000,
      "sql": "DROP TABLE `wyk_csdn2_new` /* generated by server */"
    }

The JSON messages produced by a Maxwell bootstrap full table sync:

    {
      "database": "test_csdn",
      "table": "wyk_csdn",
      "type": "bootstrap-start",
      "ts": 1600769648,
      "data": {}
    }
    {
      "database": "test_csdn",
      "table": "wyk_csdn",
      "type": "bootstrap-insert",
      "ts": 1600769648,
      "data": { "id": 1, "name": "wyk1", "ins_ts": "2020-09-23 07:13:02" }
    }
    {
      "database": "test_csdn",
      "table": "wyk_csdn",
      "type": "bootstrap-insert",
      "ts": 1600769648,
      "data": { "id": 2, "name": "wyk2", "ins_ts": "2020-09-23 07:13:02" }
    }
    {
      "database": "test_csdn",
      "table": "wyk_csdn",
      "type": "bootstrap-complete",
      "ts": 1600769648,
      "data": {}
    }
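Since each message is a single-line JSON document, downstream filtering is straightforward. As a quick sketch, this keeps only the row payloads of insert events (assuming jq is installed on the consumer host):

    # Print only the row data of insert events from the Maxwell topic
    bin/kafka-console-consumer.sh --bootstrap-server wyk01:9092,wyk02:9092,wyk03:9092 \
      --topic wyk_csdn_maxwell --from-beginning \
      | jq -c 'select(.type == "insert") | .data'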

Closing Thoughts

From the walkthrough above we can see that Maxwell captures alter table/view, create table/view, insert, update, delete, drop, and rename operations, but it does not capture truncate, which is a disadvantage compared with Canal. On the other hand, Maxwell's bootstrap supports a full sync of a specified table, so an initial cutover or a full data reload carries no risk of data loss, whereas Canal handles this scenario far less conveniently (through the ETL feature of Canal Adapter).

I hope this post was helpful. Please give the author a like to show your support. Thanks!

