使用canal配合rocketmq监听mysql的binlog日志
使用canal配合rocketmq监听mysql的binlog日志
目录
一. 安装配置canal
1.1 安装canal
canal由阿里开源到了github, 可在此页面 github地址, 下载开发版本的压缩包, 由于github下载巨慢, 也可以自己寻找国内资源
之后解压缩, canal 的文件目录如下
1.2 配置canal基本属性
进入到 conf 文件夹, 目录结构如下
先进行基础的配置, vim命令进入到 canal.properties 文件, 配置 canal 服务的ip地址, 以及端口号
之后配置 canal 可视化控制台, 我这里没安装, 所以暂时不配置
由于本次是配合 rocketMQ 使用, 所以在下面的配置中找到 canal.serverMode, 并设置为 rocketMQ
之后, 进入到关键的配置, 配置 rocketMQ 各项属性
rocketmq.producer.group : 生产者分组, 和在代码中指定的 group 的含义一致
rocketmq.namesrv.addr : mq 的 nameSvr 的ip地址
rocketmq.tag : 生产者生产消息时, 指定的 tag
1.3 配置canal的mysql
进入 conf 下的 example 文件夹, 目录如下, vim 命令进入 instance.properties
几个比较重要的配置
canal.instance.mysql.slaveId : 因为 canal 的工作原理是伪装成 mysql 的 slave, 所以这里需要设置 slave 的 id
canal.instance.master.address : 数据库的 ip 地址以及端口号
canal.instance.dbUsername : 要监听的数据库的登录用户名
canal.instance.dbPassword : 数据库的登录密码
canal.instance.filter.regex : 要监听的数据库表, 正则表达式匹配模式, 没配置就是全部监听
canal.mq.topic : mq 生产者的发送消息时, 指定的 topic, 和 java代码的含义一致
二. mysql配置
2.1 开启mysql的binlog日志
由于 canal 的工作原理, 所以需要开启 mysql 的 binlog 日志, vim 命令编辑 etc下的 my.cnf 文件, 在 mysqld 下添加如下配置 :
log-bin : 配置 binlog 日志文件目录
binlog-format : 配置 binlog 日志文件的格式
server_id : 配置 mysql 主节点的 id, 不能和集群中id, 或是从节点中的 id 重复, 该 id 不能和 canal 的 slaveId 重复
之后查看 mysql binlog 的开启状态, show variables like '%log_bin%', 可以看到是开启状态
2.2 配置 canal 专用用户
在 canal 配置文件中, 我使用的 root 用户登录的, 所以不在给 mysql 配置 canal 用户, 可参考其他博客
2.3 启动canal
进入到bin文件夹, 目录如下, 使用命令 ./startup.sh 启 canal, 特别注意, 如果服务器内存不充足, 可以先使用 vim 编辑 startup.sh, 把文件中的 java参数的使用内存设置小一点
之后观察 canal.log 日志, 可以看到启动成功
查看 rocketmq_client.log 日志, 发现 canal 一直在向 mq 发送心跳检测, 并输出了 mq 的 group, 实例id信息
登录 rocketMQ 可视化控制台, 可以看到 canal 注册到 mq 中的生产者实例, 以及 topic 信息, 这些信息和我们之前在 canal 的配置文件中配置的一致
至此, 关于服务器中 canal 的配置介绍完毕, 下面用java代码实现 mq 消费者监生产者的信息
三. java代码的实现
3.1 mq消费者
由于 canal 使用的 mq 模式, 他可以说是一个 mq 生产者, 所以我们需要定义 mq 的消费者, 代码如下, 消费者要监听的 topic 和 tag 和 canal 配置文件中配置的一致
消费者抽象类
/**
* @author canxiusi.yan
* @description AbstractProducer 抽象SysLog抽象父类
* @date 2022/5/10 10:03
*/
public abstract class AbstractBinLogConsumer {
/**
* 子类通用日志
*/
protected static final Logger logger = LoggerFactory.getLogger(AbstractBinLogConsumer.class);
private DefaultMQPushConsumer consumer;
private final static String GROUP_NAME = "canal-rocketmq-es%canal";
/**
* 初始化mq消费者
*/
@PostConstruct
public void initConsumer() {
// 相同的业务消费者分到同组, 需要指定相同的tag,
consumer = new DefaultMQPushConsumer(GROUP_NAME);
consumer.setNamesrvAddr("ip:port");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setInstanceName("bin_log_consumer");
consumer.setPullInterval(1000);
// 设置每次从队列中拉取的消息数
consumer.setPullBatchSize(1000);
try {
consumer.subscribe("binlog_topic", "binlog_es_tag");
} catch (MQClientException e) {
logger.error("[binlog消费者初始化异常]", e);
System.exit(-1);
}
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
logger.info(LogUtils.format("[消息列表为空], msgs=<{0}>", msgs));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
handlerMsg(msg);
} catch (Exception e) {
logger.error("[日志消费处理异常]", e);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
logger.info(LogUtils.format("[binlog消费者初始化完成], consumer=<{0}>", consumer));
}
/**
* 启动消费者
*
* @param event
*/
@EventListener(ApplicationPreparedEvent.class)
public void startSysLogConsumer(ApplicationPreparedEvent event) {
try {
consumer.start();
} catch (MQClientException e) {
logger.error(LogUtils.format("[binlog消费者start异常], nameServerAddress={}"), e);
System.exit(-1);
}
}
@PreDestroy
public void closeConsumer() {
consumer.shutdown();
}
/**
* 解析binlog日志
* @param msg
*/
protected abstract void handlerMsg(MessageExt msg);
}
消费者实现类, 这里只把监听的消息, 做下日志打印
/**
* @author canxiusi.yan
* @description BinlogConsumer
* @date 2022/6/28 15:07
*/
@Component
public class BinlogConsumer extends AbstractBinLogConsumer {
/**
* 日志
*/
private static final Logger logger = LoggerFactory.getLogger(BinlogConsumer.class);
@Override
protected void handlerMsg(MessageExt msg) {
JSONObject jsonObject = JSONObject.parseObject(new String(msg.getBody()));
// 影响到几条sql, 就会有几条日志
logger.info(LogUtils.format("监听到binlog日志, json=<{0}>", jsonObject));
}
}
3.2 运行模拟
运行程序, 登录 rocketmq 控制台, 发现消费者已经注册成功
我这里使用 postman 调用新增数据的接口, 向 db 插入一条sql, 流程: 向db插入数据, 生成 binlog 日志, 把日志同步给伪装成了从节点的 canal, 然后把 binlog 日志发送给 mq, 之后代码中的消费者监听到数据, 观察日志打印, 可以发现, 已经监听到了消息
格式化消息后, 可以看到, 本次 db 变更的类型, 变更的数据, 都是能获取到的, 后面就可以做很多事情了, 比如把数据同步到 redis, es搜索引擎等
更多推荐
所有评论(0)