Canal监听mysql日志数据(增、删、改操作)windows版
Canal监听mysql日志数据(增、删、改操作)windows版
·
Canal是阿里巴巴开源的数据库binlog日志解析工具,其可以监听MySQL、PostgreSQL、TiDB、Oracle数据库等数据库的binlog,并将变更实时地推送给指定的消费端。
以下为监听mysql的binlog日志流程:
一、打开mysql的binlog权限:
1、打开方式:
找到mysql的my.ini文件(或者my.cnf文件)找到 [mysqld] 部分,并添加如下配置:
log-bin = mysql-bin
server-id = 1
binlog_format = raw
binlog-do-db=canal_text(要监听的库名,如果不配置,默认监听所有库)
2、查看是否开启成功:
show variables like '%log_bin%';

3、给canal链接数据库权限:
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'root';
刷新:
FLUSH PRIVILEGES;
二、下载canal(windows版):
1、修改conf /canal.properties配置文件,注册地址为本机IP

2、修改conf/example/instance.properties配置文件,配置数据库信息

3、启动服务:Windows下使用startup.bat

4、启动成功:

三、java依赖和客户端代码:
1、配置java和maven环境后加入依赖包(java环境和maven环境配置可以上网搜,有详细教程):
<parent>
<artifactId>spring-boot-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.1.4.RELEASE</version>
</parent>
<dependencies>
<!--canal依赖-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>1.5.6</version>
</dependency>
<dependency>
<groupId>org.eclipse.aether</groupId>
<artifactId>aether-impl</artifactId>
<version>1.0.0.v20140518</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.46.v20220331</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
</dependencies>
2、编写客户端和写入记录表操作:
客户端代码:
package org.example;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
public class Monitor {
//static List<String> str_list = new ArrayList<>();
static String operate;
public static void main(String[] args) throws InterruptedException {
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("192.168.26.114",11111), "example", "admin", "4ACFE3202A5FF5CF467898FC58AAB1D615029441");//之前配置的canal的注册地址和端口号,默认端口号为11111
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {//间隔一秒
// 没有变化,等一秒钟再去拉取数据
Thread.sleep(1000);
} else {
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
} finally {
connector.disconnect();
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
operate = eventType.toString();//获取操作方式(增删改)
System.out.println(String.format("================> \nbinlog[%s:%s]\ndatabase_name: %s\ntable_name: %s\neventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));//java客户端输出信息
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {//判断增删改
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {//修改会产生两条信息,一条修改前,一条修改后
System.out.println("-------> before");
operate = eventType.toString()+"-"+"before";//修改前的记录为UPDATE-before
printColumn(rowData.getBeforeColumnsList());
operate = eventType.toString()+"-"+"after";//修改后的记录为UPDATE-after
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {//修改内容函数,获取修改内容,并写入新的记录表
List<String> str_list = new ArrayList<>();//定义新的集合,用来记录修改内容数据
String name,age,sal;
for (CanalEntry.Column column : columns) {
str_list.add(column.getValue());
System.out.println(column.getName() + " : " + column.getValue() );
}
DDL_Mysql a = new DDL_Mysql();//定义插入类对象
name = str_list.get(0);//将修改的name字段数据赋值给name
age = str_list.get(1);//将修改的age字段数据赋值给age
sal = str_list.get(2);//将修改的sal字段数据赋值给sal
a.insert(name,age,sal,operate);//调用插入函数,将mysql日志的数据的内容和操作方式插入记录表中
}
}
3、编写插入记录表代码:
package org.example;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.PreparedStatement;
public class DDL_Mysql {
String driver = "com.mysql.jdbc.Driver";
/*
* URL指向要访问的数据库名(我用的是数据库名为test)
*本地用127.0.0.1
*/
String url = "jdbc:mysql://127.0.0.1:3306/test?useSSL=false&serverTimezone=GMT";
String user = "root"; //MySQL配置时的用户名
String password = "root"; //MySQL配置时的密码
//throw则是指抛出的一个具体的异常类型。
/*
* 连接数据库
*/
public Connection getConn()
{
Connection con = null;
try
{ ////加载驱动程序
Class.forName(driver);
}
catch (ClassNotFoundException e)
{
e.printStackTrace();
}
try
{
con = DriverManager.getConnection(url,user,password);//注意是三个参数
}
catch (SQLException e)
{
e.printStackTrace();
}
return con;
}
/*
* 插入操作
*/
public int insert(String name,String age,String sal,String operate)
{
Connection con = getConn();
String sql = "insert into test (name, age, sal,operate) values (?, ?, ?, ?)";
try {
//用来执行SQL语句
PreparedStatement pst = con.prepareStatement(sql);
pst.setString(1,name);
pst.setString(2, age);
pst.setString(3, sal);
pst.setString(4, operate);
i = pst.executeUpdate();
}
catch (Exception e) {
e.printStackTrace();
}
System.out.println(i);
return i; //返回影响的行数,1为执行成功
}
}
四、测试:
测试中使用两张表,一张是canal_text库中的test表(操作表),一张是test库中的test表(记录表)
操作表:

记录表:

当我对操作表添加一条数据时:

操作表中加入一条信息:

同时,记录库中也有一条操作信息,包括操作方式:

更多推荐
所有评论(0)