数据库的使用

Flask本身不限定数据库的选择,可以选择SQL或NOSQL的任何一种,也可以选择更方便的SQLALchemy,类似于Django的ORM。SQLALchemy实际上是对数据库的抽象,让开发者不用直接和SQL语句打交道,而是通过Python对象来操作数据库,在舍弃一些性能开销的同时,换来的是开发效率的较大提升
SQLAlchemy是一个关系型数据库框架,它提供了高层的ORM和底层的原生数据库的操作
flask-sqlalchemy是一个简化了SQLAlchemy操作的flask扩展
安装pip install flask-sqlalchemy

SQLALchemy常用配置参数
# 连接数据库的URL,格式为:<数据库方式>://<用户名>:<密码>@<IP地址>:<端口号>/<数据库>
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
# Python3中连接MySQL需要使用pymysql,连接格式如下:
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:mysql@127.0.0.1:3306/db'

# 数据跟踪,数据库中的表格式修改后,模型类会跟着自动修改
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# 查询时显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True

# 每次请求结束后自动提交数据库中的改动,不推荐使用
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
定义模型类和生成表
  • 常用的SQLAlchemy字段类型
    类型名python中类型说明
    Integerint普通整数,一般是32位
    SmallIntegerint取值范围小的整数,一般是16位
    BigIntegerint或long不限制精度的整数
    Floatfloat浮点数
    Numericdecimal.Decimal普通整数,一般是32位
    Stringstr变长字符串
    Textstr变长字符串,对较长或不限长度的字符串做了优化
    Unicodeunicode变长Unicode字符串
    UnicodeTextunicode变长Unicode字符串,对较长或不限长度的字符串做了优化
    Booleanbool布尔值
    Datedatetime.date时间
    Timedatetime.datetime日期和时间
    LargeBinarystr二进制文件
  • 常用的SQLAlchemy列选项
    选项名说明
    primary_key如果为True,代表表的主键
    unique如果为True,代表这列不允许出现重复的值
    index如果为True,为这列创建索引,提高查询效率
    nullable如果为True,允许有空值,如果为False,不允许有空值
    default为这列定义默认值
  • 常用的SQLAlchemy关系选项
    选项名说明
    backref在关系的另一模型中添加反向引用
    primary join明确指定两个模型之间使用的联结条件
    uselist如果为False,不使用列表,而使用标量值
    order_by指定关系中记录的排序方式
    secondary指定多对多中记录的排序方式
    secondary join在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件

示例代码:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

class Config(object):
    # SQLALchemy的配置参数,Python3中需要使用pymysql
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:mysql@127.0.0.1:3306/db_flask'
    # 数据跟踪,数据库中的表格式修改后,模型类会跟着自动修改
    SQLALCHEMY_TRACK_MODIFICATIONS = True

app.config.from_object(Config)
# 创建数据库SQLAlchemy的工具对象
db = SQLAlchemy(app)

# 创建数据库模型类
class User(db.Model):
    __tablename__ = 'tbl_users'  # 指定表名
    id = db.Column(db.Integer, primary_key=True)  # 整型的主键,默认会自动增长
    name = db.Column(db.String(64), unique=True)
    email = db.Column(db.String(128), unique=True)
    password = db.Column(db.String(128), unique=True)
    role_id = db.Column(db.Integer, db.ForeignKey('tbl_roles.id'))  # 设置外键
    
    # 定义显示信息,定义之后,查询时显示对象的时候更直观
    def __repr__(self):
        return 'User object: name=%s' % self.name

class Role(db.Model):
    __tablename__ = 'tbl_roles'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(32), unique=True)
    users = db.relationship('User', backref='role')  # 建立表之间的关系
    
    # 定义显示信息,定义之后,查询时显示对象的时候更直观
    def __repr__(self):
        return 'Role object: name=%s' % self.name

if __name__ == '__main__':
    db.drop_all()  # 清除数据库中的所有数据
    db.create_all()  # 创建所有的表
添加数据
# 向数据库中添加数据
role1 = Role(name='admin')  # 创建对象
db.session.add(role1)  # 用session记录对象任务,一次添加一条数据
db.session.commit()  # 提交任务到数据库中

role2 = Role(name='stuff')
db.session.add(role2)
db.session.commit()

user1 = User(name='wang', email='wang@163.com', password='123456', role_id=role1.id)
user2 = User(name='zhang', email='zhang@189.com', password='201512', role_id=role2.id)
user3 = User(name='chen', email='chen@126.com', password='987654', role_id=role2.id)
user4 = User(name='zhou', email='zhou@163.com', password='456789', role_id=role1.id)
db.session.add_all([user1, user2, user3, user4])  # 一次添加多条数据
db.session.commit()
查询数据
  • 常用的SQLAlchemy查询过滤器
    过滤器说明
    filter()把过滤器添加到原查询上,返回一个新查询
    filter_by()把等值过滤器添加到原查询上,返回一个新查询
    limit使用指定的值限定原查询返回的结果
    offset()偏移原查询返回的结果,返回一个新查询,相当于跳过几条
    order_by()根据指定条件对原查询结果进行排序,返回一个新查询
    group_by()根据指定条件对原查询结果进行分组,返回一个新查询
  • 常用的SQLAlchemy查询执行器
    方法说明
    all()以列表形式返回查询的所有结果
    first()返回查询的第一个结果,如果未查到,返回None
    first_or_404()返回查询的第一个结果,如果未查到,返回404
    get()返回指定主键对应的行,如不存在,返回None
    get_or_404()返回指定主键对应的行,如不存在,返回404
    count()返回查询结果的数量
    paginate()返回一个Paginate对象,它包含指定范围内的结果
  • 查询的使用
    Role.query.all():查询表中的所有数据,返回一个列表
    Role.query.first():返回查询到的第一条数据
    Role.query.get(2):返回指定主键对应的数据
    db.session.query(Role).all():使用session进行查询,方法都相同
    User.query.filter_by(name='wang').all():返回namewang的结果
    User.query.filter_by(name='wang', role_id=1).all():使用多个条件进行查询
    User.query.filter(User.name=='wang', User.role_id==1).all():使用filter()进行多个条件查询
    User.query.filter().offset().limit().order_by().all():链式调用
    User.query.offset(2).all()offset()相当于跳过几条数据
    User.query.limit(2).all()limit()表示取出几条数据
    User.query.order_by(User.id.desc()).all():通过id降序排列
    或者条件进行查询:
    from sqlalchemy import or_, and_, not_
    li = User.query.filter(or_(User.name=='wang', User.email.endswith('163.com'))).all()
    # and_和not_的使用方法和or_相同
    
    分组并统计数量:
    from sqlalchemy import func
    db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id).all()
    # query()中表示的是在结果中显示的内容
    # func.count()表示统计每组的数量
    
  • 分页的使用
    # 查询数据
    user_query = User.query.filter(...).order_by(...)
    # 处理分页,page:当前显示的页数,per_page:每页显示的数量,error_out:自动的错误输出
    page_obj = user_query.paginate(page=1, per_page=3, error_out=False)
    # 获取当前页面数据,
    user_li = page_obj.items
    # 获取总页数
    total_page = page_obj.pages
    
修改和删除数据
  • 修改数据
    user = User.query.get(1) 
    user.name = 'python'
    db.session.add(user) 
    db.session.commit()
    
  • 使用update()修改数据
    User.query.filter_by(name='zhou').update({'name': 'html', 'email': 'html@python.com'})  
    db.session.commit()
    
  • 删除数据
    user = User.query.get(3) 
    db.session.delete(user) 
    db.session.commit()
    

数据库迁移

在开发过程中,需要修改数据库模型,而且还要在修改之后更新数据库。最直接的方式就是删除旧表,但这样会丢失数据。更好的解决办法是使用数据库迁移框架,它可以追踪数据库模式的变化,然后把变动应用到数据库中

在Flask中可以使用Flask-Migrate扩展来实现数据迁移,并且可以集成到Flask-Script中,所有操作通过命令就能完成

为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上
安装pip install flask-migrate
pip install Flask-Script

文件代码
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand

app = Flask(__name__)

# 配置参数
class Config(object):
    SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:mysql@127.0.0.1:3306/db_flask'
    SQLALCHEMY_TRACK_MODIFICATIONS = True

app.config.from_object(Config)
db = SQLAlchemy(app)

# 创建Flask脚本管理对象
manager = Manager(app)

# 创建数据库迁移工具对象
Migrate(app, db)

# 向manager对象中添加数据库的操作命令
manager.add_command('db', MigrateCommand)

# 定义数据库的模型类
class Author(db.Model):
    __tablename__ = 'tbl_authors'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(32), unique=True)
    books = db.relationship('Book', backref='author')

class Book(db.Model):
    __tablename__ = 'tbl_books'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64), unique=True)
    author_id = db.Column(db.Integer, db.ForeignKey('tbl_authors.id'))

if __name__ == '__main__':
    manager.run()  # 通过manager对象启动程序
迁移命令

1> 创建迁移仓库:

# 这个命令会自动创建migrations文件夹,所有迁移文件都放在里面
python database.py db init

2> 创建迁移脚本:

# -m的作用是添加备注信息,可不写
python database.py db migrate -m 'this is a remark'

3> 更新数据库:

python database.py db upgrade

4> 查看迁移记录:

python database.py db history

5> 回退数据库:

python database.py db downgrade 版本号
python database.py db downgrade base  # 回退到最初的版本

发送邮件

安装pip install Flask-Mail
参考文档:http://www.pythondoc.com/flask-mail/

from flask import Flask
from flask_mail import Mail, Message

app = Flask(__name__)

# 添加邮件配置:服务器、端口、传输层安全协议、邮箱名、密码/授权码
app.config.update(
    MAIL_SERVER='smtp.qq.com',
    MAIL_PROT=465,
    MAIL_USE_TLS=True,
    MAIL_USERNAME='aaaa@qq.com',
    MAIL_PASSWORD='xxxxxxxxxx',
)

# 创建mail对象
mail = Mail(app)

@app.route('/')
def index():
    # sender:发送方;recipients:接收方列表
    msg = Message("This is a test ", sender='aaaa@qq.com', recipients=['bbbb@qq.com', 'cccc@qq.com'])
    # 邮件正文内容
    msg.body = "Flask test mail"
    # 发送邮件
    mail.send(msg)
    return "Send Succeed"

if __name__ == "__main__":
    app.run(debug=True)
Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐