一、概述

Canal 是阿里巴巴开源的 MySQL binlog 增量订阅&消费组件,通过模拟 MySQL slave 的交互协议,从 MySQL 主库或从库获取 binlog 日志,解析后同步到 Elasticsearch 或其他存储系统。本文档基于实际项目经验,详细说明 Canal 的部署、配置、排错及注意事项,帮助读者快速搭建稳定可靠的同步链路。

二、环境准备

2.1 软件版本

  • MySQL:8.0.45(主从架构)

  • Elasticsearch:8.14.0(双节点集群)

  • Canal:1.1.8

  • JDK:openjdk 11.0.30

  • 操作系统:Ubuntu 22.04

2.2 硬件要求

  • 至少 2 核 CPU、4GB 内存

  • 磁盘空间充足(取决于数据量)

2.3 网络要求

  • 确保所有节点之间网络互通

  • MySQL 端口(3306)、ES 端口(9200/9300)、Canal 端口(11111、8081)需开放

三、MySQL 主从复制配置

3.1 主库配置(172.18.28.60)

编辑 /etc/mysql/mysql.conf.d/mysqld.cnf,在[mysqld]节中添加/修改以下配置:

[mysqld]
server-id               = 1
log_bin                 = /var/log/mysql/mysql-bin.log
binlog_format           = ROW
binlog_row_image        = FULL
binlog_expire_logs_seconds = 604800   # 7天
max_binlog_size         = 1000M
gtid_mode               = ON
enforce_gtid_consistency = ON
binlog_format = ROW 和 binlog_row_image = FULL 是Canal必须的设置。
gtid_mode 和 enforce_gtid_consistency 开启GTID,简化复制管理。
然后重启MySQL服务
systemctl restart mysql

3.2 从库配置(172.18.28.62)

编辑 /etc/mysql/mysql.conf.d/mysqld.cnf,在[mysqld]节中添加/修改以下配置:

[mysqld]
server-id               = 2
log_bin                 = /var/log/mysql/mysql-bin.log
binlog_format           = ROW
binlog_row_image        = FULL
log_slave_updates       = 1                # 若Canal从从库读取,必须开启
binlog_expire_logs_seconds = 604800
max_binlog_size         = 1000M
gtid_mode               = ON
enforce_gtid_consistency = ON
read_only               = 1                # 防止意外写入
super_read_only         = 1
然后重启MySQL服务
systemctl restart mysql

3.3 创建复制用户(在主库执行)

在主库服务器中,连接MySQL,并执行以下操作

CREATE USER 'replicator'@'%' IDENTIFIED BY 'your_password';
GRANT REPLICATION SLAVE ON *.* TO 'replicator'@'%';
FLUSH PRIVILEGES;

3.4 初始化从库数据

如果从库没有数据,需要先从主库导出并导入:

mysqldump -u root -p --all-databases --master-data=2 > master.sql
scp master.sql root@172.18.28.62:/tmp/
mysql -u root -p < /tmp/master.sql

3.5 启动从库复制(在从库执行)

STOP SLAVE;
CHANGE MASTER TO
  MASTER_HOST = '主库IP',
  MASTER_PORT = 3306,
  MASTER_USER = 'replicator',
  MASTER_PASSWORD = 'your_password',
  MASTER_AUTO_POSITION = 1;
START SLAVE;
确认Slave_IO_Running 和 Slave_SQL_Running 均为 Yes。Seconds_Behind_Master为0或者逐渐减小。

四、Canal Server 安装与配置

4.1 下载 Canal

wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz

mkdir -p /usr/local/canal/server

tar -zxvf canal.deployer-1.1.8.tar.gz -C /usr/local/canal/server

4.2 配置 Canal Server

编辑 /usr/local/canal/server/conf/canal.properties

canal.id = 1
canal.ip = 0.0.0.0
canal.port = 11111
canal.metrics.pull.port = 11112
canal.serverMode = tcp
canal.destinations = example

编辑实例配置文件 /usr/local/canal/server/conf/example/instance.properties

canal.instance.gtidon=true   # 启动GTID

canal.instance.master.address = 172.18.28.62:3306   # 指向从库
canal.instance.dbUsername = canal
canal.instance.dbPassword = Canal@123456
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = mh_spider
canal.instance.filter.regex = mh_spider\\..*   # 同步所有表

注意:在主库中需要创建 MySQL 用户 canal 并授予权限:

CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

4.3 启动 Canal Server

cd /usr/local/canal/server/bin

./startup.sh

tail -f ../logs/example/example.log

看到start position successfully 表示启动成功。

五、Canal Adapter 安装与配置

5.1 下载 Canal Adapter

wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.adapter-1.1.8.tar.gz

mkdir -p /usr/local/canal/adapter

tar -zxvf canal.adapter-1.1.8.tar.gz -C /usr/local/canal/adapter

5.2 配置 Adapter 数据源

编辑 /usr/local/canal/adapter/conf/application.yml,添加或修改以下内容,其它原有内容保持不变。

srcDataSources:
     defaultDS:
         url: jdbc:mysql://172.18.28.60:3306/mh_spider?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&connectTimeout=60000&socketTimeout=6000000&zeroDateTimeBehavior=convertToNull&jdbcCompliantTruncation=false
         username: canal
         password: Canal@Test1688
         initialSize: 5
         minIdle: 5
         maxActive: 20           # 从 3 增加到 20
         maxWait: 120000         # 获取连接超时时间,改为 2 分钟
         timeBetweenEvictionRunsMillis: 60000
         minEvictableIdleTimeMillis: 300000
         validationQuery: SELECT 1
         testWhileIdle: true
         testOnBorrow: false
         testOnReturn: false

  canalAdapters:
    - instance: example   # canal instance Name or mq topic name
      groups:
        - groupId: g1
          concurrency: 1   
          outerAdapters:
            - name: es8
              hosts: http://172.18.28.60:9200,http://172.18.28.62:9200  # 您的 ES 集群地址
              properties:
                mode: rest
                security.auth: testadmin:testadmin@1688   # ES 认证信息
                cluster.name: mhyx-es-test
                connectTimeout: 60000
                socketTimeout: 120000
                commitBatch: 5000          # 降低单次写入数量,减轻压力
              paths: /usr/local/canal/adapter/conf/es8 
  logging:
    level:
      com.alibaba.otter.canal.client.adapter: INFO
      com.alibaba.otter.canal.adapter.launcher: INFO
      org.apache.http: INFO
      org.elasticsearch.client: INFO

关键参数说明

  • socketTimeout=600000:防止大查询时连接超时(单位毫秒)。

  • security.auth:ES 认证信息,格式 用户名:密码

5.3 配置同步映射文件

在 /usr/local/canal/adapter/conf/es8/ 目录下创建表映射文件,例如 sync_servers.yml

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: es_cat_servers
  _id: _id
  table: cat_servers
  sql: >
    SELECT
      id AS _id,
      IFNULL(aid, '') AS aid,
      IFNULL(forWhich, 0) AS forWhich,
      IFNULL(aname, '') AS aname,
      IFNULL(sid, 0) AS sid,
      IFNULL(sname, '') AS sname,
      IFNULL(last_begin, NOW()) AS last_begin,
      IFNULL(last_end, NOW()) AS last_end,
      IFNULL(server_status, 1) AS server_status,
      IFNULL(current_spider, '') AS current_spider,
      IFNULL(pages_p, 10) AS pages_p,
      IFNULL(bk, '') AS bk,
      IFNULL(from_date, '1980-01-01') AS from_date
    FROM cat_servers
  commitBatch: 2000
  etlCondition: "where id > {} ORDER BY id ASC"   # 支持断点续传

注意

  • 确保id为数据库主键,对应的es文档id为_id,否则删除es文档会不成功

  • 使用 SELECT 明确指定字段,避免 SELECT * 导致字段过多、类型不匹配等问题。

  • 确保每个字段在 ES 索引映射中存在且类型兼容。

  • 确保每个字段都做Null判断处理,否则可能会出现空指针异常

  • etlCondition 用于增量同步时的过滤条件。

5.4 启动 Canal Adapter

cd /usr/local/canal/adapter/bin

./stop.sh

./startup.sh

tail -f ../logs/adapter/adapter.log

看到 Load canal adapter: es8 succeed 和 Subscribe destination: example succeed 表示启动成功。

六、Elasticsearch 索引创建

在同步数据前,必须创建好目标索引。以下是一个简化的索引映射示例(根据实际表结构调整):

PUT /es_servers_idx
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "dynamic": false,
    "properties": {
      "id": { "type": "integer" },
      "aid": { "type": "keyword" },
      "forWhich": { "type": "byte" },
      "aname": { "type": "keyword" },
      "sid": { "type": "integer" },
      "sname": { "type": "keyword" },
      "last_begin": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||strict_date_optional_time"
      },
      "last_end": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||strict_date_optional_time"
      },
      "server_status": { "type": "byte" },
      "current_spider": { "type": "keyword" },
      "pages_p": { "type": "byte" },
      "bk": { "type": "keyword" },
      "from_date": {
        "type": "date",
        "format": "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss||epoch_millis||strict_date_optional_time"
      }
    }
  }
}

如果后续需要添加字段,可以使用 update mapping API,但注意不能修改已有字段类型。

七、全量数据同步(ETL)

对于已有历史数据,需要手动触发全量同步:

curl -X POST http://127.0.0.1:8081/etl/es8/sync_servers.yml
观察 Adapter 日志,等待同步完成。
tail -f /usr/local/canal/adapter/logs/adapter/adapter.log
也可以通过 ES 文档计数验证:
curl -u testadmin:testadmin@1688 "http://172.18.28.60:9200/es_servers_idx/_count?pretty"

八、增量同步测试

在 MySQL 主库插入一条新数据:

INSERT INTO cat_servers (aid, aname, sid, sname)  VALUES ('A001', '测试名称', 123, '测试服务器名');

等待几秒,在 ES 中查询

GET /es_cat_servers_idx/_search

应能返回文档。

其它更新/删除可同理完成测试。

九、常见错误及解决方法

9.1 Canal Server 连接 MySQL 失败

错误日志java.io.IOException: connect /172.18.28.62:3306 failure
解决方法

  • 检查 MySQL 地址、端口是否正确,网络是否通畅(telnet 测试)。

  • 确认 canal 用户有远程访问权限。

  • 检查 MySQL 是否开启了 binlog(SHOW VARIABLES LIKE 'log_bin';)。

9.2 主从复制中断(错误 1062 重复键)

现象Slave_SQL_Running: No,错误码 1062。
解决方法

  • 在从库上删除冲突记录:DELETE FROM servers WHERE id = ?;

  • 重启复制:STOP SLAVE; START SLAVE;

  • 或者跳过错误事务:SET GLOBAL SQL_SLAVE_SKIP_COUNTER = 1; START SLAVE;

9.3 Canal Adapter 同步时 NoSuchElementException

错误日志java.util.NoSuchElementException
原因:使用了 SELECT *,但 ES mapping 中缺少部分字段,或字段顺序导致解析异常。
解决方法:在映射文件中显式指定字段名,确保每个字段在 mapping 中有定义。

9.4 ETL 过程中 Read timed out

错误日志CommunicationsException: Read timed out
原因:大表查询时间超过 JDBC 默认超时(通常 30 秒)。
解决方法:在 JDBC URL 中添加 socketTimeout=600000(10 分钟),并适当增大 connectTimeout

9.5 ES 认证失败

错误日志security_exception
解决方法:检查 application.yml 中 security.auth 格式为 用户名:密码,确认密码正确。

9.6 同步后 ES 中没有数据

  • 检查 Canal Server 日志,确认 binlog 解析正常。

  • 检查 Adapter 日志,确认事件被接收且无错误。

  • 检查 ES 索引是否存在,mapping 是否与 SQL 字段兼容。

  • 确认 MySQL 主从复制正常(从库有数据)。

9.7 Canal Adapter 健康检查 404

现象:访问 /actuator/health 返回 404。
解决方法:在 application.yml 中添加:

management:
  endpoints:
    web:
      exposure:
        include: health,info
      base-path: /actuator

重启 Adapter。

9.8 Canal同步时提示字段数不匹配错误

错误日志:column size is not match for table:mh_database.db_pet,152 vs 151
原因: Canal Server 缓存的表结构元数据未及时更新(例如表曾执行过 ALTER TABLE 添加了一列,但 Canal 仍使用旧的结构),导致解析 binlog 事件时列数不匹配。
解决方法:先停止Canal Server服务,然后清理所有可能的元数据残留,包括/usr/local/canal/server/conf/example目录下的h2.mv.db和meta.dat两个文件,

十、性能优化建议

  1. 增大 batch 大小:适当提高 commitBatch 值(如 5000~10000),减少 ES 写入次数。

  2. 调整 ES 刷新间隔:对于批量导入,可临时设置 index.refresh_interval = -1,导入完成后恢复。

  3. 增加 Canal Adapter 并发:调整 syncBatchSize 和线程池参数。

  4. 使用从库作为数据源:将 Canal Server 指向从库,减轻主库压力。

  5. 监控复制延迟:定期检查 Seconds_Behind_Master,设置告警阈值。

十一、注意事项

  1. 不要使用 SELECT *:始终指定字段列表,避免字段类型不匹配和性能问题。

  2. 确保主从复制稳定:Canal 依赖从库的 binlog,复制中断将导致同步停止。

  3. 避免在主库执行大事务:大事务会导致 binlog 剧增,影响复制延迟。

  4. 定时清理 binlog:设置合理的 expire_logs_days,防止磁盘爆满。

  5. 备份重要数据:全量 ETL 前备份 ES 索引或 MySQL 数据,防止误操作。

  6. 使用非 root 用户运行:创建专用系统用户 canal 运行服务,避免权限问题。

  7. 监控磁盘空间:ES 数据盘和 MySQL 数据盘需定期监控。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐