aa5c88e240b12946aad2de5cbabd5c18.png

一、什么是canal

官网的介绍

canal,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

b04a31974393c8731e3c1a5968431696.png

      Canal 是阿里巴巴开源的一套分布式数据库同步系统,目前主要支持 MySQL、RDS。Canal 的主要原理是伪装成 MySQL 的 Slave 节点,监听 MySQL 主库的 binlog 文件,根据 binglog 将数据库事件发送到 MQ 中,消费端可以订阅 MQ 中的消息。为了方便 Canal 的运维人员,阿里还提供了 Canal Admin 这个运维平台,使用户可以快速和安全的操作。

二、canal能做什么

基于日志增量订阅&消费实现数据同步,canal的数据同步不是全量的,而是增量。基于binary log增量订阅和消费,canal可以做:

  1. 数据库镜像

  2. 数据库实时备份

  3. 多级索引 (卖家和买家各自分库索引)

  4. search build

  5. 业务cache刷新

  6. 价格变化等重要业务消息

三、如何搭建canal

3.1 首先有一个MySQL服务器

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

3.1.1 在MySQL中需要创建一个用户,并授权:

-- 创建用户,用户名:canal,密码:canal@123456

create user canal identified by 'canal@123456';

-- 为canal用户赋予replication权限,*.*标识所有库

grant SELECT,REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'canal@123456';

-- 修改完毕立即生效

flush privileges;

3.1.2 使用命令查看数据库是否开启binlog模式:

show variables like 'log_%';


显示如下图:

a2253d2fe7e3b5411db842f56b8d5453.png

log_bin属性值为ON,则binlog模式开启;为OFF则binlog模式关闭。

若binlog模式关闭,则在MySQL配置文件my.cnf设置如下信息:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

修改完配置文件之后,重启MySQL,使用命令查看是否打开binlog模式。

757b8d308e12405e9ab0f277025e08d1.png

查看binlog日志文件列表:

d37e46fee96fcf4a726b9297c8949e70.png

查看当前正在写入的binlog文件:

fa86c7c81abedf9723d36d2aef9438e0.png

3.2 安装canal

官网下载地址:

https://github.com/alibaba/canal/releases

以v1.1.5-alpha-2版本为例

2d8318d1607695dcc9f97e428475db37.png

1.linux服务器下载:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-2/canal.deployer-1.1.5-SNAPSHOT.tar.gz

2.创建文件夹:mkdir /usr/local/canal

3.解压文件:tar -zxvf canal.deployer-1.1.0.tar.gz -C /usr/local/canal/

解压后如下图:

2b4f8860dd40f649ff87d411bd9c93a3.png

打开配置文件conf/example/instance.properties,配置信息如下:

vi conf/example/instance.properties

#################################################

## mysql serverId , v1.0.26+ will autoGen

## v1.0.26版本后会自动生成slaveId,所以可以不用配置 # canal.instance.mysql.slaveId=0 # 数据库地址canal.instance.master.address=127.0.0.1:3306# binlog日志名称 canal.instance.master.journal.name=mysql-bin.000001 # mysql主库链接时起始的binlog偏移量canal.instance.master.position=154# mysql主库链接时起始的binlog的时间戳 canal.instance.master.timestamp= canal.instance.master.gtid= # username/password # 在MySQL服务器授权的账号密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal@123456 # 字符集 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false # table regex .*\\..*表示监听所有表 也可以写具体的表名,用,隔开 canal.instance.filter.regex=.*\\..* # mysql 数据解析表的黑名单,多个表用,隔开 canal.instance.filter.black.regex=

备注:

canal.instance.filter.regex 配置规则

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:1. 所有表:.* or .*\\..*2. canal schema下所有表:canal\\..*3. canal下的以canal打头的表:canal\\.canal.*4. canal schema下的一张表:canal.test15. 多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)

4.启动canal服务

window: /bin/startup.bat

linux: /bin/startup.sh

四、Java客户端操作

4.1 在pom.xml中添加canal的jar包

    com.alibaba.otter

    canal.client

    1.1.4

4.2 编写测试类

import java.net.InetSocketAddress;import java.util.List;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.protocol.Message;import com.alibaba.otter.canal.protocol.CanalEntry.Column;import com.alibaba.otter.canal.protocol.CanalEntry.Entry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.EventType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.CanalEntry.RowData;import com.alibaba.otter.canal.client.*;public class CanalTest {    public static void main(String args[]) {        // 创建链接        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.34",                11111), "example", "", "");        int batchSize = 1000;        int emptyCount = 0;        try {            connector.connect();            connector.subscribe(".*\\..*");            connector.rollback();            int totalEntryCount = 1200;            while (emptyCount < totalEntryCount) {                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据                long batchId = message.getId();                int size = message.getEntries().size();                if (batchId == -1 || size == 0) {                    emptyCount++;                    System.out.println("empty count : " + emptyCount);                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                } else {                    emptyCount = 0;                    printEntry(message.getEntries());                }                connector.ack(batchId); // 提交确认            }            System.out.println("empty too many times, exit");        }catch (Exception e){            //connector.rollback(batchId); // 处理失败, 回滚数据        }        finally {            connector.disconnect();        }    }    private static void printEntry( List<Entry> entrys) {        for (Entry entry : entrys) {            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {                continue;            }            RowChange rowChage = null;            try {                rowChage = RowChange.parseFrom(entry.getStoreValue());            } catch (Exception e) {                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);            }            EventType eventType = rowChage.getEventType();            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),                    eventType));            for (RowData rowData : rowChage.getRowDatasList()) {                if (eventType == EventType.DELETE) {                    printColumn(rowData.getBeforeColumnsList());                    //此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中                } else if (eventType == EventType.INSERT) {                    printColumn(rowData.getAfterColumnsList());                    //此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中                } else {                    System.out.println("-------> before");                    printColumn(rowData.getBeforeColumnsList());                    System.out.println("-------> after");                    printColumn(rowData.getAfterColumnsList());                    //此处可以将监控到的rowData.getAfterColumnsList()集合数据更新或者同步到redis或者es中                }            }        }    }    private static void printColumn( List<Column> columns) {        for (Column column : columns) {            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());        }    }}

测试结果示例:

================》; binlog[mysql-bin.000003:409047] , name[jq_test,user] , eventType : UPDATE------->; beforeid : 22    update=falsenick_name : 米熊    update=falsephone :     update=falsehead_img_url : https://wx.qlogo.cn/mmopen/Q/132    update=falsegender : 2    update=falseapplet_open_id : oY0le5V-YzahRYmY5PWMfo    update=falseip_addr :     update=falseuser_type : 1    update=falsecreate_time : 2020-03-25 21:21:26    update=falseupdate_time :     update=falseunion_id :     update=falsepublic_open_id :     update=false------->; afterid : 22    update=falsenick_name : 。米哈    update=truephone :     update=falsehead_img_url : https://wx.qlogo.cn/mmopen/Q/132    update=falsegender : 2    update=falseapplet_open_id : oY0le5V-YzahRYmY5PWMfo    update=falseip_addr :     update=falseuser_type : 1    update=falsecreate_time : 2020-03-25 21:21:26    update=falseupdate_time :     update=falseunion_id :     update=falsepublic_open_id :     update=false
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐