基于Canal实现MySQL到Elasticsearch的数据同步
基于Canal实现MySQL到Elasticsearch的数据同步
一、概述
Canal 是阿里巴巴开源的 MySQL binlog 增量订阅&消费组件,通过模拟 MySQL slave 的交互协议,从 MySQL 主库或从库获取 binlog 日志,解析后同步到 Elasticsearch 或其他存储系统。本文档基于实际项目经验,详细说明 Canal 的部署、配置、排错及注意事项,帮助读者快速搭建稳定可靠的同步链路。
二、环境准备
2.1 软件版本
-
MySQL:8.0.45(主从架构)
-
Elasticsearch:8.14.0(双节点集群)
-
Canal:1.1.8
-
JDK:openjdk 11.0.30
-
操作系统:Ubuntu 22.04
2.2 硬件要求
-
至少 2 核 CPU、4GB 内存
-
磁盘空间充足(取决于数据量)
2.3 网络要求
-
确保所有节点之间网络互通
-
MySQL 端口(3306)、ES 端口(9200/9300)、Canal 端口(11111、8081)需开放
三、MySQL 主从复制配置
3.1 主库配置(172.18.28.60)
编辑 /etc/mysql/mysql.conf.d/mysqld.cnf,在[mysqld]节中添加/修改以下配置:
[mysqld]
server-id = 1
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW
binlog_row_image = FULL
binlog_expire_logs_seconds = 604800 # 7天
max_binlog_size = 1000M
gtid_mode = ON
enforce_gtid_consistency = ON
binlog_format = ROW 和 binlog_row_image = FULL 是Canal必须的设置。 gtid_mode 和 enforce_gtid_consistency 开启GTID,简化复制管理。 然后重启MySQL服务
systemctl restart mysql
3.2 从库配置(172.18.28.62)
编辑 /etc/mysql/mysql.conf.d/mysqld.cnf,在[mysqld]节中添加/修改以下配置:
[mysqld]
server-id = 2
log_bin = /var/log/mysql/mysql-bin.log
binlog_format = ROW
binlog_row_image = FULL
log_slave_updates = 1 # 若Canal从从库读取,必须开启
binlog_expire_logs_seconds = 604800
max_binlog_size = 1000M
gtid_mode = ON
enforce_gtid_consistency = ON
read_only = 1 # 防止意外写入
super_read_only = 1
然后重启MySQL服务
systemctl restart mysql
3.3 创建复制用户(在主库执行)
在主库服务器中,连接MySQL,并执行以下操作
CREATE USER 'replicator'@'%' IDENTIFIED BY 'your_password';
GRANT REPLICATION SLAVE ON *.* TO 'replicator'@'%';
FLUSH PRIVILEGES;
3.4 初始化从库数据
如果从库没有数据,需要先从主库导出并导入:
mysqldump -u root -p --all-databases --master-data=2 > master.sql
scp master.sql root@172.18.28.62:/tmp/
mysql -u root -p < /tmp/master.sql
3.5 启动从库复制(在从库执行)
STOP SLAVE;
CHANGE MASTER TO
MASTER_HOST = '主库IP',
MASTER_PORT = 3306,
MASTER_USER = 'replicator',
MASTER_PASSWORD = 'your_password',
MASTER_AUTO_POSITION = 1;
START SLAVE;
确认Slave_IO_Running 和 Slave_SQL_Running 均为 Yes。Seconds_Behind_Master为0或者逐渐减小。
四、Canal Server 安装与配置
4.1 下载 Canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz
mkdir -p /usr/local/canal/server
tar -zxvf canal.deployer-1.1.8.tar.gz -C /usr/local/canal/server
4.2 配置 Canal Server
编辑 /usr/local/canal/server/conf/canal.properties:
canal.id = 1
canal.ip = 0.0.0.0
canal.port = 11111
canal.metrics.pull.port = 11112
canal.serverMode = tcp
canal.destinations = example
编辑实例配置文件 /usr/local/canal/server/conf/example/instance.properties:
canal.instance.gtidon=true # 启动GTID
canal.instance.master.address = 172.18.28.62:3306 # 指向从库
canal.instance.dbUsername = canal
canal.instance.dbPassword = Canal@123456
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName = mh_spider
canal.instance.filter.regex = mh_spider\\..* # 同步所有表
注意:在主库中需要创建 MySQL 用户 canal 并授予权限:
CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
4.3 启动 Canal Server
cd /usr/local/canal/server/bin
./startup.sh
tail -f ../logs/example/example.log
看到start position successfully 表示启动成功。
五、Canal Adapter 安装与配置
5.1 下载 Canal Adapter
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.adapter-1.1.8.tar.gz
mkdir -p /usr/local/canal/adapter
tar -zxvf canal.adapter-1.1.8.tar.gz -C /usr/local/canal/adapter
5.2 配置 Adapter 数据源
编辑 /usr/local/canal/adapter/conf/application.yml,添加或修改以下内容,其它原有内容保持不变。
srcDataSources:
defaultDS:
url: jdbc:mysql://172.18.28.60:3306/mh_spider?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true&connectTimeout=60000&socketTimeout=6000000&zeroDateTimeBehavior=convertToNull&jdbcCompliantTruncation=false
username: canal
password: Canal@Test1688
initialSize: 5
minIdle: 5
maxActive: 20 # 从 3 增加到 20
maxWait: 120000 # 获取连接超时时间,改为 2 分钟
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
concurrency: 1
outerAdapters:
- name: es8
hosts: http://172.18.28.60:9200,http://172.18.28.62:9200 # 您的 ES 集群地址
properties:
mode: rest
security.auth: testadmin:testadmin@1688 # ES 认证信息
cluster.name: mhyx-es-test
connectTimeout: 60000
socketTimeout: 120000
commitBatch: 5000 # 降低单次写入数量,减轻压力
paths: /usr/local/canal/adapter/conf/es8
logging:
level:
com.alibaba.otter.canal.client.adapter: INFO
com.alibaba.otter.canal.adapter.launcher: INFO
org.apache.http: INFO
org.elasticsearch.client: INFO
关键参数说明:
-
socketTimeout=600000:防止大查询时连接超时(单位毫秒)。 -
security.auth:ES 认证信息,格式用户名:密码。
5.3 配置同步映射文件
在 /usr/local/canal/adapter/conf/es8/ 目录下创建表映射文件,例如 sync_servers.yml:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: es_cat_servers
_id: _id
table: cat_servers
sql: >
SELECT
id AS _id,
IFNULL(aid, '') AS aid,
IFNULL(forWhich, 0) AS forWhich,
IFNULL(aname, '') AS aname,
IFNULL(sid, 0) AS sid,
IFNULL(sname, '') AS sname,
IFNULL(last_begin, NOW()) AS last_begin,
IFNULL(last_end, NOW()) AS last_end,
IFNULL(server_status, 1) AS server_status,
IFNULL(current_spider, '') AS current_spider,
IFNULL(pages_p, 10) AS pages_p,
IFNULL(bk, '') AS bk,
IFNULL(from_date, '1980-01-01') AS from_date
FROM cat_servers
commitBatch: 2000
etlCondition: "where id > {} ORDER BY id ASC" # 支持断点续传
注意:
-
确保id为数据库主键,对应的es文档id为_id,否则删除es文档会不成功
-
使用
SELECT明确指定字段,避免SELECT *导致字段过多、类型不匹配等问题。 -
确保每个字段在 ES 索引映射中存在且类型兼容。
-
确保每个字段都做Null判断处理,否则可能会出现空指针异常
-
etlCondition用于增量同步时的过滤条件。
5.4 启动 Canal Adapter
cd /usr/local/canal/adapter/bin
./stop.sh
./startup.sh
tail -f ../logs/adapter/adapter.log
看到 Load canal adapter: es8 succeed 和 Subscribe destination: example succeed 表示启动成功。
六、Elasticsearch 索引创建
在同步数据前,必须创建好目标索引。以下是一个简化的索引映射示例(根据实际表结构调整):
PUT /es_servers_idx
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"dynamic": false,
"properties": {
"id": { "type": "integer" },
"aid": { "type": "keyword" },
"forWhich": { "type": "byte" },
"aname": { "type": "keyword" },
"sid": { "type": "integer" },
"sname": { "type": "keyword" },
"last_begin": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||strict_date_optional_time"
},
"last_end": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||strict_date_optional_time"
},
"server_status": { "type": "byte" },
"current_spider": { "type": "keyword" },
"pages_p": { "type": "byte" },
"bk": { "type": "keyword" },
"from_date": {
"type": "date",
"format": "yyyy-MM-dd||yyyy-MM-dd HH:mm:ss||epoch_millis||strict_date_optional_time"
}
}
}
}
如果后续需要添加字段,可以使用 update mapping API,但注意不能修改已有字段类型。
七、全量数据同步(ETL)
对于已有历史数据,需要手动触发全量同步:
curl -X POST http://127.0.0.1:8081/etl/es8/sync_servers.yml
观察 Adapter 日志,等待同步完成。
tail -f /usr/local/canal/adapter/logs/adapter/adapter.log
也可以通过 ES 文档计数验证:
curl -u testadmin:testadmin@1688 "http://172.18.28.60:9200/es_servers_idx/_count?pretty"
八、增量同步测试
在 MySQL 主库插入一条新数据:
INSERT INTO cat_servers (aid, aname, sid, sname) VALUES ('A001', '测试名称', 123, '测试服务器名');
等待几秒,在 ES 中查询
GET /es_cat_servers_idx/_search
应能返回文档。
其它更新/删除可同理完成测试。
九、常见错误及解决方法
9.1 Canal Server 连接 MySQL 失败
错误日志:java.io.IOException: connect /172.18.28.62:3306 failure
解决方法:
-
检查 MySQL 地址、端口是否正确,网络是否通畅(
telnet测试)。 -
确认
canal用户有远程访问权限。 -
检查 MySQL 是否开启了 binlog(
SHOW VARIABLES LIKE 'log_bin';)。
9.2 主从复制中断(错误 1062 重复键)
现象:Slave_SQL_Running: No,错误码 1062。
解决方法:
-
在从库上删除冲突记录:
DELETE FROM servers WHERE id = ?; -
重启复制:
STOP SLAVE; START SLAVE; -
或者跳过错误事务:
SET GLOBAL SQL_SLAVE_SKIP_COUNTER = 1; START SLAVE;
9.3 Canal Adapter 同步时 NoSuchElementException
错误日志:java.util.NoSuchElementException
原因:使用了 SELECT *,但 ES mapping 中缺少部分字段,或字段顺序导致解析异常。
解决方法:在映射文件中显式指定字段名,确保每个字段在 mapping 中有定义。
9.4 ETL 过程中 Read timed out
错误日志:CommunicationsException: Read timed out
原因:大表查询时间超过 JDBC 默认超时(通常 30 秒)。
解决方法:在 JDBC URL 中添加 socketTimeout=600000(10 分钟),并适当增大 connectTimeout。
9.5 ES 认证失败
错误日志:security_exception
解决方法:检查 application.yml 中 security.auth 格式为 用户名:密码,确认密码正确。
9.6 同步后 ES 中没有数据
-
检查 Canal Server 日志,确认 binlog 解析正常。
-
检查 Adapter 日志,确认事件被接收且无错误。
-
检查 ES 索引是否存在,mapping 是否与 SQL 字段兼容。
-
确认 MySQL 主从复制正常(从库有数据)。
9.7 Canal Adapter 健康检查 404
现象:访问 /actuator/health 返回 404。
解决方法:在 application.yml 中添加:
management:
endpoints:
web:
exposure:
include: health,info
base-path: /actuator
重启 Adapter。
9.8 Canal同步时提示字段数不匹配错误
错误日志:column size is not match for table:mh_database.db_pet,152 vs 151
原因: Canal Server 缓存的表结构元数据未及时更新(例如表曾执行过 ALTER TABLE 添加了一列,但 Canal 仍使用旧的结构),导致解析 binlog 事件时列数不匹配。
解决方法:先停止Canal Server服务,然后清理所有可能的元数据残留,包括/usr/local/canal/server/conf/example目录下的h2.mv.db和meta.dat两个文件,
十、性能优化建议
-
增大 batch 大小:适当提高
commitBatch值(如 5000~10000),减少 ES 写入次数。 -
调整 ES 刷新间隔:对于批量导入,可临时设置
index.refresh_interval = -1,导入完成后恢复。 -
增加 Canal Adapter 并发:调整
syncBatchSize和线程池参数。 -
使用从库作为数据源:将 Canal Server 指向从库,减轻主库压力。
-
监控复制延迟:定期检查
Seconds_Behind_Master,设置告警阈值。
十一、注意事项
-
不要使用
SELECT *:始终指定字段列表,避免字段类型不匹配和性能问题。 -
确保主从复制稳定:Canal 依赖从库的 binlog,复制中断将导致同步停止。
-
避免在主库执行大事务:大事务会导致 binlog 剧增,影响复制延迟。
-
定时清理 binlog:设置合理的
expire_logs_days,防止磁盘爆满。 -
备份重要数据:全量 ETL 前备份 ES 索引或 MySQL 数据,防止误操作。
-
使用非 root 用户运行:创建专用系统用户
canal运行服务,避免权限问题。 -
监控磁盘空间:ES 数据盘和 MySQL 数据盘需定期监控。
更多推荐
所有评论(0)