切换语言为:繁体
整理三种高效解析 MySQL BinLog 数据的方案

整理三种高效解析 MySQL BinLog 数据的方案

  • 爱糖宝
  • 2024-06-30
  • 2078
  • 0
  • 0

不知道你是否有遇到过需要解析binlog日志的场景。今天我整理了一些对binlog解析的解决方案,供大家参考下。

基于Canal的实时监听

一般业内对binlog进行实时监听最常用的中间件会是canal。canal其实本质底层是制定了一个伪造的MySQL的Slave节点,接收MySQL主节点发送过来的binlog文件。只需要我们引入相关的依赖,然后部署一套Canal服务即可。Canal的部署,可以采用单机或者结合zk做集群架构。

整理三种高效解析 MySQL BinLog 数据的方案

相关的客户端依赖如下:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <canal.version>1.1.4</canal.version>
</properties>

<description>canal监听案例</description>

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>${canal.version}</version>
    </dependency>
    <!-- Message、CanalEntry.Entry等来自此安装包 -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>${canal.version}</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
</dependencies>


然后具体的实现代码可以如下:

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @author idea
 * @description 监听canal的回调输出
 */

public class CanalDemo {

    //Canal服务地址 使用自己虚拟机的ip地址
    private static final String SERVER_ADDRESS = "127.0.0.1";

    //Canal Server 服务端口号
    private static final Integer PORT = 11111;

    //目的地,其实Canal Service内部有一个队列,和配置文件中一致即可,参考【修改instance.properties】图中
    private static final String DESTINATION = "user_instance";

    //用户名和密码,但是目前不支持,只能为空
    private static final String USERNAME = "canal";

    //用户名和密码,但是目前不支持,只能为空
    private static final String PASSWORD = "canal";

    public static void main(String[] args) {
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
        canalConnector.connect();
        //订阅所有消息
        canalConnector.subscribe(".*\..*");
        // 只订阅test数据库下的所有表
        //canalConnector.subscribe("test");
        //恢复到之前同步的那个位置
        canalConnector.rollback();
        System.out.println("启动canal");
        for (; ; ) {
            //获取指定数量的数据,但是不做确认标记,下一次取还会取到这些信息。 注:不会阻塞,若不够100,则有多少返回多少
            Message message = canalConnector.getWithoutAck(100);
            //获取消息id
            long batchId = message.getId();
            if (batchId != -1) {
                log.info("msgId -> " + batchId);
                printEnity(message.getEntries());
                //提交确认
                //canalConnector.ack(batchId);
                //处理失败,回滚数据
                //canalConnector.rollback(batchId);
            }
        }
    }

    private static void printEnity(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
                continue;
            }
            try {
                // 序列化数据
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    System.out.println(rowChange.getEventType());
                    switch (rowChange.getEventType()) {
                        //如果希望监听多种事件,可以手动增加case
                        case INSERT:
                            // 表名
                            String tableName = entry.getHeader().getTableName();
                            //System.out.println("表名:"+tableName);
                            //测试users表进行映射处
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            //for(CanalEntry.Column c:afterColumnsList){
                            // System.out.println("字段:"+c.getName()+"值:"+c.getValue());
                            //}

                            System.out.println(afterColumnsList);
                            break;
                        case UPDATE:
                            List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
                            System.out.println("新插入的数据是:" + afterColumnsList2);
                            break;
                        case DELETE:
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            System.out.println("被删除的数据是:" + beforeColumnsList);
                            break;
                        case CREATE:
                            System.out.println("创建表格");
                        case ALTER:
                            System.out.println("alert变更");
                        case TRUNCATE:
                            System.out.println("truncate变更");
                        default:
                    }
                }
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
        }
    }
}


Canal可能存在的问题

在实际使用Canal的时候需要注意,它可能会存在以下问题。

  • 对binlog的格式有一定要求

一般使用Canal的话,建议修改MySQL主节点的binlog为ROW格式,否则可能会引发很多隐藏问题。例如当MySQL的BinLog格式为mixed类型的时候,可能Canal会收到QUERY,UPDATE两条类型的事件。

  • 本质还是单机架构

由于Canal的部署其实由于要保证MySQL的binlog顺序性,所以其实底层是有一个缓冲区用于接受binlog数据的,

整理三种高效解析 MySQL BinLog 数据的方案

Server: 整个canal实例 Instance: 你可以理解为一个特殊的队列角色,一个Instance可以用于承接一系列相关的库表变更数据内容。

在Instance的内部,还有几个角色:

EventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
EventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
EventStore (数据存储)
MetaManager (增量订阅&消费信息管理器)

由于顺序性的原因,Canal的一个Instance一次只能给一个消费者访问(pull,ack)。因此如果该消费者的消费能力较弱,就会导致大量的数据堆积在Canal的内存中,高并发场景下可能会有OOM情况发生。所以Canal一般会和常用的MQ进行结合使用。例如内接入RocketMQ或者Kafka,当收到binlog后发送到MQ中,让MQ来承载这些压力。(MQ天生就是适合抗高并发的)

但是要注意MQ的内部队列也得设置成一个,并且消费端得单线程消费,保证数据的顺序性。

  • meta.dat文件的binlog位置对不上

canal监听mysql的时候会在conf/{destination}/meta.dat文件中记录当前binlog的名字、position。

下次启动canal的时候就从这个binlog的position开始读取数据。

meta.dat记录的是上一次的binlog信息,当你删除mysql的binlog或者监听到另外一台mysql后,meta.dat记录的信息就相当于过期信息,所以就会出现PositionNotFoundException。

  • 数据重复消费问题 这类问题一般出现在集群架构下的canal实例切换,由于他们利用了Zookeeper作为第三方,实际上binlog的位点信息是记录在各个server实例本地的meta.dat文件里面的,这块的数据需要通过TCP请求上报到Zk,这里存在一个时延和网络的不稳定因素干扰。因此如果你的Canal备用节点切入的时候,有可能拿到的binlog位点是有一定滞后的。 面对这种场景,建议是消费端处理的时候,可以开一个“窗口”一样的机制,防止数据重复注入。

mysqlbinlog直接解析binlog内容

上边我们介绍的Canal更多是实时数据解析时候使用的,有时候可能你只需要对binlog文件做一些简单的文件解析而已,这种情况就不需要用Canal这么复杂的组件了。

mysql内置的mysqlbinlog工具就是一款不错的命令工具,它内部支持对binlog的各种转换。例如row格式的binlog直接给你转成可读性较强的sql。

dev.mysql.com/doc/refman/…

mysqlbinlog的常用命令

  • 基本格式

mysqlbinlog [*options*] *log_file* ...


  • base64-output,可以控制输出语句输出base64编码的BINLOG语句;decode-rows:选项将把基于行的事件解码成一个SQL语句

mysqlbinlog --base64-output=decode-rows  /data/binlog.00001 > mysql-binlog-00001.sql


  • 指定时间范围去解析binlog

mysqlbinlog -v --base64-output=decode-rows --start-datetime="2021-4-20 00:00:00" --stop-datetime="2021-04-20 23:59:59" mysql-binlog.000001 > mysql-binlog-000001.sql


  • 指定数据库进行过滤

mysqlbinlog -v --base64-output=decode-rows --database=test --start-datetime="2021-4-20 00:00:00" --stop-datetime="2021-04-20 23:59:59" mysql-bin.000001 > mysql-binlog-000001.sql


使用base64-output格式输出的内容格式如下:

/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 1180
#210420 20:24:21 server id 1  end_log_pos 1237 CRC32 0x6d2e5f81 	Table_map: `test`.`users` mapped to number 108
# at 1237
#210420 20:24:21 server id 1  end_log_pos 1313 CRC32 0x7ef28a01 	Update_rows: table id 108 flags: STMT_END_F
### UPDATE `test`.`users`
### WHERE
###   @1=4
###   @2='idea-0001'
###   @3=17
###   @4=1618841040
###   @5=0
### SET
###   @1=4
###   @2='idea'
###   @3=18
###   @4=1618841040
###   @5=1618921461
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by mysqlbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;


你会发现基本的SQL语句会有输出,但是并不会展示具体的字段名称(我猜是为了节省空间吧)。这里你可以自行进行grep分析,就会得出相关的INSERT,UPDATE,DELETE的sql条数了,下边是一份我自己测试用的Shell脚本,供大家参考下:

cat ./mysql-binlog-000001.sql | grep 'UPDATE ' >update.sql
cat ./mysql-binlog-000001 | grep 'INSERT ' >insert.sql
cat ./mysql-binlog-000001 | grep 'DELETE ' >delete.sql


mysql-binlog-connector

mysql-binlog-connector是一款轻量级的jar包,可以用于自行解析binlog日志内容。相关github仓库地址: github.com/shyiko/mysq… 这款jar是一些个人开发者维护的jar包,可以用于实现类似Canal一样的监听binlog变更的效果。目前GitHub上的star数量也不少。

首先我们只需引入以下依赖:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <binlog.version>0.21.0</binlog.version>
    <fastjson.version>2.0.25</fastjson.version>
</properties>

<dependencies>
    <dependency>
        <groupId>com.github.shyiko</groupId>
        <artifactId>mysql-binlog-connector-java</artifactId>
        <version>${binlog.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
</dependencies>


解析binlog日志内容代码如下:

import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;

import java.io.File;
import java.io.IOException;

/**
 * @author idea
 * @description
 */
public class BinlogMain {

    private static String sourceFilePath = "本地binlog文件的地址";


    public static void main(String[] args) throws IOException {
        String filePath = sourceFilePath;
        File binlogFile = new File(filePath);
        EventDeserializer eventDeserializer = new EventDeserializer();
        eventDeserializer.setChecksumType(ChecksumType.CRC32);
        BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile,
                eventDeserializer);
        try {
            for (Event event; (event = reader.readEvent()) != null; ) {
                EventType eventType = event.getHeader().getEventType();
                if (EventType.QUERY.equals(eventType)) {
                    System.out.println(event);
                } else if (EventType.UPDATE_ROWS.equals(eventType)) {
                    System.out.println(event);
                } else if (EventType.TABLE_MAP.equals(eventType)) {
                    //每个rows_event事件的前后都会有个binlog记录,用于描述表内部的id和变化
                    System.out.println(event);
                } else if (EventType.isUpdate(eventType)) {
                    System.out.println(event);
                } else if (EventType.isWrite(eventType)) {
                    System.out.println(event);
                } else if (EventType.isDelete(eventType)) {
                    System.out.println(event);
                } else if (EventType.isUpdate(eventType)) {
                    System.out.println(event);
                }
            }
        } finally {
            reader.close();
        }

    }

}


除了支持解析已有的binlog日志外,使用该组件还支持伪造成slave,实时监听binlog变更的效果。但是它在数据处理,稳定性等各方面均不如阿里巴巴的Canal要好,所以适合简单场景处理。 如果你需要做实时监听,可以参考这篇文章:blog.csdn.net/m0_37583655…

0条评论

您的电子邮件等信息不会被公开,以下所有项均必填

OK! You can skip this field.