Canal是阿里巴巴开源的数据库binlog日志解析工具,其可以监听MySQL、PostgreSQL、TiDB、Oracle数据库等数据库的binlog,并将变更实时地推送给指定的消费端。

以下为监听mysql的binlog日志流程:

一、打开mysql的binlog权限:

1、打开方式:

找到mysql的my.ini文件(或者my.cnf文件)找到 [mysqld] 部分,并添加如下配置:

log-bin = mysql-bin

server-id = 1

binlog_format = raw

binlog-do-db=canal_text(要监听的库名,如果不配置,默认监听所有库)

2、查看是否开启成功:

show variables like '%log_bin%';

3、给canal链接数据库权限:

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'root';

刷新:

FLUSH PRIVILEGES;

二、下载canal(windows版):

1、修改conf /canal.properties配置文件,注册地址为本机IP

2、修改conf/example/instance.properties配置文件,配置数据库信息

3、启动服务:Windows下使用startup.bat

4、启动成功:

三、java依赖和客户端代码:

1、配置java和maven环境后加入依赖包(java环境和maven环境配置可以上网搜,有详细教程):
<parent>

   <artifactId>spring-boot-parent</artifactId>

   <groupId>org.springframework.boot</groupId>

   <version>2.1.4.RELEASE</version>

</parent>

<dependencies>

   <!--canal依赖-->

   <dependency>

       <groupId>com.alibaba.otter</groupId>

       <artifactId>canal.client</artifactId>

       <version>1.1.5</version>

   </dependency>

   <!-- Message、CanalEntry.Entry等来自此安装包 -->

   <dependency>

       <groupId>com.alibaba.otter</groupId>

       <artifactId>canal.protocol</artifactId>

       <version>1.1.5</version>

   </dependency>

   <dependency>

       <groupId>org.slf4j</groupId>

       <artifactId>jcl-over-slf4j</artifactId>

       <version>1.5.6</version>

   </dependency>

   <dependency>

       <groupId>org.eclipse.aether</groupId>

       <artifactId>aether-impl</artifactId>

       <version>1.0.0.v20140518</version>

   </dependency>

   <dependency>

       <groupId>org.eclipse.jetty</groupId>

       <artifactId>jetty-server</artifactId>

       <version>9.4.46.v20220331</version>

   </dependency>

   <dependency>

       <groupId>org.projectlombok</groupId>

       <artifactId>lombok</artifactId>

   </dependency>

   <dependency>

       <groupId>mysql</groupId>

       <artifactId>mysql-connector-java</artifactId>

       <version>8.0.15</version>

   </dependency>

</dependencies>
2、编写客户端和写入记录表操作:

客户端代码:

package org.example;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

import java.util.ArrayList;

import java.util.List;

public class Monitor {

    //static List<String> str_list = new ArrayList<>();

    static String operate;

    public static void main(String[] args) throws InterruptedException {

    CanalConnector connector = CanalConnectors.newSingleConnector(

            new InetSocketAddress("192.168.26.114",11111), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");//之前配置的canal的注册地址和端口号,默认端口号为11111

    int batchSize = 1000;

    try {

        connector.connect();

        connector.subscribe(".*\\..*");

        connector.rollback();

        while (true) {

            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据

            long batchId = message.getId();

            int size = message.getEntries().size();

            if (batchId == -1 || size == 0) {//间隔一秒

                // 没有变化,等一秒钟再去拉取数据

                Thread.sleep(1000);

            } else {

                printEntry(message.getEntries());

            }

            connector.ack(batchId); // 提交确认

            // connector.rollback(batchId); // 处理失败, 回滚数据

        }

    } finally {

        connector.disconnect();

    }

}


    private static void printEntry(List<CanalEntry.Entry> entrys) {

        for (CanalEntry.Entry entry : entrys) {

            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {

                continue;

            }


            CanalEntry.RowChange rowChage = null;

            try {

                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());

            } catch (Exception e) {

                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),

                        e);

            }

            CanalEntry.EventType eventType = rowChage.getEventType();

            operate = eventType.toString();//获取操作方式(增删改)

            System.out.println(String.format("================> \nbinlog[%s:%s]\ndatabase_name: %s\ntable_name: %s\neventType : %s",

                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

                    eventType));//java客户端输出信息

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//判断增删改

                if (eventType == CanalEntry.EventType.DELETE) {

                    printColumn(rowData.getBeforeColumnsList());

                } else if (eventType == CanalEntry.EventType.INSERT) {


                    printColumn(rowData.getAfterColumnsList());

                } else {//修改会产生两条信息,一条修改前,一条修改后

                    System.out.println("-------> before");

                    operate = eventType.toString()+"-"+"before";//修改前的记录为UPDATE-before

                    printColumn(rowData.getBeforeColumnsList());

                    operate = eventType.toString()+"-"+"after";//修改后的记录为UPDATE-after

                    System.out.println("-------> after");

                    printColumn(rowData.getAfterColumnsList());

                }

            }

        }

    }

    private static void printColumn(List<CanalEntry.Column> columns) {//修改内容函数,获取修改内容,并写入新的记录表

        List<String> str_list = new ArrayList<>();//定义新的集合,用来记录修改内容数据

        String name,age,sal;

        for (CanalEntry.Column column : columns) {

            str_list.add(column.getValue());

            System.out.println(column.getName() + " : " + column.getValue() );

         }

        DDL_Mysql a = new DDL_Mysql();//定义插入类对象

        name = str_list.get(0);//将修改的name字段数据赋值给name

        age = str_list.get(1);//将修改的age字段数据赋值给age

        sal = str_list.get(2);//将修改的sal字段数据赋值给sal

        a.insert(name,age,sal,operate);//调用插入函数,将mysql日志的数据的内容和操作方式插入记录表中

    }

}
3、编写插入记录表代码:
package org.example;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.sql.PreparedStatement;

public class DDL_Mysql {

    String driver = "com.mysql.jdbc.Driver";

    /*

     * URL指向要访问的数据库名(我用的是数据库名为test)

     *本地用127.0.0.1

     */

    String url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=GMT";

    String user = "root";      //MySQL配置时的用户名

    String password = "root";   //MySQL配置时的密码

    //throw则是指抛出的一个具体的异常类型。


    /*

     * 连接数据库

     */


    public  Connection getConn()

    {

        Connection con = null;

        try

        {  ////加载驱动程序

            Class.forName(driver);

        }

        catch (ClassNotFoundException e)

        {

            e.printStackTrace();

        }

        try

        {

            con = DriverManager.getConnection(url,user,password);//注意是三个参数

        }

        catch (SQLException e)

        {

            e.printStackTrace();

        }

        return con;

    }

   

    /*

     * 插入操作

     */



    public int insert(String name,String age,String sal,String operate)

    {

        Connection con = getConn();

        String sql = "insert into test (name, age, sal,operate) values (?, ?, ?, ?)";

        try {

            //用来执行SQL语句

            PreparedStatement pst = con.prepareStatement(sql);

            pst.setString(1,name);

            pst.setString(2, age);

            pst.setString(3, sal);

            pst.setString(4, operate);

            i = pst.executeUpdate();

        }

        catch (Exception e) {

            e.printStackTrace();

        }

        System.out.println(i);

        return i; //返回影响的行数,1为执行成功

    }

}

四、测试:

  测试中使用两张表,一张是canal_text库中的test表(操作表),一张是test库中的test表(记录表)

操作表:

记录表:

当我对操作表添加一条数据时:

操作表中加入一条信息:

同时,记录库中也有一条操作信息,包括操作方式:

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐