0.概述

在现代应用程序开发中,数据库是存储和管理数据的核心组件。Python 作为一种强大的编程语言,提供了多种库来连接和操作数据库。本文介绍了如何使用 pymysql 库连接到 MySQL 或 Apache Doris 数据库。首先,本文概述了数据库连接的基本步骤,包括建立连接、执行查询、处理结果和关闭连接。然后,详细讲解了使用 pymysql 库的代码示例,展示了如何通过 Python 程序实现与数据库的交互。最后,本文还讨论了一些常见的数据库操作,如插入数据、查询数据,以及批量处理数据的方法。这些操作对于构建高效的数据库驱动应用至关重要。

1. 连接数据库

import pymysql

# 连接到 Doris
connection = pymysql.connect(
    host='your_doris_host',  # Doris 主机地址
    port=your_doris_port,    # Doris 端口
    user='your_username',    # Doris 用户名
    password='your_password',# Doris 密码
    database='your_database',  # 没有创建就不填
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor
)

2. 创建数据库

#创建数据库
try:
    with connection.cursor() as cursor:
        # 创建数据库的SQL语句
        create_database_sql = "CREATE DATABASE IF NOT EXISTS loophole_data;"
        
        # 执行创建数据库的SQL语句
        cursor.execute(create_database_sql)
        print(f"数据库 `your_database_name` 创建成功")

finally:
    # 关闭数据库连接
    connection.close()

3. 创建表

try:
    with connection.cursor() as cursor:
        # 创建表的SQL语句
        create_table_sql = """
        CREATE TABLE IF NOT EXISTS cnvd_cnnvd_data (
            id BIGINT,
            data_source VARCHAR(255),
            cvnd_number VARCHAR(255),
            cve_number VARCHAR(255),
            cnnvd_number VARCHAR(255),
            public_date DATE,
            severity_level VARCHAR(50),
            affected_products TEXT,
            vulnerability_description TEXT,
            vulnerability_type VARCHAR(255),
            solution TEXT,
            cve_link TEXT,
            vulnerability_name VARCHAR(1000),
            submission_time DATE,
            patch_link TEXT,
            patch_name TEXT,
            patch_description TEXT,
            reference_url TEXT
        )
        DISTRIBUTED BY HASH(id) BUCKETS 10
        PROPERTIES("replication_num" = "1");
        """
        # 执行创建表的SQL语句
        cursor.execute(create_table_sql)
        connection.commit()
        print("表创建成功")

finally:
    # 关闭数据库连接
    connection.close()

4. 删除表

try:
    with connection.cursor() as cursor:
        # 删除现有表
        drop_table_sql = "DROP TABLE IF EXISTS cnvd_cnnvd_data;"
        cursor.execute(drop_table_sql)
        print("表已删除")

finally:
    # 关闭数据库连接
    connection.close()

5. 获取所有数据库、表名称

 try:
     with connection.cursor() as cursor:
         # 执行SHOW DATABASES命令
         cursor.execute("SHOW DATABASES;")
        
         # 获取所有数据库的名称
         databases = cursor.fetchall()
         print("Doris 中的数据库:")
         for db in databases:
             print(db['Database'])

 finally:
     # 关闭数据库连接
     connection.close()
try:
    with connection.cursor() as cursor:
        # 执行SHOW TABLES命令,查看数据库中的所有表
        cursor.execute("SHOW TABLES;")
        
        # 获取并打印所有表的名称
        tables = cursor.fetchall()
        
        # 如果返回的是字节字符串,需要将其转换为普通字符串
        db_name = connection.db.decode() if isinstance(connection.db, bytes) else connection.db

        print(f"数据库 `{db_name}` 中的表:")
        for table in tables:
            # 直接获取字典的第一个键
            print(table[list(table.keys())[0]])

finally:
    # 关闭数据库连接
    connection.close()

6. 批量插入csv数据


import pandas as pd
cnnvd_data = "cnvd_data_bak/cnnvd_data.csv"

# 读取 CSV 文件
df_cnnvd = pd.read_csv(cnnvd_data)
try:
    with connection.cursor() as cursor:
        # 插入数据的SQL语句
        insert_sql = """
        INSERT INTO cnvd_cnnvd_data (
            id,data_source,vulnerability_name,cnnvd_number,severity_level,cve_number,vulnerability_type,public_date,submission_time,vulnerability_description,
            reference_url,patch_link
        ) VALUES (
           %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
        );
        """
        batch_data = []
        for index, row in df_cnnvd.iterrows():
            id = index + 115488
            data_source = 'cnnvd'
            row = row.fillna('无')
            row_data = (id, data_source) + tuple(row.drop("厂商"))
            batch_data.append(row_data)
            # 当批量数据达到batch_size时,执行插入操作
            if len(batch_data) >= 1000:
                try:
                    cursor.executemany(insert_sql, batch_data)
                    connection.commit()
                    print(f"{len(batch_data)} 条数据已插入")
                    batch_data = []  # 清空批量数据列表
                except Exception as e:
                    print("*"*100)
                    print("插入失败",e)
                    break
        # 插入剩余的数据
        if batch_data:
            cursor.executemany(insert_sql, batch_data)
            connection.commit()
            print(f"{len(batch_data)} 条数据已插入")

finally:
    # 关闭数据库连接
    connection.close()

7. 条件查询

try:
    with connection.cursor() as cursor:
        # 查询特定 id 的数据
        id_to_query = 115488
        select_sql = "SELECT * FROM cnvd_cnnvd_data WHERE id = %s;"
        
        # 执行查询
        cursor.execute(select_sql, (id_to_query,))
        
        # 获取查询结果
        result = cursor.fetchone()
        
        # 打印结果
        if result:
            print(result)
        else:
            print(f"No data found for id = {id_to_query}")
            
finally:
    # 关闭数据库连接
    connection.close()
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐