基于canal的实时数据同步
适用场景
使用canal做数据备份而不用mysql自带的主从备份的场景主要为:
- 跨数据库的数据备份,例如mysql => oracle
- 数据异构,即对同一份数据做不同的分库分表查询。例如卖家和买家各自分库索引
maven
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.2</version>
</dependency>
java
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.lang.StringUtils;
public class SimpleCanalClient {
public static void main(String[] args) throws Exception {
String destination = "example";
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111), destination, "", "");
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int batchSize = 5 * 1024;
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
synchronizedData(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
}
/** * 同步数据 * @param entries * @throws Exception */
private static void synchronizedData(List<Entry> entries) throws Exception {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
String tableName = entry.getHeader().getTableName();
for (RowData rowData : rowChange.getRowDatasList()) {
String sql = getSql(rowChange.getEventType(),tableName,rowData);
System.out.println(sql);
// TODO 执行sql语句
}
}
}
/** * 获取增删改的sql * @param eventType * @param tableName * @param rowData * @return */
private static String getSql(CanalEntry.EventType eventType,String tableName,RowData rowData){
String sql = null;
switch (eventType) {
case INSERT:
sql = getInsertSql(tableName,rowData.getAfterColumnsList());
break;
case UPDATE:
sql = getUpdateSql(tableName,rowData.getAfterColumnsList());
break;
case DELETE:
sql = getDeleteSql(tableName,rowData.getBeforeColumnsList());
break;
default:
break;
}
return sql;
}
private static String getInsertSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String keys = "";
String values = "";
for(int i=0;i<columns.size();i++){
if(i != 0) {
keys += ",";
values += ",";
}
keys += columns.get(i).getName();
values += getValue(columns.get(i));
}
String format = "INSERT INTO %s (%s) VALUES (%s)";
return String.format(format,tableName,keys,values);
}
private static String getUpdateSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String sets = "";
String where = "";
for(Column column : columns){
if(column.getIsKey()){
where = column.getName() + "=" + getValue(column);
continue;
}
if(!StringUtils.isBlank(sets)) {
sets += ",";
}
sets += column.getName() + "=" + getValue(column);
}
String format = "UPDATE %s SET %s WHERE %s";
return String.format(format,tableName,sets,where);
}
private static String getDeleteSql(String tableName,List<Column> columns){
if(columns.size() == 0 || StringUtils.isBlank(tableName)){
return null;
}
String where = "";
for(Column column : columns){
if(column.getIsKey()){
where = column.getName() + "=" + getValue(column);
continue;
}
}
String format = "DELETE FROM %s WHERE %s";
return String.format(format,tableName,where);
}
private static String getValue(Column column){
if(column.getIsNull()){
return "null";
}
return String.format("'%s'",column.getValue());
}
}
数据一致性
单机单点消费mysql的log-bin后直接更新到备份数据库中,数据一致性没有问题。但是如果变成分布式环境以及消费mysql的log-bin后将更新数据推到MQ中由多节点消费更新到多个备份数据库中,则会出现数据更新时序和数据一致性的问题。
而以上代码在update sql中除了获取值变化了的字段,也反查数据库获取了未变化的字段。因此每次update的sql实际上是该条记录的全量数据。
通过在表中加上时间戳字段作为记录的版本号,用逻辑删除取代物理删除delete,修改以上代码的sql拼接,insert操作时忽略主键冲突、update操作时仅更新版本号(时间戳)旧的记录,可以极大避免数据不一致的现象,也解决了MQ重复消费的问题。
`last_update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
再通过定时任务,每天一次增量数据更新,每周一次全量数据更新,保证数据的最终一致性。
还没有评论,来说两句吧...