Kafka + Kafka Connect + Debezium — goal: syncing MySQL
- Current status: with the plugins above, changes in MySQL can be detected and the change events written through Kafka Connect into a Kafka topic. The notes below record this stage. Since several components are required, everything runs in Docker.
First, install Docker
- yum install docker
- systemctl start docker
Pull the debezium/zookeeper:0.9 image
- docker pull debezium/zookeeper:0.9
- Run ZooKeeper: docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9
Pull the debezium/kafka:0.9 image
- docker pull debezium/kafka:0.9
- docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9
Install MySQL (Debezium's example image)
- docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
MySQL client
- docker run -it --rm --name mysqlterm --link mysql mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
- use inventory;
- show tables;
- SELECT * FROM customers;
Kafka Connect
- docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9
- Verify:
1. $ curl -H "Accept:application/json" localhost:8083/
2. $ curl -H "Accept:application/json" localhost:8083/connectors/
Monitor the MySQL database
- curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'
2. Check the created connectors: curl -H "Accept:application/json" localhost:8083/connectors/
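The registration payload above can also be built and posted from code. A minimal Python sketch (the config values are exactly those in the curl command; the HTTP call is commented out because it needs a running Connect instance at localhost:8083):

```python
import json

def inventory_connector_config(name="inventory-connector"):
    """Build the same registration payload as the curl command above."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "database.hostname": "mysql",
            "database.port": "3306",
            "database.user": "debezium",
            "database.password": "dbz",
            "database.server.id": "184054",
            "database.server.name": "dbserver1",
            "database.whitelist": "inventory",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "dbhistory.inventory",
        },
    }

payload = json.dumps(inventory_connector_config())

# Equivalent to the curl POST (uncomment when Connect is running):
# import urllib.request
# req = urllib.request.Request("http://localhost:8083/connectors/",
#                              data=payload.encode(), method="POST",
#                              headers={"Content-Type": "application/json"})
# urllib.request.urlopen(req)
```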
- Viewing the change events
1. List the topics in Kafka: new topics appear, including dbhistory.inventory (the schema history) and per-table topics such as dbserver1.inventory.customers (the actual change events).
2. Start a consumer on the change-event topic to watch the change data.
- UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
- The consumer on the topic then shows a record of the MySQL change.
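A consumed change event can be inspected programmatically. A small Python sketch, using a trimmed example of the event value the UPDATE above would produce (real Debezium events also carry a schema section and a larger "source" block):

```python
import json

# Trimmed example of a Debezium change-event payload for the UPDATE above.
sample_event = json.dumps({
    "payload": {
        "before": {"id": 1004, "first_name": "Anne"},
        "after":  {"id": 1004, "first_name": "Anne Marie"},
        "op": "u",  # c = insert, u = update, d = delete
        "ts_ms": 1554314434000,
    }
})

def describe_change(raw: str) -> str:
    """Summarize a Debezium change event: operation plus changed columns."""
    payload = json.loads(raw)["payload"]
    before, after = payload.get("before") or {}, payload.get("after") or {}
    changed = {k: after[k] for k in after if before.get(k) != after.get(k)}
    return f"op={payload['op']} changed={changed}"

print(describe_change(sample_event))
# → op=u changed={'first_name': 'Anne Marie'}
```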
The above detects database changes and writes them into Kafka. What remains to be implemented, and the current thinking:
- MySQL data changes are now recorded in Kafka, but so far we only detect a change and record it. The next big goal is incremental synchronization of MySQL: take the captured change information and, with the help of Kafka Connect, apply it to achieve incremental sync of the database.
- The idea above covers incremental sync, but on reflection, achieving full synchronization purely by recording changes does not seem realistic; it looks necessary to rely on Debezium reading and parsing the MySQL binlog, and full synchronization could be achieved that way.
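The incremental-sync idea can be sketched as: consume each change event and translate it into an SQL statement against the target database. A minimal, purely illustrative Python sketch (assumes an `id` primary key and uses naive quoting, so it is not production-safe; a real implementation would use a driver with parameterized statements):

```python
# Hypothetical sketch of incremental sync: replay Debezium change events
# against a target database by translating each payload into SQL.
def event_to_sql(table: str, payload: dict) -> str:
    """Translate one Debezium event payload into an SQL statement."""
    op, before, after = payload["op"], payload.get("before"), payload.get("after")
    if op == "c":  # insert
        cols = ", ".join(after)
        vals = ", ".join(repr(v) for v in after.values())
        return f"INSERT INTO {table} ({cols}) VALUES ({vals});"
    if op == "u":  # update, keyed on the id column for simplicity
        sets = ", ".join(f"{k} = {v!r}" for k, v in after.items() if k != "id")
        return f"UPDATE {table} SET {sets} WHERE id = {after['id']};"
    if op == "d":  # delete
        return f"DELETE FROM {table} WHERE id = {before['id']};"
    raise ValueError(f"unsupported op: {op}")

print(event_to_sql("customers", {
    "op": "u",
    "before": {"id": 1004, "first_name": "Anne"},
    "after":  {"id": 1004, "first_name": "Anne Marie"},
}))
# → UPDATE customers SET first_name = 'Anne Marie' WHERE id = 1004;
```

As the note above says, this replays only changes observed after the connector started; Debezium's initial consistent snapshot plus binlog streaming is what makes a full sync possible.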