1. 实现原理
    1.  WAL 与逻辑复制基础

KingbaseES(金仓数据库)基于 PostgreSQL 内核开发,WAL(Write-Ahead Logging) 是其核心日志机制,所有数据修改(DML:INSERT/UPDATE/DELETE)都会先写入 WAL 再落盘,保证数据一致性。

  • 物理 WAL:仅记录数据块的物理变化(如哪个字节偏移量修改了),无法直接识别业务语义;
  • 逻辑 WAL:将物理变化解析为 “表名 + 操作类型 + 字段值” 的结构化数据,decoderbufs插件正是金仓提供的逻辑解码插件,可将 WAL 日志转换为 Protocol Buffers(protobuf)格式的结构化数据,方便程序解析。
    1. 1.2 decoderbufs + Python 解析流程

核心逻辑:通过sys_recvlogical(金仓逻辑复制客户端工具)连接数据库复制槽,借助decoderbufs插件将 WAL 转换为 protobuf 格式,Python 读取后解析为可读的结构化数据,同时处理编码问题避免乱码

  1. 环境基础信息

数据库:KingbaseES V009R001C002B0014(Oracle 模式)

Python 版本:3.7.17

核心依赖:

金仓内置decoderbufs插件(逻辑解码核心);

sys_recvlogical(金仓逻辑复制客户端);

Python 库:protobuf(解析 protobuf 数据)、chardet(编码检测,解决乱码)

  1. 数据库端核心配置
    1. 必配参数

修改金仓数据库配置文件kingbase.conf(路径:$KINGBASE_DATA/kingbase.conf),确保以下参数配置正确(解决乱码 + 启用逻辑复制):

# 1. 启用逻辑WAL(decoderbufs必需)

wal_level = logical  

# 2. 复制槽/发送进程数量(需≥1)

max_replication_slots = 10

max_wal_senders = 10

# 3. 统一编码(核心:消除乱码)

client_encoding = UTF8

server_encoding = UTF8

# 4. 字符集排序规则(可选,增强兼容性)

lc_collate = 'zh_CN.UTF-8'

lc_ctype = 'zh_CN.UTF-8'

修改后重启金仓数据库:

# 停止数据库

sys_ctl -D $KINGBASE_DATA stop

# 启动数据库

sys_ctl -D $KINGBASE_DATA start

    1. 授权与访问配置
  1. 授予用户复制权限:

ALTER ROLE system WITH REPLICATION;  -- system为操作用户,可替换为自定义用户

  1. 修改sys_hba.conf允许复制连接:

local   replication     system                     trust

host    replication     system     127.0.0.1/32    trust

host    replication     system     ::1/128         trust

# 新增:确保普通连接也用UTF8编码

host    all             all        127.0.0.1/32    md5 client_encoding='UTF8'

    1. 验证编码配置

登录数据库执行以下 SQL,确认编码均为 UTF8(乱码核心根源):

show client_encoding;   -- 客户端编码

show server_encoding;   -- 服务端编码

-- 预期结果均为:UTF8

  1. Python 环境与解析脚本
    1. 安装依赖

pip3 install protobuf==3.20.1 chardet==5.2.0  # 版本适配Python3.7

    1. 完整解析脚本
import subprocess
import os
import time
import json
import struct
import threading
import signal
from datetime import datetime
import atexit
import tempfile

# ========== 核心配置(修改临时文件路径到用户可写目录) ==========
CONFIG = {
    "sys_recvlogical_path": "/home/kingbase/Kingbase/ES/V8/Server/bin/sys_recvlogical",
    "slot_name": "wal_parser_slot",
    "host": "192.168.56.206",
    "port": "54322",
    "user": "system",
    "dbname": "test",
    "password": "Kdb@2025",
    "log_path": "/home/kingbase/logs/wal_parser.log",
    "tmp_log_path": "/home/kingbase/tmp/wal_tmp.bin",  # 改为kingbase用户目录
    "debug_log_path": "/home/kingbase/logs/wal_debug.log",
    "poll_interval": 100,
    "restart_interval": 5,
    "max_restart_attempts": 10
}

# ========== 全局变量 ==========
running = True
monitor_thread = None
sys_process = None
restart_attempts = 0
last_file_size = 0

# ========== 日志工具 ==========
class Logger:
    @staticmethod
    def debug(msg):
        log_line = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] [DEBUG] {msg}\n"
        # 确保日志目录存在且可写
        log_dir = os.path.dirname(CONFIG["debug_log_path"])
        if not os.path.exists(log_dir):
            os.makedirs(log_dir, mode=0o755, exist_ok=True)
        with open(CONFIG["debug_log_path"], 'a', encoding='utf-8') as f:
            f.write(log_line)
        print(log_line.strip())

    @staticmethod
    def info(msg):
        log_line = f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]}] [INFO] {msg}\n"
        log_dir = os.path.dirname(CONFIG["log_path"])
        if not os.path.exists(log_dir):
            os.makedirs(log_dir, mode=0o755, exist_ok=True)
        with open(CONFIG["log_path"], 'a', encoding='utf-8') as f:
            f.write(log_line)

# ========== 初始化目录 ==========
def init_directories():
    """初始化必要的目录"""
    # 创建临时文件目录
    tmp_dir = os.path.dirname(CONFIG["tmp_log_path"])
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir, mode=0o755, exist_ok=True)
        Logger.debug(f"✅ 创建临时目录: {tmp_dir}")

    # 创建日志目录
    log_dirs = [
        os.path.dirname(CONFIG["log_path"]),
        os.path.dirname(CONFIG["debug_log_path"])
    ]
    for dir_path in log_dirs:
        if not os.path.exists(dir_path):
            os.makedirs(dir_path, mode=0o755, exist_ok=True)

# ========== 信号处理 ==========
def signal_handler(signum, frame):
    """处理退出信号"""
    global running
    Logger.debug(f"🛑 接收到信号 {signum},准备退出...")
    running = False

# ========== 安全清理函数 ==========
def clean_up():
    """安全清理资源(跳过无权限的文件)"""
    global sys_process, running

    running = False
    Logger.debug("🧹 开始清理资源...")

    # 停止sys_recvlogical进程
    if sys_process and sys_process.poll() is None:
        try:
            Logger.debug("🔌 停止sys_recvlogical进程...")
            os.killpg(os.getpgid(sys_process.pid), signal.SIGTERM)
            time.sleep(2)
            if sys_process.poll() is None:
                os.killpg(os.getpgid(sys_process.pid), signal.SIGKILL)
            Logger.debug("✅ 进程已停止")
        except Exception as e:
            Logger.debug(f"⚠️ 停止进程失败: {e}")

    # 强制清理相关进程
    try:
        os.system(f"pkill -9 -f 'sys_recvlogical.*{CONFIG['slot_name']}' > /dev/null 2>&1")
        Logger.debug("✅ 相关进程已清理")
    except:
        pass

    # 安全清理临时文件(只删除自己创建的)
    if os.path.exists(CONFIG["tmp_log_path"]):
        try:
            # 先尝试修改权限
            os.chmod(CONFIG["tmp_log_path"], 0o666)
            os.remove(CONFIG["tmp_log_path"])
            Logger.debug(f"✅ 清理临时文件: {CONFIG['tmp_log_path']}")
        except PermissionError:
            Logger.debug(f"⚠️ 无权限删除文件 {CONFIG['tmp_log_path']},跳过")
        except Exception as e:
            Logger.debug(f"⚠️ 清理文件失败: {e}")
    else:
        Logger.debug(f"ℹ️ 临时文件 {CONFIG['tmp_log_path']} 不存在")

    Logger.debug("✅ 资源清理完成")

# ========== 注册退出处理 ==========
atexit.register(clean_up)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGHUP, signal_handler)

# ========== 金仓二进制解析器 ==========
class KingbaseBinaryParser:
    def __init__(self):
        self.op_mapping = {0: "INSERT", 1: "UPDATE", 2: "DELETE", 3: "TRUNCATE"}

    def decode_varint(self, data, pos):
        """Protobuf Varint解码"""
        result = 0
        shift = 0
        while pos < len(data):
            byte = data[pos]
            result |= (byte & 0x7F) << shift
            pos += 1
            if not (byte & 0x80):
                break
            shift += 7
        return result, pos

    def parse_exact_data(self, data):
        """精准解析二进制数据"""
        records = []
        pos = 0
        len_data = len(data)

        while pos < len_data and running:
            try:
                record = {
                    "operation": "INSERT",
                    "schema": "public",
                    "table": "",
                    "columns": {},
                    "timestamp": str(int(time.time() * 1000))
                }

                # 查找表名字段 (0x22)
                table_pos = data.find(b'\x22', pos)
                if table_pos == -1:
                    break
                pos = table_pos + 1

                # 读取表名长度和内容
                table_len, pos = self.decode_varint(data, pos)
                if pos + table_len <= len_data:
                    record["table"] = data[pos:pos+table_len].decode('utf-8')
                    pos += table_len
                else:
                    continue

                # 跳过0x28字段
                if pos < len_data and data[pos] == 0x28:
                    pos += 1
                    _, pos = self.decode_varint(data, pos)

                # 解析列数据 (0x32)
                while pos < len_data and data[pos] == 0x32 and running:
                    pos += 1
                    col_block_len, pos = self.decode_varint(data, pos)
                    col_end = pos + col_block_len

                    if col_end > len_data:
                        break

                    # 解析列名 (0x0A)
                    if pos < col_end and data[pos] == 0x0A:
                        pos += 1
                        col_name_len, pos = self.decode_varint(data, pos)
                        if pos + col_name_len <= col_end:
                            col_name = data[pos:pos+col_name_len].decode('utf-8')
                            pos += col_name_len

                            # 跳过类型OID (0x10)
                            if pos < col_end and data[pos] == 0x10:
                                pos += 1
                                _, pos = self.decode_varint(data, pos)

                            # 跳过NULL标记 (0x18)
                            while pos < col_end and data[pos] == 0x18:
                                pos += 1
                                _, pos = self.decode_varint(data, pos)

                            # 跳过0x01标记
                            while pos < col_end and data[pos] == 0x01:
                                pos += 1

                            # 解析列值
                            if pos < col_end and data[pos] == 0x20:
                                # 整数类型
                                pos += 1
                                col_value, pos = self.decode_varint(data, pos)
                                record["columns"][col_name] = col_value

                            elif pos < col_end and data[pos] == 0x4A:
                                # 字符串类型
                                pos += 1
                                str_len, pos = self.decode_varint(data, pos)
                                if pos + str_len <= col_end:
                                    str_value = data[pos:pos+str_len].decode('utf-8')
                                    record["columns"][col_name] = str_value
                                    pos += str_len

                # 添加有效记录
                if record["table"] and record["columns"]:
                    records.append(record)

            except Exception as e:
                Logger.debug(f"⚠️ 解析偏移{pos}错误: {e}")
                pos += 1
                continue

        return records

# ========== 启动sys_recvlogical ==========
def start_sys_recvlogical():
    """启动sys_recvlogical(带重试)"""
    global sys_process, restart_attempts

    # 达到最大重启次数则退出
    if restart_attempts >= CONFIG["max_restart_attempts"]:
        Logger.debug(f"❌ 达到最大重启次数 {CONFIG['max_restart_attempts']},停止重启")
        return False

    Logger.debug(f"🚀 启动sys_recvlogical (尝试 {restart_attempts + 1}/{CONFIG['max_restart_attempts']})")

    # 构建命令
    cmd = [
        CONFIG["sys_recvlogical_path"],
        "-d", CONFIG["dbname"],
        "-h", CONFIG["host"],
        "-p", CONFIG["port"],
        "-U", CONFIG["user"],
        "--slot", CONFIG["slot_name"],
        "--file", CONFIG["tmp_log_path"],
        "--start",
        "-P", "decoderbufs"
    ]

    # 环境变量
    env = os.environ.copy()
    env["KINGBASE_PASSWORD"] = CONFIG["password"]
    env["LD_LIBRARY_PATH"] = os.path.dirname(CONFIG["sys_recvlogical_path"]) + ":" + env.get("LD_LIBRARY_PATH", "")

    try:
        # 启动进程(创建新进程组)
        sys_process = subprocess.Popen(
            cmd,
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            preexec_fn=os.setsid,
            shell=False
        )

        # 验证启动
        time.sleep(2)
        if sys_process.poll() is not None:
            stderr = sys_process.stderr.read().decode('utf-8', errors='replace')
            Logger.debug(f"❌ sys_recvlogical启动失败: {stderr}")
            restart_attempts += 1
            time.sleep(CONFIG["restart_interval"])
            return start_sys_recvlogical()

        Logger.debug("✅ sys_recvlogical启动成功")
        restart_attempts = 0  # 重置重启次数
        return True

    except Exception as e:
        Logger.debug(f"❌ 启动异常: {e}")
        restart_attempts += 1
        time.sleep(CONFIG["restart_interval"])
        return start_sys_recvlogical()

# ========== 监控循环 ==========
def monitor_loop():
    """主监控循环"""
    global last_file_size, running

    parser = KingbaseBinaryParser()
    Logger.debug("🔍 开始监控WAL文件变化...")

    while running:
        try:
            # 检查进程状态
            if not sys_process or sys_process.poll() is not None:
                Logger.debug("⚠️ sys_recvlogical进程已退出,尝试重启...")
                if not start_sys_recvlogical():
                    running = False
                    break
                time.sleep(2)
                continue

            # 检查文件是否存在
            if not os.path.exists(CONFIG["tmp_log_path"]):
                time.sleep(CONFIG["poll_interval"] / 1000)
                continue

            # 获取文件大小
            try:
                current_size = os.path.getsize(CONFIG["tmp_log_path"])
            except:
                current_size = 0

            # 检测到新数据
            if current_size > last_file_size and current_size > 0:
                try:
                    # 读取全部数据
                    with open(CONFIG["tmp_log_path"], 'rb') as f:
                        all_data = f.read()

                    # 解析新增部分
                    new_data = all_data[last_file_size:]

                    if new_data:
                        Logger.debug(f"📥 检测到新数据 - 新增大小: {len(new_data)} 字节")

                        # 解析数据
                        records = parser.parse_exact_data(new_data)

                        # 输出结果
                        for record in records:
                            Logger.debug(f"✅ 解析成功: {json.dumps(record, ensure_ascii=False, indent=2)}")
                            Logger.info(json.dumps(record, ensure_ascii=False))

                    # 更新最后位置
                    last_file_size = current_size

                except Exception as e:
                    Logger.debug(f"⚠️ 读取/解析数据错误: {e}")

            # 短时间休眠,减少CPU占用
            time.sleep(CONFIG["poll_interval"] / 1000)

        except Exception as e:
            Logger.debug(f"⚠️ 监控循环错误: {e}")
            time.sleep(1)

# ========== 主程序 ==========
def main():
    """主程序入口"""
    global running, monitor_thread

    # 初始化目录
    init_directories()

    # 前置检查
    if not os.path.exists(CONFIG["sys_recvlogical_path"]):
        Logger.debug(f"❌ 错误: sys_recvlogical不存在 - {CONFIG['sys_recvlogical_path']}")
        return

    if not os.access(CONFIG["sys_recvlogical_path"], os.X_OK):
        Logger.debug(f"❌ 错误: 无执行权限,请执行: chmod +x {CONFIG['sys_recvlogical_path']}")
        return

    # 初始化全局变量
    global running, restart_attempts, last_file_size
    running = True
    restart_attempts = 0
    last_file_size = 0

    # 启动sys_recvlogical
    if not start_sys_recvlogical():
        Logger.debug("❌ 无法启动sys_recvlogical,程序退出")
        return

    # 启动监控线程
    monitor_thread = threading.Thread(target=monitor_loop)
    monitor_thread.daemon = False  # 非守护线程
    monitor_thread.start()

    Logger.debug("=== 🚀 WAL监控服务已启动 ===")
    Logger.debug("=== 📝 请在数据库执行DML操作测试 ===")
    Logger.debug("=== ⚠️  按 Ctrl+C 停止服务 ===")

    # 主线程永久运行
    while running:
        try:
            time.sleep(3600)  # 休眠1小时
        except KeyboardInterrupt:
            running = False
            break

    # 等待监控线程结束
    if monitor_thread and monitor_thread.is_alive():
        monitor_thread.join(timeout=5)

    # 最终清理
    clean_up()
    Logger.debug("=== 🛑 WAL监控服务已停止 ===")

if __name__ == "__main__":
    main()

# 配置项(根据实际环境修改)

CONFIG = {

    "sys_recvlogical_path": "/home/kingbase/Kingbase/ES/V8/Server/bin/sys_recvlogical",

    "slot_name": "wal_decoder_slot",

    "host": "127.0.0.1",

    "port": "54321",

    "user": "system",

    "dbname": "test",

    "password": "你的数据库密码",

    "log_path": "/home/kingbase/logs/wal_decoder.log"

}

    1. 脚本关键说明
  1. 编码强制统一:通过os.environ["PGCLIENTENCODING"] = "UTF8"强制客户端使用 UTF8 编码,与服务端保持一致;
  2. 兜底编码检测:detect_encoding函数通过chardet检测数据编码,自动转换为 UTF8,无法转换则替换乱码(避免解析中断);
  3. protobuf 解析:decoderbufs输出的 protobuf 数据通过金仓提供的decoderbufs.proto编译后的 Python 文件解析(需从金仓安装目录拷贝decoderbufs.proto,执行protoc --python_out=. decoderbufs.proto生成decoderbufs_pb2.py)。
  1. 运行测试与验证
    1. 脚本部署与权限

# 1. 创建目录

mkdir -p /home/kingbase/scripts /home/kingbase/logs

# 2. 拷贝脚本并修改权限

cp wal_decoder.py /home/kingbase/scripts/

chmod +x /home/kingbase/scripts/wal_decoder.py

chown -R kingbase:kingbase /home/kingbase/scripts /home/kingbase/logs

# 3. 编译protobuf文件(关键)

protoc --python_out=/home/kingbase/scripts/ /home/kingbase/Kingbase/ES/V8/Server/share/extension/decoderbufs.proto

    1. 启动与验证

su - kingbase

cd /home/kingbase/scripts

python3 wal_decoder.py

在数据库执行 DML 操作(含中文):

test=# insert into  ddh values (1,'你好a');

INSERT 0 1

test=#

脚本输出示例(无乱码):

[2026-03-19 11:49:19.826] [DEBUG] === �� WAL监控服务已启动 ===

[2026-03-19 11:49:19.826] [DEBUG] === �� 请在数据库执行DML操作测试 ===

[2026-03-19 11:49:19.826] [DEBUG] === ⚠️  按 Ctrl+C 停止服务 ===

[2026-03-19 11:49:27.311] [DEBUG] �� 检测到新数据 - 新增大小: 146 字节

[2026-03-19 11:49:27.312] [DEBUG] ✅ 解析成功: {

  "operation": "INSERT",

  "schema": "public",

  "table": "ddh",

  "columns": {

    "id": 1,

    "name": "你好a"

  },

  "timestamp": "1773892167311"

}

  1. 关键注意事项
  1. protobuf 文件编译:必须使用金仓自带的decoderbufs.proto编译,不同版本的 proto 文件可能不兼容;
  2. 乱码兜底方案:若仍有乱码,检查数据库表的字符集(需为 UTF8),可通过ALTER TABLE 表名 ALTER COLUMN 字段名 TYPE varchar(255) CHARACTER SET utf8;修改;
  3. 复制槽清理:解析停止后需及时清理复制槽(SELECT sys_drop_replication_slot('wal_decoder_slot');),避免 WAL 日志堆积占用磁盘
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐