一. 前言

在国产化替代浪潮下,达梦数据库(DM8)与 Python 现代框架的适配成为开发刚需。本文将基于 dameng/dm8:slim镜像,详细解析在 FastAPI 项目中配置 SQLAlchemy 连接达梦数据库的同步与异步两种模式。

二. 环境准备与依赖安装

# 基础框架
pip install fastapi uvicorn pydantic

# SQLAlchemy 核心
pip install sqlalchemy

# 达梦官方驱动 (关键!)
# 注意:需从达梦安装目录或官网获取对应平台的 dmPython 包
# pip install dmPython-2.4.5-cp38-cp38-win_amd64.whl
# 或使用 pip 安装(若官方已上传)
pip install dmPython

# 异步模式额外依赖
pip install sqlalchemy[asyncio]

在这里插入图片描述

三. 连接适配

1. 同步连接模式(传统阻塞式)

1.1. db_connect.py

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

# 达梦连接字符串格式: dm+dmPython://user:pass@host:port/database
# 注意:slim 镜像默认实例为 DMSERVER,用户 SYSDBA
DATABASE_URL = "dm+dmPython://SYSDBA:Dameng123@localhost:5236/DMSERVER"

# 创建同步引擎
# echo=True 可输出SQL日志,生产环境建议关闭
engine = create_engine(
    DATABASE_URL,
    echo=False,
    pool_size=5,  # 连接池大小
    max_overflow=10,  # 最大溢出连接数
    pool_pre_ping=True,  # 连接前预检
    connect_args={
        'autoCommit': True,  # 自动提交
        'connection_timeout': 10
    }
)

# 创建会话工厂
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 声明基类
Base = declarative_base()

1.2. FastAPI 依赖注入与路由

# main_sync.py
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from config import SessionLocal, engine
from models import User

app = FastAPI(title="DM8 Sync API")

# 创建表(仅开发使用)
Base.metadata.create_all(bind=engine)

# 依赖项:获取数据库会话
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.post("/users/")
def create_user(username: str, email: str, db: Session = Depends(get_db)):
    # 检查用户名是否存在
    if db.query(User).filter(User.username == username).first():
        raise HTTPException(status_code=400, detail="Username already registered")
    
    db_user = User(username=username, email=email)
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

@app.get("/users/{user_id}")
def read_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    return user

2. 异步连接模式(高性能非阻塞)

FastAPI 的强项在于异步,搭配达梦异步驱动可实现高并发。

2.1 异步引擎配置 db_connect.py

重要提示:截至当前,达梦官方 dmPython驱动主要为同步驱动。若需实现真正的异步 I/O,需等待官方发布 dmAsync驱动或使用第三方异步适配器。以下配置为 SQLAlchemy 2.0 异步语法,实际执行仍可能通过线程池转换。

from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import event, text

from project.core.config import settings
from project.core.config.settings import logger

if settings.DBConfig.CONNECT_METHOD == 'dm':
    # 达梦数据库引擎配置
    engine = create_async_engine(
        settings.DBConfig.DM.DM8_URL,
        echo=settings.DBConfig.DM.ECHO,  # 根据环境配置是否输出SQL
        future=True,  # 使用 2.0 风格
        pool_size=20,
        max_overflow=10,
    )
else:
    # 创建mysql异步数据库引擎
    engine = create_async_engine(
        settings.DBConfig.MySql.MYSQL_URL,
        # 连接池配置
        pool_size=settings.DBConfig.MySql.POOL_SIZE,  # 4核 × 2 + 1
        max_overflow=settings.DBConfig.MySql.MAX_OVERFLOW,  # 允许临时增加5个连接
        pool_recycle=settings.DBConfig.MySql.POOL_RECYCLE,  # 30分钟回收连接
        pool_timeout=settings.DBConfig.MySql.POOL_TIMEOUT,  # 获取连接超时30秒
        pool_pre_ping=settings.DBConfig.MySql.POOL_PRE_PING,  # 自动检查连接有效性
        echo=settings.DBConfig.MySql.ECHO,  # 根据环境配置是否输出SQL
        connect_args={
            "connect_timeout": settings.DBConfig.MySql.CONNECT_TIMEOUT,
        }
    )

# 创建异步会话工厂
SessionLocal = sessionmaker(
    bind=engine,
    class_=AsyncSession,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False  # 避免提交后属性访问问题
)

# 声明基类
Base = declarative_base()


async def verify_connection():
    """验证数据库连接是否可用"""
    async with engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
        logger.info("Database connection verified successfully")


async def dispose_engine():
    """关闭连接池并释放所有连接"""
    await engine.dispose()
    logger.info("Database engine disposed successfully")

2.2. astAPI 依赖注入与路由 db_session.py

from contextlib import asynccontextmanager
from fastapi import HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.exc import SQLAlchemyError

from project.core.config.settings import logger
from project.core.database.db_connect import SessionLocal
import asyncio
from project.core.database.db_connect import verify_connection, dispose_engine


@asynccontextmanager
async def _get_db() -> AsyncSession:
    """内部使用的上下文管理器"""
    session = SessionLocal()
    try:
        yield session
        await session.commit()
    except SQLAlchemyError as e:
        await session.rollback()
        logger.error(f"Rollback failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Database operation failed: {str(e)}"
        ) from e
    finally:
        await session.close()


async def get_db() -> AsyncSession:
    """供路由使用的依赖项"""
    async with _get_db() as session:
        yield session


async def test_connection():
    try:
        await verify_connection()
    except Exception as e:
        logger.error(f"Database connection failed: {e}")
    finally:
        await dispose_engine()


asyncio.run(test_connection())

if __name__ == '__main__':
    # 测试数据库连接
    async def test_connection():
        try:
            await verify_connection()
            print("✅ Database connection test successful")
        except Exception as e:
            print(f"❌ Database connection failed: {e}")
        finally:
            await dispose_engine()


    asyncio.run(test_connection())

四. 达梦数据启动

1. 镜像版本v1【uxuclassmate/dameng:latest】启动脚本:

#!/bin/bash

# 配置
NAME="dmdbv1"
PORT="5237"
DATA="dmdatav1"
PASS="dm#1q2w3e"
IMG="xuxuclassmate/dameng:latest"

echo "🚀 启动达梦数据库..."

# 停止并删除旧容器
docker rm -f "$NAME" 2>/dev/null

# 启动新容器
docker run -d \
  --name "$NAME" \
  --restart=always \
  --privileged=true \
  -p "$PORT:5236" \
  -e TZ=Asia/Shanghai \
  -e SYSDBA_PWD="$PASS" \
  -v "$(pwd)/$DATA:/opt/dmdbms/data" \
  "$IMG"

# 检查状态
sleep 3
echo -e "\n📊 状态:"
docker ps | grep "$NAME"

echo -e "\n🔗 连接:"
echo "  host: localhost"
echo "  port: $PORT"
echo "  user: SYSDBA"
echo "  pass: $PASS"

2. 镜像版本v8【dameng/dm8:slim】启动脚本:

#!/bin/bash
# 达梦数据库启动脚本(本地目录挂载版)
# 使用: chmod +x dm8-start.sh && ./dm8-start.sh

# 配置
CONTAINER_NAME="dm8-slim"
HOST_PORT="5236"
SYSDBA_PWD="Dameng123"
IMAGE="dameng/dm8:slim"

# 本地目录
BASE_DIR="./dm8_slim_data"           # 基础目录
DATA_DIR="$BASE_DIR/data"        # 数据目录
ARCH_DIR="$BASE_DIR/arch"        # 归档目录
BAK_DIR="$BASE_DIR/bak"          # 备份目录

echo "================== 启动达梦数据库 =================="

# 检查Docker
if ! docker ps &> /dev/null; then
    echo "❌ Docker未运行"
    exit 1
fi

# 创建本地目录
echo "📁 创建本地目录..."
mkdir -p $DATA_DIR
mkdir -p $ARCH_DIR
mkdir -p $BAK_DIR

# 设置目录权限
chmod 777 $DATA_DIR 2>/dev/null || true
chmod 777 $ARCH_DIR 2>/dev/null || true
chmod 777 $BAK_DIR 2>/dev/null || true

# 清理旧容器
echo "🧹 清理旧容器..."
docker stop $CONTAINER_NAME 2>/dev/null
docker rm $CONTAINER_NAME 2>/dev/null

# 检查镜像
if ! docker images | grep -q "dameng/dm8.*slim"; then
    echo "⬇️ 拉取镜像..."
    docker pull $IMAGE
fi

# 启动容器
echo "🚀 启动容器..."
docker run -d \
  -p $HOST_PORT:5236 \
  --restart=always \
  --name $CONTAINER_NAME \
  --privileged=true \
  -e PAGE_SIZE=32 \
  -e CASE_SENSITIVE=0 \
  -e TZ=Asia/Shanghai \
  -e SYSDBA_PWD="$SYSDBA_PWD" \
  -v $(pwd)/$DATA_DIR:/dmdata/data \
  -v $(pwd)/$ARCH_DIR:/dmdata/arch \
  -v $(pwd)/$BAK_DIR:/dmdata/bak \
  $IMAGE

# 等待启动
echo -n "⏳ 等待启动"
for i in {1..30}; do
    if docker logs $CONTAINER_NAME 2>&1 | tail -10 | grep -q -i "start.*success\|ready\|database.*start"; then
        echo " ✅"
        echo "✅ 启动成功"
        break
    fi
    echo -n "."
    sleep 2
done

# 显示信息
echo ""
echo "🔗 连接信息:"
echo "  容器名称: $CONTAINER_NAME"
echo "  网络:     $NETWORK"
echo "  地址:     localhost:$HOST_PORT"
echo "  用户:     SYSDBA"
echo "  密码:     $SYSDBA_PWD"
echo "  JDBC URL: jdbc:dm://localhost:$HOST_PORT"
echo ""
echo "📁 本地目录:"
echo "  数据目录: $(pwd)/$DATA_DIR"
echo "  归档目录: $(pwd)/$ARCH_DIR"
echo "  备份目录: $(pwd)/$BAK_DIR"
echo ""
echo "🔧 管理命令:"
echo "  查看日志: docker logs -f $CONTAINER_NAME"
echo "  进入容器: docker exec -it $CONTAINER_NAME /bin/bash"
echo "  停止:     docker stop $CONTAINER_NAME"
echo "  启动:     docker start $CONTAINER_NAME"
echo "  删除:     docker rm -f $CONTAINER_NAME"
echo "=================================================="

可通过以下命令创建模式(数据库)或者数据库管理工具

# 创建模式
docker exec -i dm8-slim disql SYSDBA/Dameng123@ip:port << 'EOF'
-- 创建用户(这会自动创建同名模式)
CREATE USER disys IDENTIFIED BY "Disys123";

-- 授予基本权限
GRANT CONNECT TO disys;
GRANT RESOURCE TO disys;
GRANT CREATE TABLE TO disys;
GRANT CREATE VIEW TO disys;

-- 设置表空间配额
ALTER USER disys QUOTA UNLIMITED ON MAIN;

COMMIT;
EOF

五. 总结与注意事项

  • 驱动安装:dmPython是 C 扩展,在 Linux 部署需确保 LD_LIBRARY_PATH包含达梦 bin目录。
  • 异步现状:目前达梦在 Python 生态的异步支持较弱,高并发场景建议使用同步模式 + 多进程部署,或联系达梦官方获取
    dmAsync进展。
  • 数据类型映射:达梦的 CLOB等类型在 SQLAlchemy 中需注意映射,建议使用 String或 Text。
  • 事务控制:达梦默认自动提交可能与其他数据库行为不同,建议在 sessionmaker中显式设置 autocommit=False。

以上就是有关【python+fastapi+sqlalchemy】达梦数据库连接适配的介绍,希望对你有所帮助!

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐