Flask-数据库
Flask使用数据库的简单介绍
·
数据库的使用
Flask本身不限定数据库的选择,可以选择SQL或NOSQL的任何一种,也可以选择更方便的SQLALchemy,类似于Django的ORM。SQLALchemy实际上是对数据库的抽象,让开发者不用直接和SQL语句打交道,而是通过Python对象来操作数据库,在舍弃一些性能开销的同时,换来的是开发效率的较大提升
SQLAlchemy是一个关系型数据库框架,它提供了高层的ORM和底层的原生数据库的操作
flask-sqlalchemy是一个简化了SQLAlchemy操作的flask扩展
安装:pip install flask-sqlalchemy
SQLALchemy常用配置参数
# 连接数据库的URL,格式为:<数据库方式>://<用户名>:<密码>@<IP地址>:<端口号>/<数据库>
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
# Python3中连接MySQL需要使用pymysql,连接格式如下:
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:mysql@127.0.0.1:3306/db'
# 数据跟踪,数据库中的表格式修改后,模型类会跟着自动修改
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 查询时显示原始SQL语句
app.config['SQLALCHEMY_ECHO'] = True
# 每次请求结束后自动提交数据库中的改动,不推荐使用
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
定义模型类和生成表
- 常用的SQLAlchemy字段类型
类型名 python中类型 说明 Integer int 普通整数,一般是32位 SmallInteger int 取值范围小的整数,一般是16位 BigInteger int或long 不限制精度的整数 Float float 浮点数 Numeric decimal.Decimal 普通整数,一般是32位 String str 变长字符串 Text str 变长字符串,对较长或不限长度的字符串做了优化 Unicode unicode 变长Unicode字符串 UnicodeText unicode 变长Unicode字符串,对较长或不限长度的字符串做了优化 Boolean bool 布尔值 Date datetime.date 时间 Time datetime.datetime 日期和时间 LargeBinary str 二进制文件 - 常用的SQLAlchemy列选项
选项名 说明 primary_key 如果为True,代表表的主键 unique 如果为True,代表这列不允许出现重复的值 index 如果为True,为这列创建索引,提高查询效率 nullable 如果为True,允许有空值,如果为False,不允许有空值 default 为这列定义默认值 - 常用的SQLAlchemy关系选项
选项名 说明 backref 在关系的另一模型中添加反向引用 primary join 明确指定两个模型之间使用的联结条件 uselist 如果为False,不使用列表,而使用标量值 order_by 指定关系中记录的排序方式 secondary 指定多对多中记录的排序方式 secondary join 在SQLAlchemy中无法自行决定时,指定多对多关系中的二级联结条件
示例代码:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
class Config(object):
# SQLALchemy的配置参数,Python3中需要使用pymysql
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:mysql@127.0.0.1:3306/db_flask'
# 数据跟踪,数据库中的表格式修改后,模型类会跟着自动修改
SQLALCHEMY_TRACK_MODIFICATIONS = True
app.config.from_object(Config)
# 创建数据库SQLAlchemy的工具对象
db = SQLAlchemy(app)
# 创建数据库模型类
class User(db.Model):
__tablename__ = 'tbl_users' # 指定表名
id = db.Column(db.Integer, primary_key=True) # 整型的主键,默认会自动增长
name = db.Column(db.String(64), unique=True)
email = db.Column(db.String(128), unique=True)
password = db.Column(db.String(128), unique=True)
role_id = db.Column(db.Integer, db.ForeignKey('tbl_roles.id')) # 设置外键
# 定义显示信息,定义之后,查询时显示对象的时候更直观
def __repr__(self):
return 'User object: name=%s' % self.name
class Role(db.Model):
__tablename__ = 'tbl_roles'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32), unique=True)
users = db.relationship('User', backref='role') # 建立表之间的关系
# 定义显示信息,定义之后,查询时显示对象的时候更直观
def __repr__(self):
return 'Role object: name=%s' % self.name
if __name__ == '__main__':
db.drop_all() # 清除数据库中的所有数据
db.create_all() # 创建所有的表
添加数据
# 向数据库中添加数据
role1 = Role(name='admin') # 创建对象
db.session.add(role1) # 用session记录对象任务,一次添加一条数据
db.session.commit() # 提交任务到数据库中
role2 = Role(name='stuff')
db.session.add(role2)
db.session.commit()
user1 = User(name='wang', email='wang@163.com', password='123456', role_id=role1.id)
user2 = User(name='zhang', email='zhang@189.com', password='201512', role_id=role2.id)
user3 = User(name='chen', email='chen@126.com', password='987654', role_id=role2.id)
user4 = User(name='zhou', email='zhou@163.com', password='456789', role_id=role1.id)
db.session.add_all([user1, user2, user3, user4]) # 一次添加多条数据
db.session.commit()
查询数据
- 常用的SQLAlchemy查询过滤器
过滤器 说明 filter() 把过滤器添加到原查询上,返回一个新查询 filter_by() 把等值过滤器添加到原查询上,返回一个新查询 limit 使用指定的值限定原查询返回的结果 offset() 偏移原查询返回的结果,返回一个新查询,相当于跳过几条 order_by() 根据指定条件对原查询结果进行排序,返回一个新查询 group_by() 根据指定条件对原查询结果进行分组,返回一个新查询 - 常用的SQLAlchemy查询执行器
方法 说明 all() 以列表形式返回查询的所有结果 first() 返回查询的第一个结果,如果未查到,返回None first_or_404() 返回查询的第一个结果,如果未查到,返回404 get() 返回指定主键对应的行,如不存在,返回None get_or_404() 返回指定主键对应的行,如不存在,返回404 count() 返回查询结果的数量 paginate() 返回一个Paginate对象,它包含指定范围内的结果 - 查询的使用
Role.query.all()
:查询表中的所有数据,返回一个列表
Role.query.first()
:返回查询到的第一条数据
Role.query.get(2)
:返回指定主键对应的数据
db.session.query(Role).all()
:使用session
进行查询,方法都相同
User.query.filter_by(name='wang').all()
:返回name
是wang
的结果
User.query.filter_by(name='wang', role_id=1).all()
:使用多个条件进行查询
User.query.filter(User.name=='wang', User.role_id==1).all()
:使用filter()
进行多个条件查询
User.query.filter().offset().limit().order_by().all()
:链式调用
User.query.offset(2).all()
:offset()
相当于跳过几条数据
User.query.limit(2).all()
:limit()
表示取出几条数据
User.query.order_by(User.id.desc()).all()
:通过id
降序排列
或者条件进行查询:
分组并统计数量:from sqlalchemy import or_, and_, not_ li = User.query.filter(or_(User.name=='wang', User.email.endswith('163.com'))).all() # and_和not_的使用方法和or_相同
from sqlalchemy import func db.session.query(User.role_id, func.count(User.role_id)).group_by(User.role_id).all() # query()中表示的是在结果中显示的内容 # func.count()表示统计每组的数量
- 分页的使用
# 查询数据 user_query = User.query.filter(...).order_by(...) # 处理分页,page:当前显示的页数,per_page:每页显示的数量,error_out:自动的错误输出 page_obj = user_query.paginate(page=1, per_page=3, error_out=False) # 获取当前页面数据, user_li = page_obj.items # 获取总页数 total_page = page_obj.pages
修改和删除数据
- 修改数据
user = User.query.get(1) user.name = 'python' db.session.add(user) db.session.commit()
- 使用update()修改数据
User.query.filter_by(name='zhou').update({'name': 'html', 'email': 'html@python.com'}) db.session.commit()
- 删除数据
user = User.query.get(3) db.session.delete(user) db.session.commit()
数据库迁移
在开发过程中,需要修改数据库模型,而且还要在修改之后更新数据库。最直接的方式就是删除旧表,但这样会丢失数据。更好的解决办法是使用数据库迁移框架,它可以追踪数据库模式的变化,然后把变动应用到数据库中
在Flask中可以使用Flask-Migrate扩展来实现数据迁移,并且可以集成到Flask-Script中,所有操作通过命令就能完成
为了导出数据库迁移命令,Flask-Migrate提供了一个MigrateCommand类,可以附加到flask-script的manager对象上
安装:pip install flask-migrate
pip install Flask-Script
文件代码
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
app = Flask(__name__)
# 配置参数
class Config(object):
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://root:mysql@127.0.0.1:3306/db_flask'
SQLALCHEMY_TRACK_MODIFICATIONS = True
app.config.from_object(Config)
db = SQLAlchemy(app)
# 创建Flask脚本管理对象
manager = Manager(app)
# 创建数据库迁移工具对象
Migrate(app, db)
# 向manager对象中添加数据库的操作命令
manager.add_command('db', MigrateCommand)
# 定义数据库的模型类
class Author(db.Model):
__tablename__ = 'tbl_authors'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(32), unique=True)
books = db.relationship('Book', backref='author')
class Book(db.Model):
__tablename__ = 'tbl_books'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(64), unique=True)
author_id = db.Column(db.Integer, db.ForeignKey('tbl_authors.id'))
if __name__ == '__main__':
manager.run() # 通过manager对象启动程序
迁移命令
1> 创建迁移仓库:
# 这个命令会自动创建migrations文件夹,所有迁移文件都放在里面
python database.py db init
2> 创建迁移脚本:
# -m的作用是添加备注信息,可不写
python database.py db migrate -m 'this is a remark'
3> 更新数据库:
python database.py db upgrade
4> 查看迁移记录:
python database.py db history
5> 回退数据库:
python database.py db downgrade 版本号
python database.py db downgrade base # 回退到最初的版本
发送邮件
安装:pip install Flask-Mail
参考文档:http://www.pythondoc.com/flask-mail/
from flask import Flask
from flask_mail import Mail, Message
app = Flask(__name__)
# 添加邮件配置:服务器、端口、传输层安全协议、邮箱名、密码/授权码
app.config.update(
MAIL_SERVER='smtp.qq.com',
MAIL_PROT=465,
MAIL_USE_TLS=True,
MAIL_USERNAME='aaaa@qq.com',
MAIL_PASSWORD='xxxxxxxxxx',
)
# 创建mail对象
mail = Mail(app)
@app.route('/')
def index():
# sender:发送方;recipients:接收方列表
msg = Message("This is a test ", sender='aaaa@qq.com', recipients=['bbbb@qq.com', 'cccc@qq.com'])
# 邮件正文内容
msg.body = "Flask test mail"
# 发送邮件
mail.send(msg)
return "Send Succeed"
if __name__ == "__main__":
app.run(debug=True)
更多推荐
已为社区贡献1条内容
所有评论(0)