不知道你是否有遇到過需要解析binlog日誌的場景。今天我整理了一些對binlog解析的解決方案,供大家參考下。
基於Canal的實時監聽
一般業內對binlog進行實時監聽最常用的中介軟體會是canal。canal其實本質底層是制定了一個偽造的MySQL的Slave節點,接收MySQL主節點發送過來的binlog檔案。只需要我們引入相關的依賴,然後部署一套Canal服務即可。Canal的部署,可以採用單機或者結合zk做叢集架構。
相關的客戶端依賴如下:
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <canal.version>1.1.4</canal.version> </properties> <description>canal監聽案例</description> <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>${canal.version}</version> </dependency> <!-- Message、CanalEntry.Entry等來自此安裝包 --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>${canal.version}</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> </dependency> </dependencies>
然後具體的實現程式碼可以如下:
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.List; /** * @author idea * @description 監聽canal的回撥輸出 */ public class CanalDemo { //Canal服務地址 使用自己虛擬機器的ip地址 private static final String SERVER_ADDRESS = "127.0.0.1"; //Canal Server 服務埠號 private static final Integer PORT = 11111; //目的地,其實Canal Service內部有一個佇列,和配置檔案中一致即可,參考【修改instance.properties】圖中 private static final String DESTINATION = "user_instance"; //使用者名稱和密碼,但是目前不支援,只能為空 private static final String USERNAME = "canal"; //使用者名稱和密碼,但是目前不支援,只能為空 private static final String PASSWORD = "canal"; public static void main(String[] args) { CanalConnector canalConnector = CanalConnectors.newSingleConnector( new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD); canalConnector.connect(); //訂閱所有訊息 canalConnector.subscribe(".*\..*"); // 只訂閱test資料庫下的所有表 //canalConnector.subscribe("test"); //恢復到之前同步的那個位置 canalConnector.rollback(); System.out.println("啟動canal"); for (; ; ) { //獲取指定數量的資料,但是不做確認標記,下一次取還會取到這些資訊。 注:不會阻塞,若不夠100,則有多少返回多少 Message message = canalConnector.getWithoutAck(100); //獲取訊息id long batchId = message.getId(); if (batchId != -1) { log.info("msgId -> " + batchId); printEnity(message.getEntries()); //提交確認 //canalConnector.ack(batchId); //處理失敗,回滾資料 //canalConnector.rollback(batchId); } } } private static void printEnity(List<CanalEntry.Entry> entries) { for (CanalEntry.Entry entry : entries) { if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) { continue; } try { // 序列化資料 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { System.out.println(rowChange.getEventType()); switch (rowChange.getEventType()) { //如果希望監聽多種事件,可以手動增加case case INSERT: // 表名 String tableName = entry.getHeader().getTableName(); //System.out.println("表名:"+tableName); //測試users表進行對映處 List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); //for(CanalEntry.Column c:afterColumnsList){ // System.out.println("欄位:"+c.getName()+"值:"+c.getValue()); //} System.out.println(afterColumnsList); break; case UPDATE: List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList(); System.out.println("新插入的資料是:" + afterColumnsList2); break; case DELETE: List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); System.out.println("被刪除的資料是:" + beforeColumnsList); break; case CREATE: System.out.println("建立表格"); case ALTER: System.out.println("alert變更"); case TRUNCATE: System.out.println("truncate變更"); default: } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } } }
Canal可能存在的問題
在實際使用Canal的時候需要注意,它可能會存在以下問題。
對binlog的格式有一定要求
一般使用Canal的話,建議修改MySQL主節點的binlog為ROW格式,否則可能會引發很多隱藏問題。例如當MySQL的BinLog格式為mixed型別的時候,可能Canal會收到QUERY,UPDATE兩條型別的事件。
本質還是單機架構
由於Canal的部署其實由於要保證MySQL的binlog順序性,所以其實底層是有一個緩衝區用於接受binlog資料的,
Server: 整個canal例項 Instance: 你可以理解為一個特殊的佇列角色,一個Instance可以用於承接一系列相關的庫表變更資料內容。
在Instance的內部,還有幾個角色:
EventParser (資料來源接入,模擬slave協議和master進行互動,協議解析)
EventSink (Parser和Store連結器,進行資料過濾,加工,分發的工作)
EventStore (資料儲存)
MetaManager (增量訂閱&消費資訊管理器)
由於順序性的原因,Canal的一個Instance一次只能給一個消費者訪問(pull,ack)。因此如果該消費者的消費能力較弱,就會導致大量的資料堆積在Canal的記憶體中,高併發場景下可能會有OOM情況發生。所以Canal一般會和常用的MQ進行結合使用。例如內接入RocketMQ或者Kafka,當收到binlog後傳送到MQ中,讓MQ來承載這些壓力。(MQ天生就是適合抗高併發的)
但是要注意MQ的內部佇列也得設定成一個,並且消費端得單執行緒消費,保證資料的順序性。
meta.dat檔案的binlog位置對不上
canal監聽mysql的時候會在conf/{destination}/meta.dat檔案中記錄當前binlog的名字、position。
下次啟動canal的時候就從這個binlog的position開始讀取資料。
meta.dat記錄的是上一次的binlog資訊,當你刪除mysql的binlog或者監聽到另外一臺mysql後,meta.dat記錄的資訊就相當於過期資訊,所以就會出現PositionNotFoundException。
資料重複消費問題 這類問題一般出現在叢集架構下的canal例項切換,由於他們利用了Zookeeper作為第三方,實際上binlog的位點資訊是記錄在各個server例項本地的meta.dat檔案裡面的,這塊的資料需要透過TCP請求上報到Zk,這裏存在一個時延和網路的不穩定因素干擾。因此如果你的Canal備用節點切入的時候,有可能拿到的binlog位點是有一定滯後的。 面對這種場景,建議是消費端處理的時候,可以開一個“視窗”一樣的機制,防止資料重複注入。
mysqlbinlog直接解析binlog內容
上邊我們介紹的Canal更多是實時資料解析時候使用的,有時候可能你只需要對binlog檔案做一些簡單的檔案解析而已,這種情況就不需要用Canal這麼複雜的元件了。
mysql內建的mysqlbinlog工具就是一款不錯的命令工具,它內部支援對binlog的各種轉換。例如row格式的binlog直接給你轉成可讀性較強的sql。
mysqlbinlog的常用命令
基本格式
mysqlbinlog [*options*] *log_file* ...
base64-output,可以控制輸出語句輸出base64編碼的BINLOG語句;decode-rows:選項將把基於行的事件解碼成一個SQL語句
mysqlbinlog --base64-output=decode-rows /data/binlog.00001 > mysql-binlog-00001.sql
指定時間範圍去解析binlog
mysqlbinlog -v --base64-output=decode-rows --start-datetime="2021-4-20 00:00:00" --stop-datetime="2021-04-20 23:59:59" mysql-binlog.000001 > mysql-binlog-000001.sql
指定資料庫進行過濾
mysqlbinlog -v --base64-output=decode-rows --database=test --start-datetime="2021-4-20 00:00:00" --stop-datetime="2021-04-20 23:59:59" mysql-bin.000001 > mysql-binlog-000001.sql
使用base64-output格式輸出的內容格式如下:
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; /*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; DELIMITER /*!*/; # at 1180 #210420 20:24:21 server id 1 end_log_pos 1237 CRC32 0x6d2e5f81 Table_map: `test`.`users` mapped to number 108 # at 1237 #210420 20:24:21 server id 1 end_log_pos 1313 CRC32 0x7ef28a01 Update_rows: table id 108 flags: STMT_END_F ### UPDATE `test`.`users` ### WHERE ### @1=4 ### @2='idea-0001' ### @3=17 ### @4=1618841040 ### @5=0 ### SET ### @1=4 ### @2='idea' ### @3=18 ### @4=1618841040 ### @5=1618921461 SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/; DELIMITER ; # End of log file /*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; /*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
你會發現基本的SQL語句會有輸出,但是並不會展示具體的欄位名稱(我猜是爲了節省空間吧)。這裏你可以自行進行grep分析,就會得出相關的INSERT,UPDATE,DELETE的sql條數了,下邊是一份我自己測試用的Shell指令碼,供大家參考下:
cat ./mysql-binlog-000001.sql | grep 'UPDATE ' >update.sql cat ./mysql-binlog-000001 | grep 'INSERT ' >insert.sql cat ./mysql-binlog-000001 | grep 'DELETE ' >delete.sql
mysql-binlog-connector
mysql-binlog-connector是一款輕量級的jar包,可以用於自行解析binlog日誌內容。相關github倉庫地址: github.com/shyiko/mysq… 這款jar是一些個人開發者維護的jar包,可以用於實現類似Canal一樣的監聽binlog變更的效果。目前GitHub上的star數量也不少。
首先我們只需引入以下依賴:
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <binlog.version>0.21.0</binlog.version> <fastjson.version>2.0.25</fastjson.version> </properties> <dependencies> <dependency> <groupId>com.github.shyiko</groupId> <artifactId>mysql-binlog-connector-java</artifactId> <version>${binlog.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> </dependencies>
解析binlog日誌內容程式碼如下:
import com.github.shyiko.mysql.binlog.BinaryLogFileReader; import com.github.shyiko.mysql.binlog.event.Event; import com.github.shyiko.mysql.binlog.event.EventType; import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType; import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer; import java.io.File; import java.io.IOException; /** * @author idea * @description */ public class BinlogMain { private static String sourceFilePath = "本地binlog檔案的地址"; public static void main(String[] args) throws IOException { String filePath = sourceFilePath; File binlogFile = new File(filePath); EventDeserializer eventDeserializer = new EventDeserializer(); eventDeserializer.setChecksumType(ChecksumType.CRC32); BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer); try { for (Event event; (event = reader.readEvent()) != null; ) { EventType eventType = event.getHeader().getEventType(); if (EventType.QUERY.equals(eventType)) { System.out.println(event); } else if (EventType.UPDATE_ROWS.equals(eventType)) { System.out.println(event); } else if (EventType.TABLE_MAP.equals(eventType)) { //每個rows_event事件的前後都會有個binlog記錄,用於描述表內部的id和變化 System.out.println(event); } else if (EventType.isUpdate(eventType)) { System.out.println(event); } else if (EventType.isWrite(eventType)) { System.out.println(event); } else if (EventType.isDelete(eventType)) { System.out.println(event); } else if (EventType.isUpdate(eventType)) { System.out.println(event); } } } finally { reader.close(); } } }
除了支援解析已有的binlog日誌外,使用該元件還支援偽造成slave,實時監聽binlog變更的效果。但是它在數據處理,穩定性等各方面均不如阿里巴巴的Canal要好,所以適合簡單場景處理。 如果你需要做實時監聽,可以參考這篇文章:blog.csdn.net/m0_37583655…