KingbaseES 基于 decoderbufs 插件的 WAL 逻辑解析(Python 实现)
摘要:KingbaseES基于PostgreSQL内核,采用WAL机制确保数据一致性。通过decoderbufs插件将物理WAL日志转换为protobuf格式的结构化数据,Python程序可解析该数据。实现步骤包括:配置数据库参数(wal_level=logical、UTF8编码)、创建复制槽、安装Python依赖(protobuf、chardet),并运行解析脚本。关键点包括强制UTF8编码、使用金仓自带的decoderbufs.proto编译protobuf文件,以及停止解析后及时清理复制槽。
KingbaseES(金仓数据库)基于 PostgreSQL 内核开发,WAL(Write-Ahead Logging) 是其核心日志机制,所有数据修改(DML:INSERT/UPDATE/DELETE)都会先写入 WAL 再落盘,保证数据一致性。
- 物理 WAL:仅记录数据块的物理变化(如哪个字节偏移量修改了),无法直接识别业务语义;
- 逻辑 WAL:将物理变化解析为 “表名 + 操作类型 + 字段值” 的结构化数据,decoderbufs插件正是金仓提供的逻辑解码插件,可将 WAL 日志转换为 Protocol Buffers(protobuf)格式的结构化数据,方便程序解析。
核心逻辑:通过sys_recvlogical(金仓逻辑复制客户端工具)连接数据库复制槽,借助decoderbufs插件将 WAL 转换为 protobuf 格式,Python 读取后解析为可读的结构化数据,同时处理编码问题避免乱码
数据库:KingbaseES V009R001C002B0014(Oracle 模式)
Python 版本:3.7.17
核心依赖:
金仓内置decoderbufs插件(逻辑解码核心);
sys_recvlogical(金仓逻辑复制客户端);
Python 库:protobuf(解析 protobuf 数据)、chardet(编码检测,解决乱码)
修改金仓数据库配置文件kingbase.conf(路径:$KINGBASE_DATA/kingbase.conf),确保以下参数配置正确(解决乱码 + 启用逻辑复制):
# 1. 启用逻辑WAL(decoderbufs必需)
wal_level = logical
# 2. 复制槽/发送进程数量(需≥1)
max_replication_slots = 10
max_wal_senders = 10
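# 3. 统一编码(核心:消除乱码)
client_encoding = UTF8
# 注意:server_encoding、lc_collate、lc_ctype 是只读参数,由初始化实例(initdb)或 CREATE DATABASE 时决定,
# 不能在 kingbase.conf 中修改,写在这里不会生效(还可能导致配置文件报错)。
# 如需 UTF8 服务端编码,请在初始化实例/建库时指定(如 CREATE DATABASE ... ENCODING 'UTF8'),并用下文的 show server_encoding; 确认。
# server_encoding = UTF8
# 4. 字符集排序规则(同样为只读参数,只能在建库时通过 LC_COLLATE / LC_CTYPE 指定)
# lc_collate = 'zh_CN.UTF-8'
# lc_ctype = 'zh_CN.UTF-8'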
修改后重启金仓数据库:
# 停止数据库
sys_ctl -D $KINGBASE_DATA stop
# 启动数据库
sys_ctl -D $KINGBASE_DATA start
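ALTER ROLE system WITH REPLICATION; -- system为操作用户,可替换为自定义用户
-- 创建逻辑复制槽:脚本只以 --start 方式连接已存在的槽,不会自动创建;槽名须与脚本中 CONFIG["slot_name"] 一致
SELECT * FROM sys_create_logical_replication_slot('wal_parser_slot', 'decoderbufs');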
- 修改sys_hba.conf允许复制连接:
local replication system trust
host replication system 127.0.0.1/32 trust
host replication system ::1/128 trust
# 若脚本通过非回环地址连接(如下文脚本 CONFIG 中的 192.168.56.206),还需为该地址/网段添加对应的 host replication 规则
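# 注意:sys_hba.conf 只负责认证,不支持 client_encoding 这类选项(写上会导致该行解析失败);
# 普通连接的编码请通过 kingbase.conf 中的 client_encoding,或 ALTER DATABASE test SET client_encoding = 'UTF8'; 设置
host all all 127.0.0.1/32 md5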
登录数据库执行以下 SQL,确认编码均为 UTF8(乱码核心根源):
show client_encoding; -- 客户端编码
show server_encoding; -- 服务端编码
-- 预期结果均为:UTF8
pip3 install protobuf==3.20.1 chardet==5.2.0 # 版本适配Python3.7
import subprocess
import os
import time
import json
import struct
import threading
import signal
from datetime import datetime
import atexit
import tempfile
# ========== Core configuration (temp output file lives in a user-writable directory) ==========
CONFIG = dict(
    # Client binary that streams logical changes from the replication slot.
    sys_recvlogical_path="/home/kingbase/Kingbase/ES/V8/Server/bin/sys_recvlogical",
    # Logical replication slot passed via --slot (must already exist; the script only uses --start).
    slot_name="wal_parser_slot",
    # Connection parameters; the port is a string because it goes straight onto the command line.
    host="192.168.56.206",
    port="54322",
    user="system",
    dbname="test",
    # Handed to the child process through the KINGBASE_PASSWORD environment variable.
    password="Kdb@2025",
    # Parsed records, one JSON object per line.
    log_path="/home/kingbase/logs/wal_parser.log",
    # Output file sys_recvlogical writes to (--file); kept under the kingbase user's home.
    tmp_log_path="/home/kingbase/tmp/wal_tmp.bin",
    # Diagnostic log; every line is also echoed to stdout.
    debug_log_path="/home/kingbase/logs/wal_debug.log",
    # Poll period for the output file, in milliseconds.
    poll_interval=100,
    # Pause between restart attempts of sys_recvlogical, in seconds.
    restart_interval=5,
    # Number of consecutive failed starts before giving up.
    max_restart_attempts=10,
)
# ========== Global runtime state ==========
# Shared stop flag, cleared by signal_handler / clean_up (and by monitor_loop when restarts run out).
running = True
# Monitor thread and sys_recvlogical child handles, filled in by main() / start_sys_recvlogical().
monitor_thread = sys_process = None
# Consecutive failed start attempts, and the byte offset of the output file already parsed.
restart_attempts = last_file_size = 0
# ========== Logging helpers ==========
class Logger:
    """Minimal append-only file logger.

    DEBUG lines go to CONFIG["debug_log_path"] and are echoed to stdout;
    INFO lines go only to CONFIG["log_path"]. Missing parent directories are
    created on demand.
    """

    @staticmethod
    def _append(path, level, msg):
        """Append one timestamped line (millisecond precision) to *path* and return it."""
        stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        line = f"[{stamp}] [{level}] {msg}\n"
        parent = os.path.dirname(path)
        if not os.path.exists(parent):
            os.makedirs(parent, mode=0o755, exist_ok=True)
        with open(path, 'a', encoding='utf-8') as handle:
            handle.write(line)
        return line

    @staticmethod
    def debug(msg):
        """Write *msg* to the debug log and print it to stdout."""
        written = Logger._append(CONFIG["debug_log_path"], "DEBUG", msg)
        print(written.strip())

    @staticmethod
    def info(msg):
        """Write *msg* to the result log only."""
        Logger._append(CONFIG["log_path"], "INFO", msg)
# ========== Directory bootstrap ==========
def init_directories():
    """Ensure the temp-output directory and both log directories exist.

    Only creation of the temp directory is logged; the log directories are
    created silently.
    """
    scratch_dir = os.path.dirname(CONFIG["tmp_log_path"])
    if not os.path.exists(scratch_dir):
        os.makedirs(scratch_dir, mode=0o755, exist_ok=True)
        Logger.debug(f"✅ 创建临时目录: {scratch_dir}")

    parents = [os.path.dirname(CONFIG[key]) for key in ("log_path", "debug_log_path")]
    for parent in parents:
        if os.path.exists(parent):
            continue
        os.makedirs(parent, mode=0o755, exist_ok=True)
# ========== Signal handling ==========
def signal_handler(signum, frame):
    """Signal callback: log the signal number and ask every loop to stop.

    Only the global ``running`` flag is cleared here; actual cleanup happens
    in clean_up().
    """
    global running
    notice = "🛑 接收到信号 {},准备退出...".format(signum)
    Logger.debug(notice)
    running = False
# ========== Safe cleanup ==========
def clean_up():
    """Best-effort release of everything the service started.

    Clears the global ``running`` flag, stops the sys_recvlogical process
    group, kills leftover sys_recvlogical processes for the configured slot
    and deletes the output file. It is registered with atexit and also called
    at the end of main(), so it can run twice. Every step tolerates the
    resource already being gone; failures are logged, never raised.
    """
    global sys_process, running
    running = False
    Logger.debug("🧹 开始清理资源...")

    # Stop sys_recvlogical. It was started with os.setsid, so signal its whole group.
    if sys_process and sys_process.poll() is None:
        try:
            Logger.debug("🔌 停止sys_recvlogical进程...")
            pgid = os.getpgid(sys_process.pid)
            os.killpg(pgid, signal.SIGTERM)
            try:
                # Returns as soon as the child exits instead of always sleeping 2 s;
                # wait() also reaps the child so no zombie is left behind.
                sys_process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                os.killpg(pgid, signal.SIGKILL)
                sys_process.wait(timeout=2)
            Logger.debug("✅ 进程已停止")
        except Exception as e:
            Logger.debug(f"⚠️ 停止进程失败: {e}")

    # Kill any remaining sys_recvlogical bound to this slot (e.g. left over by an
    # earlier run). An argument list without a shell keeps slot_name from being
    # interpreted by /bin/sh.
    try:
        subprocess.run(
            ["pkill", "-9", "-f", f"sys_recvlogical.*{CONFIG['slot_name']}"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
        Logger.debug("✅ 相关进程已清理")
    except OSError as e:
        # e.g. pkill is not installed on this host.
        Logger.debug(f"⚠️ 清理残留进程失败: {e}")

    tmp_path = CONFIG["tmp_log_path"]
    if os.path.exists(tmp_path):
        try:
            # Deleting a file needs write permission on its directory, not on the
            # file itself, so no chmod (which only made the file world-writable)
            # is attempted first.
            os.remove(tmp_path)
            Logger.debug(f"✅ 清理临时文件: {tmp_path}")
        except PermissionError:
            Logger.debug(f"⚠️ 无权限删除文件 {tmp_path},跳过")
        except OSError as e:
            Logger.debug(f"⚠️ 清理文件失败: {e}")
    else:
        Logger.debug(f"ℹ️ 临时文件 {tmp_path} 不存在")
    Logger.debug("✅ 资源清理完成")
# ========== Exit / signal registration ==========
# clean_up runs at interpreter exit; SIGINT/SIGTERM/SIGHUP only request a
# graceful stop by clearing the running flag.
atexit.register(clean_up)
for _stop_signal in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
    signal.signal(_stop_signal, signal_handler)
del _stop_signal
# ========== Kingbase decoderbufs binary parser ==========
class KingbaseBinaryParser:
    """Heuristic scanner for the protobuf-encoded rows written by decoderbufs.

    No generated ``*_pb2`` module is used; the scanner looks for the field
    tags observed in Kingbase's output:

    * ``0x22``: length-delimited table name.
    * ``0x28``: a varint following the table name (skipped). NOTE(review):
      possibly the operation code; confirm against decoderbufs.proto.
    * ``0x32``: one length-delimited column block, repeated. Inside it,
      ``0x0A`` is the column name, ``0x10`` the type OID (skipped), ``0x18``
      varints are skipped, ``0x20`` is an integer value and ``0x4A`` a
      string value.

    Because records are found by searching for the byte ``0x22``, a ``"``
    character inside data can be mistaken for a table-name tag.
    """

    def __init__(self):
        # Operation code -> name. Not applied yet: every record is reported
        # as "INSERT" by parse_exact_data.
        self.op_mapping = {0: "INSERT", 1: "UPDATE", 2: "DELETE", 3: "TRUNCATE"}

    def decode_varint(self, data, pos):
        """Decode a protobuf base-128 varint starting at ``data[pos]``.

        Returns ``(value, new_pos)``. If the data ends mid-varint, the bits
        read so far are returned and ``new_pos == len(data)``.
        """
        result = 0
        shift = 0
        while pos < len(data):
            byte = data[pos]
            result |= (byte & 0x7F) << shift
            pos += 1
            if not (byte & 0x80):
                break
            shift += 7
        return result, pos

    @staticmethod
    def _to_signed64(value):
        """Reinterpret a decoded varint as a two's-complement int64.

        protobuf encodes negative int32/int64 values as 10-byte varints of
        the 64-bit two's complement, so without this -5 would come back as
        2**64 - 5.
        """
        value &= (1 << 64) - 1
        return value - (1 << 64) if value >= (1 << 63) else value

    def _parse_column(self, data, pos, col_end, columns):
        """Decode one column block ``data[pos:col_end]`` into the *columns* dict.

        A block that does not start with a column-name field is ignored, so a
        value can never be stored under a previous column's name.
        """
        if not (pos < col_end and data[pos] == 0x0A):
            return
        pos += 1
        name_len, pos = self.decode_varint(data, pos)
        if pos + name_len > col_end:
            return
        col_name = data[pos:pos + name_len].decode('utf-8', errors='replace')
        pos += name_len
        # Skip the type OID (0x10).
        if pos < col_end and data[pos] == 0x10:
            pos += 1
            _, pos = self.decode_varint(data, pos)
        # Skip any 0x18 varints. The original code labels them NULL markers;
        # NOTE(review): confirm against the Kingbase decoderbufs.proto.
        while pos < col_end and data[pos] == 0x18:
            pos += 1
            _, pos = self.decode_varint(data, pos)
        # Skip stray 0x01 bytes.
        while pos < col_end and data[pos] == 0x01:
            pos += 1
        if pos >= col_end:
            return
        tag = data[pos]
        if tag == 0x20:
            # Integer value (varint), interpreted as signed 64-bit.
            value, _ = self.decode_varint(data, pos + 1)
            columns[col_name] = self._to_signed64(value)
        elif tag == 0x4A:
            # String value (length-delimited UTF-8).
            str_len, start = self.decode_varint(data, pos + 1)
            if start + str_len <= col_end:
                columns[col_name] = data[start:start + str_len].decode('utf-8', errors='replace')

    def parse_exact_data(self, data):
        """Parse every table record found in *data* (bytes).

        Returns a list of dicts with keys ``operation`` (always "INSERT"),
        ``schema`` (always "public"), ``table``, ``columns`` (column name ->
        int or str) and ``timestamp`` (parse time in epoch milliseconds, as a
        string). Records without a table name or without any decoded column
        are dropped. Invalid UTF-8 bytes become U+FFFD instead of discarding
        the record. Scanning stops early once the module-level ``running``
        flag is cleared.
        """
        records = []
        pos = 0
        len_data = len(data)
        while pos < len_data and running:
            try:
                record = {
                    "operation": "INSERT",
                    "schema": "public",
                    "table": "",
                    "columns": {},
                    "timestamp": str(int(time.time() * 1000))
                }
                # Locate the next table-name tag (0x22).
                table_pos = data.find(b'\x22', pos)
                if table_pos == -1:
                    break
                pos = table_pos + 1
                table_len, pos = self.decode_varint(data, pos)
                if pos + table_len > len_data:
                    # Truncated name: keep scanning after the length prefix.
                    continue
                record["table"] = data[pos:pos + table_len].decode('utf-8', errors='replace')
                pos += table_len
                # Skip the 0x28 varint that follows the table name.
                if pos < len_data and data[pos] == 0x28:
                    pos += 1
                    _, pos = self.decode_varint(data, pos)
                # Length-delimited column blocks (0x32).
                while pos < len_data and data[pos] == 0x32 and running:
                    pos += 1
                    col_block_len, pos = self.decode_varint(data, pos)
                    col_end = pos + col_block_len
                    if col_end > len_data:
                        break
                    self._parse_column(data, pos, col_end, record["columns"])
                    # Always resume at the block end, so unknown trailing fields
                    # (or an unrecognised value tag) cannot stop the parse of
                    # the following columns.
                    pos = col_end
                if record["table"] and record["columns"]:
                    records.append(record)
            except Exception as e:
                Logger.debug(f"⚠️ 解析偏移{pos}错误: {e}")
                pos += 1
                continue
        return records
# ========== Launch sys_recvlogical ==========
def _log_child_stderr(proc):
    """Forward the child's stderr to the debug log, line by line, until EOF.

    Runs in a daemon thread so the pipe is always drained: an unread PIPE
    eventually fills up and blocks the child on its next write.
    """
    try:
        for raw in iter(proc.stderr.readline, b""):
            Logger.debug(f"[sys_recvlogical] {raw.decode('utf-8', errors='replace').rstrip()}")
    except (OSError, ValueError):
        # The pipe was closed underneath us during shutdown; nothing left to forward.
        pass


def start_sys_recvlogical():
    """Start sys_recvlogical, retrying until CONFIG["max_restart_attempts"] is reached.

    Each attempt launches the client in its own session/process group (so
    clean_up() can signal the whole group). It then waits 2 s and treats a
    still-running process as success. On failure, stderr is logged,
    ``restart_attempts`` is incremented and the next attempt starts after
    CONFIG["restart_interval"] seconds.

    Returns:
        True once the process is running (the global ``sys_process`` is set
        and ``restart_attempts`` is reset to 0); False when the attempt budget
        is exhausted.
    """
    global sys_process, restart_attempts

    cmd = [
        CONFIG["sys_recvlogical_path"],
        "-d", CONFIG["dbname"],
        "-h", CONFIG["host"],
        "-p", CONFIG["port"],
        "-U", CONFIG["user"],
        "--slot", CONFIG["slot_name"],
        "--file", CONFIG["tmp_log_path"],
        "--start",
        "-P", "decoderbufs"
    ]
    env = os.environ.copy()
    env["KINGBASE_PASSWORD"] = CONFIG["password"]
    env["LD_LIBRARY_PATH"] = os.path.dirname(CONFIG["sys_recvlogical_path"]) + ":" + env.get("LD_LIBRARY_PATH", "")

    # Iterative retries (previously recursion); the budget check comes first,
    # exactly as before.
    while restart_attempts < CONFIG["max_restart_attempts"]:
        Logger.debug(f"🚀 启动sys_recvlogical (尝试 {restart_attempts + 1}/{CONFIG['max_restart_attempts']})")
        try:
            sys_process = subprocess.Popen(
                cmd,
                env=env,
                # Decoded output goes to --file; stdout is unused, so do not
                # leave an undrained pipe on it.
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                preexec_fn=os.setsid,  # new process group for clean_up()
                shell=False
            )
            time.sleep(2)
            if sys_process.poll() is None:
                Logger.debug("✅ sys_recvlogical启动成功")
                threading.Thread(target=_log_child_stderr, args=(sys_process,), daemon=True).start()
                restart_attempts = 0
                return True
            stderr = sys_process.stderr.read().decode('utf-8', errors='replace')
            sys_process.stderr.close()
            Logger.debug(f"❌ sys_recvlogical启动失败: {stderr}")
        except Exception as e:
            Logger.debug(f"❌ 启动异常: {e}")
        restart_attempts += 1
        time.sleep(CONFIG["restart_interval"])

    Logger.debug(f"❌ 达到最大重启次数 {CONFIG['max_restart_attempts']},停止重启")
    return False
# ========== Monitor loop ==========
def monitor_loop():
    """Poll sys_recvlogical's output file and log every record parsed from new bytes.

    Runs until the global ``running`` flag is cleared. If the child process
    has exited, it is restarted via start_sys_recvlogical(); when that gives
    up, ``running`` is cleared and the loop ends. The global
    ``last_file_size`` is the byte offset up to which the file has already
    been parsed.

    NOTE(review): new bytes are handed to the parser as-is. A record that is
    only partly written at poll time is not retried once ``last_file_size``
    has moved past it.
    """
    global last_file_size, running
    parser = KingbaseBinaryParser()
    path = CONFIG["tmp_log_path"]
    poll_seconds = CONFIG["poll_interval"] / 1000
    Logger.debug("🔍 开始监控WAL文件变化...")
    while running:
        try:
            # Restart the client if it is missing or has exited.
            if not sys_process or sys_process.poll() is not None:
                Logger.debug("⚠️ sys_recvlogical进程已退出,尝试重启...")
                if not start_sys_recvlogical():
                    running = False
                    break
                time.sleep(2)
                continue

            try:
                current_size = os.path.getsize(path)
            except OSError:
                # Not created yet (or briefly unavailable): try again next poll.
                time.sleep(poll_seconds)
                continue

            if current_size < last_file_size:
                # The file was truncated or recreated: start over from the top.
                Logger.debug(f"ℹ️ 输出文件变小({current_size} < {last_file_size} 字节),从头重新读取")
                last_file_size = 0

            if current_size > last_file_size:
                try:
                    with open(path, 'rb') as f:
                        # Read only the unread tail, bounded by the size seen
                        # above. Reading further and then storing current_size
                        # would parse the extra bytes twice.
                        f.seek(last_file_size)
                        new_data = f.read(current_size - last_file_size)
                    if new_data:
                        Logger.debug(f"📥 检测到新数据 - 新增大小: {len(new_data)} 字节")
                        for record in parser.parse_exact_data(new_data):
                            Logger.debug(f"✅ 解析成功: {json.dumps(record, ensure_ascii=False, indent=2)}")
                            Logger.info(json.dumps(record, ensure_ascii=False))
                    last_file_size += len(new_data)
                except Exception as e:
                    Logger.debug(f"⚠️ 读取/解析数据错误: {e}")
            # Short sleep to keep CPU usage low.
            time.sleep(poll_seconds)
        except Exception as e:
            Logger.debug(f"⚠️ 监控循环错误: {e}")
            time.sleep(1)
# ========== Main program ==========
def main():
    """Run the WAL monitor service until it is asked to stop.

    Validates the sys_recvlogical binary, resets the global state, starts the
    client and the (non-daemon) monitor thread, and then waits. The wait ends
    when ``running`` is cleared, either by signal_handler on
    SIGINT/SIGTERM/SIGHUP or by monitor_loop when restarts are exhausted, or
    when the monitor thread ends. Finally it calls clean_up().
    """
    global running, monitor_thread, restart_attempts, last_file_size

    init_directories()

    binary = CONFIG["sys_recvlogical_path"]
    if not os.path.exists(binary):
        Logger.debug(f"❌ 错误: sys_recvlogical不存在 - {binary}")
        return
    if not os.access(binary, os.X_OK):
        Logger.debug(f"❌ 错误: 无执行权限,请执行: chmod +x {binary}")
        return

    running = True
    restart_attempts = 0
    last_file_size = 0

    if not start_sys_recvlogical():
        Logger.debug("❌ 无法启动sys_recvlogical,程序退出")
        return

    monitor_thread = threading.Thread(target=monitor_loop)
    monitor_thread.daemon = False  # non-daemon thread
    monitor_thread.start()
    Logger.debug("=== 🚀 WAL监控服务已启动 ===")
    Logger.debug("=== 📝 请在数据库执行DML操作测试 ===")
    Logger.debug("=== ⚠️ 按 Ctrl+C 停止服务 ===")

    # Wait in 1 s slices. signal_handler replaces the default SIGINT handler, so
    # Ctrl+C does not raise KeyboardInterrupt here. An interrupted time.sleep()
    # also resumes for its remaining time (PEP 475). The previous sleep(3600)
    # therefore kept the process alive for up to an hour after a stop request.
    while running and monitor_thread.is_alive():
        try:
            monitor_thread.join(timeout=1)
        except KeyboardInterrupt:
            running = False

    if monitor_thread.is_alive():
        monitor_thread.join(timeout=5)
    clean_up()
    Logger.debug("=== 🛑 WAL监控服务已停止 ===")
# Start the service only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
# Configuration (adjust to your environment).
# This must be a complete drop-in for the script's CONFIG. The script also
# reads tmp_log_path, debug_log_path, poll_interval, restart_interval and
# max_restart_attempts, so leaving any of them out raises KeyError at runtime.
CONFIG = {
    "sys_recvlogical_path": "/home/kingbase/Kingbase/ES/V8/Server/bin/sys_recvlogical",
    "slot_name": "wal_decoder_slot",
    "host": "127.0.0.1",
    "port": "54321",
    "user": "system",
    "dbname": "test",
    "password": "你的数据库密码",
    "log_path": "/home/kingbase/logs/wal_decoder.log",
    # Output file written by sys_recvlogical (--file); must be writable by this user.
    "tmp_log_path": "/home/kingbase/tmp/wal_tmp.bin",
    # Diagnostic log (also echoed to stdout).
    "debug_log_path": "/home/kingbase/logs/wal_debug.log",
    # Poll period for the output file, in milliseconds.
    "poll_interval": 100,
    # Seconds between restart attempts of sys_recvlogical.
    "restart_interval": 5,
    # Consecutive failed starts before giving up.
    "max_restart_attempts": 10
}
- 说明:以下几点描述的是基于 decoderbufs_pb2 的解析方式;上文给出的脚本并未设置 PGCLIENTENCODING、没有 detect_encoding 函数,也没有导入 protobuf/chardet,而是按观察到的字段标记手工扫描二进制数据,并直接按 UTF-8 解码;
- 编码强制统一:通过os.environ["PGCLIENTENCODING"] = "UTF8"强制客户端使用 UTF8 编码,与服务端保持一致;
- 兜底编码检测:detect_encoding函数通过chardet检测数据编码,自动转换为 UTF8,无法转换则替换乱码(避免解析中断);
- protobuf 解析:decoderbufs输出的 protobuf 数据通过金仓提供的decoderbufs.proto编译后的 Python 文件解析(需从金仓安装目录拷贝decoderbufs.proto,执行protoc --python_out=. decoderbufs.proto生成decoderbufs_pb2.py)。
# 1. 创建目录
mkdir -p /home/kingbase/scripts /home/kingbase/logs
# 2. 拷贝脚本并修改权限
cp wal_decoder.py /home/kingbase/scripts/
chmod +x /home/kingbase/scripts/wal_decoder.py
chown -R kingbase:kingbase /home/kingbase/scripts /home/kingbase/logs
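# 3. 编译protobuf文件(关键;.proto 文件不在当前目录时,必须用 -I 指定其所在目录,否则 protoc 会报错)
protoc -I /home/kingbase/Kingbase/ES/V8/Server/share/extension --python_out=/home/kingbase/scripts/ /home/kingbase/Kingbase/ES/V8/Server/share/extension/decoderbufs.proto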
su - kingbase
cd /home/kingbase/scripts
python3 wal_decoder.py
在数据库执行 DML 操作(含中文):
test=# insert into ddh values (1,'你好a');
INSERT 0 1
test=#
脚本输出示例(无乱码):
[2026-03-19 11:49:19.826] [DEBUG] === 🚀 WAL监控服务已启动 ===
[2026-03-19 11:49:19.826] [DEBUG] === 📝 请在数据库执行DML操作测试 ===
[2026-03-19 11:49:19.826] [DEBUG] === ⚠️ 按 Ctrl+C 停止服务 ===
[2026-03-19 11:49:27.311] [DEBUG] 📥 检测到新数据 - 新增大小: 146 字节
[2026-03-19 11:49:27.312] [DEBUG] ✅ 解析成功: {
"operation": "INSERT",
"schema": "public",
"table": "ddh",
"columns": {
"id": 1,
"name": "你好a"
},
"timestamp": "1773892167311"
}


- protobuf 文件编译:必须使用金仓自带的decoderbufs.proto编译,不同版本的 proto 文件可能不兼容;
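- 乱码兜底方案:若仍有乱码,检查数据库的字符集是否为 UTF8(可用 show server_encoding; 确认)。KingbaseES(Oracle 模式)沿用 PostgreSQL 按数据库设置编码的方式,不支持 MySQL 风格的列级 CHARACTER SET,因此 ALTER TABLE 表名 ALTER COLUMN 字段名 TYPE varchar(255) CHARACTER SET utf8; 这类语句无法修改编码。需要新建 UTF8 编码的数据库(如 CREATE DATABASE 库名 ENCODING 'UTF8' TEMPLATE template0;)并迁移数据;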
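- 复制槽清理:解析停止后需及时清理复制槽,避免 WAL 日志堆积占用磁盘。删除前需先停止 sys_recvlogical(处于活动状态的槽无法删除),且槽名须与脚本 CONFIG["slot_name"] 一致。例如上文脚本使用 wal_parser_slot:SELECT sys_drop_replication_slot('wal_parser_slot');;若使用 wal_decoder_slot 配置,则执行 SELECT sys_drop_replication_slot('wal_decoder_slot');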
更多推荐

所有评论(0)