docker+canal+canal-admin监听mysql增删改操作-初体验(window版本)
C#+docker+canal+canal-admin初体验,实现mysql增删改数据之后,触发C#事件
操作系统:window10
Docker Server Version: 20.10.12
canal-serve:1.1.15
canal-admin:1.1.15
C# 目标框架:.NET 5 (这个应该低版本也没啥问题)
官方文档:https://www.bookstack.cn/read/canal-v1.1.4/e015ba3570152b7a.md
canal安装
此处跳过一般环境安装
# 安装canal-admin
# 管理地址是:127.0.0.1:8089 帐号密码:admin/123456。 如下命令中的密码应该是加入admin的帐号密码(猜测)
docker run -d -it -p 8089:8089 -e server.port=8089 -e canal.adminUser=admin -e canal.adminPasswd=admin --name=canal-admin -m 1024m canal/canal-admin:v1.1.5
# 安装canal-server
# 11111端口是canal客户端连接的端口; 下面canal.admin.manager的ip换成你局域网的ip
docker run -d -it -p 11111:11111 -e canal.admin.manager=10.8.0.5:8089 -e canal.admin.port=11110 -e canal.admin.user=admin -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441 --name=canal-server -m 2048m canal/canal-server:v1.1.5
完成上面步骤后,看下效果,看到server有一条记录,代表canal-server已经成功加入到了canal-admin
C# 代码
C#代码我直接粘贴了,有点丑,大家先别管,自己跑起来感受下然后自己修改把
参考: https://github.com/dotnetcore/CanalSharp
using CanalSharp.Connections;
using Microsoft.Extensions.Logging;
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Linq;
using CanalSharp.Protocol;
namespace CanalTest
{
class Program
{
static ILogger<SimpleCanalConnection> logger;
static ILogger<SimpleCanalConnection> _logger;
static async Task Main(string[] args)
{
var loggerFactory = LoggerFactory.Create(builder =>
{
builder
.AddFilter("Microsoft", LogLevel.Debug)
.AddFilter("System", LogLevel.Information);
//builder.AddConsole();
});
logger = loggerFactory.CreateLogger<SimpleCanalConnection>();
var conn = new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1", 11111, "22"), logger);
//连接到 Canal Server
await conn.ConnectAsync();
//订阅
await conn.SubscribeAsync();
do
{
try
{
var msg = await conn.GetAsync(1024);
PrintEntry(msg.Entries);
await Task.Delay(300);
}
catch (Exception ex)
{
Thread.Sleep(5000);
Console.WriteLine(ex.Message);
//logger.LogError(ex, "异常");
}
} while (true);
}
private static void PrintEntry(List<Entry> entries)
{
foreach (var entry in entries)
{
if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
{
continue;
}
RowChange rowChange = null;
try
{
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
}
catch (Exception e)
{
logger.LogError(e.ToString());
}
if (rowChange != null)
{
EventType eventType = rowChange.EventType;
logger.LogInformation(
$"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");
foreach (var rowData in rowChange.RowDatas)
{
if (eventType == EventType.Delete)
{
PrintColumn(rowData.BeforeColumns.ToList());
}
else if (eventType == EventType.Insert)
{
PrintColumn(rowData.AfterColumns.ToList());
}
else
{
logger.LogInformation("-------> before");
PrintColumn(rowData.BeforeColumns.ToList());
logger.LogInformation("-------> after");
PrintColumn(rowData.AfterColumns.ToList());
}
}
}
}
static void PrintColumn(List<Column> columns)
{
foreach (var column in columns)
{
Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}");
}
}
}
}
}
遇到的问题
CanalSharp.CanalConnectionException:“Received an error returned by the server: something goes wrong with channel:[id: 0x47ae2285, /172.17.0.1:56572 => /172.17.0.3:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
”
这个问题是真的狗,canal概念也多,找了很多才知道要添加一个名为example的instance,其他名字不好使,修改canal.destinations都不好使
如图,保证example为启动状态,在连接就ok了
我体验到现在比较晚了,还有一个坑,tm的断开一下后不会自动重连,无语!
场景使用
前几天我在研究我两个库的数据如何放到一个查询页面,并且都能够搜索他,还能分页。找到个词语叫做聚合查询
第一个就是redis,发现我第一步就失败了,能分页不能搜索条件,能搜索条件玩不好分页,最后放弃了
然后就自己封装了一套规则,把变化数据传到mq,然后在每个缓存数据订阅变化,并且增量更新,但这个有一个地方就是每个cur的时候需要发送变化数据到mq,这可不得了,那个混蛋没有写这行代码,就要被客户叼了。
后面和就Java大佬聊了下,他果断给出了canal这个工具,经过两天研究发现挺好的,正好解决了我聚合查询缓存的增量更新问题,还不用太担心混蛋漏写代码
同步数据,这个老掉牙了,canal还有其他生态,可以直接同步数据到es,其他库啥的,不用编码
视频
canal演示
canal-补充
更多推荐
所有评论(0)