【python+fastapi+sqlalchemy】达梦数据库连接适配
【代码】达梦数据库适配。
·
一. 前言
在国产化替代浪潮下,达梦数据库(DM8)与 Python 现代框架的适配成为开发刚需。本文将基于 dameng/dm8:slim镜像,详细解析在 FastAPI 项目中配置 SQLAlchemy 连接达梦数据库的同步与异步两种模式。
二. 环境准备与依赖安装
# 基础框架
pip install fastapi uvicorn pydantic
# SQLAlchemy 核心
pip install sqlalchemy
# 达梦官方驱动 (关键!)
# 注意:需从达梦安装目录或官网获取对应平台的 dmPython 包
# pip install dmPython-2.4.5-cp38-cp38-win_amd64.whl
# 或使用 pip 安装(若官方已上传)
pip install dmPython
# 异步模式额外依赖
pip install sqlalchemy[asyncio]

三. 连接适配
1. 同步连接模式(传统阻塞式)
1.1. db_connect.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
# 达梦连接字符串格式: dm+dmPython://user:pass@host:port/database
# 注意:slim 镜像默认实例为 DMSERVER,用户 SYSDBA
DATABASE_URL = "dm+dmPython://SYSDBA:Dameng123@localhost:5236/DMSERVER"
# 创建同步引擎
# echo=True 可输出SQL日志,生产环境建议关闭
engine = create_engine(
DATABASE_URL,
echo=False,
pool_size=5, # 连接池大小
max_overflow=10, # 最大溢出连接数
pool_pre_ping=True, # 连接前预检
connect_args={
'autoCommit': True, # 自动提交
'connection_timeout': 10
}
)
# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 声明基类
Base = declarative_base()
1.2. FastAPI 依赖注入与路由
# main_sync.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from config import SessionLocal, engine
from models import User
app = FastAPI(title="DM8 Sync API")
# 创建表(仅开发使用)
Base.metadata.create_all(bind=engine)
# 依赖项:获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
@app.post("/users/")
def create_user(username: str, email: str, db: Session = Depends(get_db)):
# 检查用户名是否存在
if db.query(User).filter(User.username == username).first():
raise HTTPException(status_code=400, detail="Username already registered")
db_user = User(username=username, email=email)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@app.get("/users/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
return user
2. 异步连接模式(高性能非阻塞)
FastAPI 的强项在于异步,搭配达梦异步驱动可实现高并发。
2.1 异步引擎配置 db_connect.py
重要提示:截至当前,达梦官方 dmPython驱动主要为同步驱动。若需实现真正的异步 I/O,需等待官方发布 dmAsync驱动或使用第三方异步适配器。以下配置为 SQLAlchemy 2.0 异步语法,实际执行仍可能通过线程池转换。
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import event, text
from project.core.config import settings
from project.core.config.settings import logger
if settings.DBConfig.CONNECT_METHOD == 'dm':
# 达梦数据库引擎配置
engine = create_async_engine(
settings.DBConfig.DM.DM8_URL,
echo=settings.DBConfig.DM.ECHO, # 根据环境配置是否输出SQL
future=True, # 使用 2.0 风格
pool_size=20,
max_overflow=10,
)
else:
# 创建mysql异步数据库引擎
engine = create_async_engine(
settings.DBConfig.MySql.MYSQL_URL,
# 连接池配置
pool_size=settings.DBConfig.MySql.POOL_SIZE, # 4核 × 2 + 1
max_overflow=settings.DBConfig.MySql.MAX_OVERFLOW, # 允许临时增加5个连接
pool_recycle=settings.DBConfig.MySql.POOL_RECYCLE, # 30分钟回收连接
pool_timeout=settings.DBConfig.MySql.POOL_TIMEOUT, # 获取连接超时30秒
pool_pre_ping=settings.DBConfig.MySql.POOL_PRE_PING, # 自动检查连接有效性
echo=settings.DBConfig.MySql.ECHO, # 根据环境配置是否输出SQL
connect_args={
"connect_timeout": settings.DBConfig.MySql.CONNECT_TIMEOUT,
}
)
# 创建异步会话工厂
SessionLocal = sessionmaker(
bind=engine,
class_=AsyncSession,
autocommit=False,
autoflush=False,
expire_on_commit=False # 避免提交后属性访问问题
)
# 声明基类
Base = declarative_base()
async def verify_connection():
"""验证数据库连接是否可用"""
async with engine.connect() as conn:
await conn.execute(text("SELECT 1"))
logger.info("Database connection verified successfully")
async def dispose_engine():
"""关闭连接池并释放所有连接"""
await engine.dispose()
logger.info("Database engine disposed successfully")
2.2. astAPI 依赖注入与路由 db_session.py
from contextlib import asynccontextmanager
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError
from project.core.config.settings import logger
from project.core.database.db_connect import SessionLocal
import asyncio
from project.core.database.db_connect import verify_connection, dispose_engine
@asynccontextmanager
async def _get_db() -> AsyncSession:
"""内部使用的上下文管理器"""
session = SessionLocal()
try:
yield session
await session.commit()
except SQLAlchemyError as e:
await session.rollback()
logger.error(f"Rollback failed: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Database operation failed: {str(e)}"
) from e
finally:
await session.close()
async def get_db() -> AsyncSession:
"""供路由使用的依赖项"""
async with _get_db() as session:
yield session
async def test_connection():
try:
await verify_connection()
except Exception as e:
logger.error(f"Database connection failed: {e}")
finally:
await dispose_engine()
asyncio.run(test_connection())
if __name__ == '__main__':
# 测试数据库连接
async def test_connection():
try:
await verify_connection()
print("✅ Database connection test successful")
except Exception as e:
print(f"❌ Database connection failed: {e}")
finally:
await dispose_engine()
asyncio.run(test_connection())
四. 达梦数据启动
1. 镜像版本v1【uxuclassmate/dameng:latest】启动脚本:
#!/bin/bash
# 配置
NAME="dmdbv1"
PORT="5237"
DATA="dmdatav1"
PASS="dm#1q2w3e"
IMG="xuxuclassmate/dameng:latest"
echo "🚀 启动达梦数据库..."
# 停止并删除旧容器
docker rm -f "$NAME" 2>/dev/null
# 启动新容器
docker run -d \
--name "$NAME" \
--restart=always \
--privileged=true \
-p "$PORT:5236" \
-e TZ=Asia/Shanghai \
-e SYSDBA_PWD="$PASS" \
-v "$(pwd)/$DATA:/opt/dmdbms/data" \
"$IMG"
# 检查状态
sleep 3
echo -e "\n📊 状态:"
docker ps | grep "$NAME"
echo -e "\n🔗 连接:"
echo " host: localhost"
echo " port: $PORT"
echo " user: SYSDBA"
echo " pass: $PASS"
2. 镜像版本v8【dameng/dm8:slim】启动脚本:
#!/bin/bash
# 达梦数据库启动脚本(本地目录挂载版)
# 使用: chmod +x dm8-start.sh && ./dm8-start.sh
# 配置
CONTAINER_NAME="dm8-slim"
HOST_PORT="5236"
SYSDBA_PWD="Dameng123"
IMAGE="dameng/dm8:slim"
# 本地目录
BASE_DIR="./dm8_slim_data" # 基础目录
DATA_DIR="$BASE_DIR/data" # 数据目录
ARCH_DIR="$BASE_DIR/arch" # 归档目录
BAK_DIR="$BASE_DIR/bak" # 备份目录
echo "================== 启动达梦数据库 =================="
# 检查Docker
if ! docker ps &> /dev/null; then
echo "❌ Docker未运行"
exit 1
fi
# 创建本地目录
echo "📁 创建本地目录..."
mkdir -p $DATA_DIR
mkdir -p $ARCH_DIR
mkdir -p $BAK_DIR
# 设置目录权限
chmod 777 $DATA_DIR 2>/dev/null || true
chmod 777 $ARCH_DIR 2>/dev/null || true
chmod 777 $BAK_DIR 2>/dev/null || true
# 清理旧容器
echo "🧹 清理旧容器..."
docker stop $CONTAINER_NAME 2>/dev/null
docker rm $CONTAINER_NAME 2>/dev/null
# 检查镜像
if ! docker images | grep -q "dameng/dm8.*slim"; then
echo "⬇️ 拉取镜像..."
docker pull $IMAGE
fi
# 启动容器
echo "🚀 启动容器..."
docker run -d \
-p $HOST_PORT:5236 \
--restart=always \
--name $CONTAINER_NAME \
--privileged=true \
-e PAGE_SIZE=32 \
-e CASE_SENSITIVE=0 \
-e TZ=Asia/Shanghai \
-e SYSDBA_PWD="$SYSDBA_PWD" \
-v $(pwd)/$DATA_DIR:/dmdata/data \
-v $(pwd)/$ARCH_DIR:/dmdata/arch \
-v $(pwd)/$BAK_DIR:/dmdata/bak \
$IMAGE
# 等待启动
echo -n "⏳ 等待启动"
for i in {1..30}; do
if docker logs $CONTAINER_NAME 2>&1 | tail -10 | grep -q -i "start.*success\|ready\|database.*start"; then
echo " ✅"
echo "✅ 启动成功"
break
fi
echo -n "."
sleep 2
done
# 显示信息
echo ""
echo "🔗 连接信息:"
echo " 容器名称: $CONTAINER_NAME"
echo " 网络: $NETWORK"
echo " 地址: localhost:$HOST_PORT"
echo " 用户: SYSDBA"
echo " 密码: $SYSDBA_PWD"
echo " JDBC URL: jdbc:dm://localhost:$HOST_PORT"
echo ""
echo "📁 本地目录:"
echo " 数据目录: $(pwd)/$DATA_DIR"
echo " 归档目录: $(pwd)/$ARCH_DIR"
echo " 备份目录: $(pwd)/$BAK_DIR"
echo ""
echo "🔧 管理命令:"
echo " 查看日志: docker logs -f $CONTAINER_NAME"
echo " 进入容器: docker exec -it $CONTAINER_NAME /bin/bash"
echo " 停止: docker stop $CONTAINER_NAME"
echo " 启动: docker start $CONTAINER_NAME"
echo " 删除: docker rm -f $CONTAINER_NAME"
echo "=================================================="
可通过以下命令创建模式(数据库)或者数据库管理工具
# 创建模式
docker exec -i dm8-slim disql SYSDBA/Dameng123@ip:port << 'EOF'
-- 创建用户(这会自动创建同名模式)
CREATE USER disys IDENTIFIED BY "Disys123";
-- 授予基本权限
GRANT CONNECT TO disys;
GRANT RESOURCE TO disys;
GRANT CREATE TABLE TO disys;
GRANT CREATE VIEW TO disys;
-- 设置表空间配额
ALTER USER disys QUOTA UNLIMITED ON MAIN;
COMMIT;
EOF
五. 总结与注意事项
- 驱动安装:dmPython是 C 扩展,在 Linux 部署需确保 LD_LIBRARY_PATH包含达梦 bin目录。
- 异步现状:目前达梦在 Python 生态的异步支持较弱,高并发场景建议使用同步模式 + 多进程部署,或联系达梦官方获取
dmAsync进展。 - 数据类型映射:达梦的 CLOB等类型在 SQLAlchemy 中需注意映射,建议使用 String或 Text。
- 事务控制:达梦默认自动提交可能与其他数据库行为不同,建议在 sessionmaker中显式设置 autocommit=False。
以上就是有关【python+fastapi+sqlalchemy】达梦数据库连接适配的介绍,希望对你有所帮助!
更多推荐
所有评论(0)