https://flask-sqlalchemy.palletsprojects.com
安装
pip
pip install Flask-SQLAlchemy
版本
# 查看版本,3.2.x废弃
import flask_sqlalchemy
print(flask_sqlalchemy.__version__)
# 3.1.1
# 查看版本
from importlib import metadata
print(metadata.version("flask-sqlalchemy"))
# 3.1.1
示例
flask-sqlalchemy 3.0.x以及之前的风格
https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import ForeignKey, create_engine
from sqlalchemy.orm import relationship, sessionmaker
app = Flask(__name__)
app.config['SQLALCHEMY_ECHO'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db'
db = SQLAlchemy()
db.init_app(app)
class User(db.Model):
__tablename__ = 't_user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(length=20))
age = db.Column(db.Integer())
addresses = relationship('Address')
def __repr__(self):
return f"User(id={self.id}, name={self.name}, age={self.age}, addresses={self.addresses})"
class Address(db.Model):
__tablename__ = 't_address'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, ForeignKey('t_user.id'))
country = db.Column(db.String(length=30))
province = db.Column(db.String(length=30))
city = db.Column(db.String(length=30))
detail = db.Column(db.String(length=150))
def __repr__(self):
return (f"Address(id={self.id}, user_id={self.user_id}, country={self.country}, province={self.province}, "
f"city={self.city}, detail={self.detail})")
with app.app_context():
# 必须在flask上下文中执行
db.drop_all()
db.create_all()
@app.get("/")
def index():
# flask_sqlalchemy.query.Query是sqlalchemy.orm.query.Query的子类,增强了相关功能,例如分页
from flask_sqlalchemy.query import Query
# 风格一:SQLAlchemy风格
query: Query = db.session.query(User)
print(query.all())
# 风格二:Flask-SQLAlchemy风格
query: Query = User.query
print(query.all())
user_info = db.session.query(User).paginate(page=1, per_page=10, error_out=False)
print(user_info.total)
return "success"
@app.route('/1')
def example1():
with db.session.begin():
print(User.query.filter(User.name == 'Jane').first())
with db.session.begin_nested():
print(User.query.filter(User.name == 'Jane').first())
with db.session.begin_nested():
print(User.query.filter(User.name == 'Jane').first())
db.session.commit()
with db.session.begin_nested():
print(User.query.filter(User.name == 'Jane').first())
return "success"
@app.route('/2')
def example2():
engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
g_session = sessionmaker(bind=engine, autobegin=False)()
db.session.begin()
user = db.session.query(User).filter(User.name == "Jack").first()
if user:
raise Exception('此时不应当查到Jack因为没有数据')
with g_session.begin():
g_session.add(User(name="Jack"))
user = db.session.query(User).filter(User.name == "Jack").first()
if not user:
raise Exception('查不到Jack是因为事务')
db.session.commit()
return "success"
@app.route('/3')
def example3():
engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
g_session = sessionmaker(bind=engine, autobegin=False)()
db.session.begin()
with g_session.begin():
g_session.add(User(name="Alex"))
user = db.session.query(User).filter(User.name == "Alex").first()
# begin不是在数据库层面开启事务,所以这里能查到数据
assert user is not None
db.session.commit()
return "success"
if __name__ == '__main__':
app.run(debug=True)
3.1.x的风格
https://flask-sqlalchemy.palletsprojects.com/en/3.1.x/
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 数据库连接地址
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'
# 追踪修改
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# 打印原始SQL
app.config['SQLALCHEMY_ECHO'] = True
# SQLALCHEMY_ENGINE_OPTIONS
db = SQLAlchemy(app)
class Goods(db.Model):
__tablename__ = 't_good' # 设置表名
id = db.Column(db.Integer, primary_key=True) # 设置主键
name = db.Column(db.String(20), unique=True) # 商品名称
count = db.Column(db.Integer) # 剩余数量
def xxx():
# 重置数据库数据
db.drop_all()
db.create_all()
# 添加一条测试数据
goods = Goods(name='方便面', count=1)
db.session.add(goods)
db.session.commit()
app.run(debug=True)
if __name__ == '__main__':
# 删除所有继承自db.Model的表
db.drop_all()
# 创建所有继承自db.Model的表
db.create_all()
# 添加一条测试数据
goods = Goods(name='方便面', count=1)
db.session.add(goods)
db.session.commit()
app.run(debug=True)
# 更新方式1: 先查询后更新
# 缺点: 并发情况下, 容易出现更新丢失问题 (Lost Update)
# 1.执行查询语句, 获取目标模型对象
goods = Goods.query.filter(Goods.name == '方便面').first()
# 2.对模型对象的属性进行赋值 (更新数据)
goods.count = goods.count - 1
# 3.提交会话
db.session.commit()
# 更新方式2: update子查询 可以避免更新丢失问题
# update t_good set count = count - 1 where name = '方便面';
Goods.query.filter(Goods.name == '方便面').update({'count': Goods.count - 1})
# 提交会话
db.session.commit()
# session.commit() # 直接将数据库内存中的数据提交到数据库,此操作会内部调用session.flush(),其余的事务可以访问最新的数据;
#
# session.rollback() # 是回滚当前事务的变更数据操作;
#
# session.flush() # 作用是在事务管理内与数据库发生交互, 对应的实例状态被反映到数据库,比如自增 ID 被填充上值,但是数据库中当前事务的数据值并未更新上;相当于预提交,等于提交到数据库内存,还未写入数据库文件;deletions and modifications to the database as INSERTs, DELETEs, UPDATE;
#
# session.merge(obj) # 查询更新操作;就是更新之前先查询,如果没有自动插入;
def xxx():
try:
user = session.Query(User).first()
user.name = u'改名字
session.commit()
except:
session.rollback()
def xxx2():
t1 = Admin(username='test', password='123456') # 生成admin表要插入的一条数据
t2 = Admin(username='test1', password='abcdef') # 生成admin表要插入的一条数据
session.add(t2)
try:
with session.begin_nested():
session.add(t1) # 或使用session.merge(t1),表示查询更新操作
except Exception as e:
print(e)
session.rollback()
session.commit()
def xxx3():
t1 = Admin(username='test1', password='123456') # 生成admin表要插入的一条数据
t2 = Admin(username='test2', password='abcdef') # 生成admin表要插入的一条数据
session.add(t2)
# 创建一个子嵌套事务,第一个commit只是将子事务的数据托管到父事务,并未提交到数据库
session.begin_nested()
session.add(t1)
session.commit()
# 父事务执行提交,才真正将t1,t2提交到数据库
session.commit()
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
# 设置数据库连接地址
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:mysql@127.0.0.1:3306/test31'
# 是否追踪数据库修改(开启后会触发一些钩子函数) 一般不开启, 会影响性能
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 是否显示底层执行的SQL语句
app.config['SQLALCHEMY_ECHO'] = True
db = SQLAlchemy(app)
# 构建模型类 类->表 类属性->字段 实例对象->记录
# 用户表 一 一个用户可以有多个地址
class User(db.Model):
__tablename__ = 't_user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(20))
addresses = db.relationship('Address') # 1.定义关系属性 relationship("关联数据所在的模型类")
# 地址表 多
class Address(db.Model):
__tablename__ = 't_adr'
id = db.Column(db.Integer, primary_key=True)
# 2. 外键字段设置外键参数 db.ForeignKey('主表名.主键')
user_id = db.Column(db.Integer, db.ForeignKey('t_user.id'))
detail = db.Column(db.String(20))
@app.route("/add", methods=["GET"])
def add():
# sqlalchemy会自动创建隐式事务
user1 = User(name='zs', age=20)
db.session.add(user1)
# 添加多条记录
# db.session.add_all([user1, user2, user3])
db.session.commit()
@app.route("/update", methods=["GET"])
def update():
# 更新方式1: 先查询后更新
# 缺点: 并发情况下, 容易出现更新丢失问题 (Lost Update)
# 1.执行查询语句, 获取目标模型对象
user = User.query.filter(User.name == 'zs').first()
# 2.对模型对象的属性进行赋值 (更新数据)
user.name = 'jack'
# 3.提交会话
db.session.commit()
# 更新方式2: update子查询 可以避免更新丢失问题
# update t_good set count = count - 1 where name = '方便面';
User.query.filter(User.name == 'zs').update({'name': 'jack'})
# 提交会话
db.session.commit()
@app.route("/delete", methods=["GET"])
def delete():
# 方式1: 先查后删除
users = User.query.filter(User.name == 'jack').first()
# 删除数据
db.session.delete(users)
# 提交会话 增删改都要提交会话
db.session.commit()
# 方式2: delete子查询
User.query.filter(User.name == 'jack').delete()
# 提交会话
db.session.commit()
@app.route("/read", methods=["GET"])
def read():
# 查询所有用户数据
User.query.all() # 返回列表, 元素为模型对象
# 查询有多少个用户
User.query.count()
# 查询第1个用户
User.query.first()
# 返回模型对象 / None
# 查询id为4的用户[3种方式]
# 方式1: 根据id查询 返回模型对象/None
User.query.get(4)
# 方式2: 等值过滤器 关键字实参设置字段值 返回BaseQuery对象
# BaseQuery对象可以续接其他过滤器/执行器 如 all/count/first等
User.query.filter_by(id=4).all()
# 方式3: 复杂过滤器 参数为比较运算/函数引用等 返回BaseQuery对象
User.query.filter(User.id == 4).first()
# 查询名字结尾字符为g的所有用户[开始 / 包含]
User.query.filter(User.name.endswith("g")).all()
User.query.filter(User.name.startswith("w")).all()
User.query.filter(User.name.contains("n")).all()
User.query.filter(User.name.like("w%n%g")).all() # 模糊查询
# 查询名字和邮箱都以li开头的所有用户[2种方式]
User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all()
from sqlalchemy import and_
User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all()
# 查询age是25 或者 `email`以`itheima.com`结尾的所有用户
from sqlalchemy import or_
User.query.filter(or_(User.age == 25, User.email.endswith("itheima.com"))).all()
# 查询名字不等于wang的所有用户[2种方式]
from sqlalchemy import not_
User.query.filter(not_(User.name == 'wang')).all()
User.query.filter(User.name != 'wang').all()
# 查询id为[1, 3, 5, 7, 9]的用户
User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all()
# 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个
User.query.order_by(User.age, User.id.desc()).limit(5).all()
# 查询年龄从小到大第2-5位的数据 2 3 4 5
User.query.order_by(User.age).offset(1).limit(4).all()
# 分页查询, 每页3个, 查询第2页的数据 paginate(页码, 每页条数)
pn = User.query.paginate(2, 3)
# pn.pages
# 总页数
# pn.page
# 当前页码
# pn.items
# 当前页的数据
# pn.total
# 总条数
# 查询每个年龄的人数 select age, count(name) from t_user group by age 分组聚合
from sqlalchemy import func
data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all()
for item in data:
# print(item[0], item[1])
print(item.age, item.count) # 建议通过label()方法给字段起别名, 以属性方式获取数据
# 只查询所有人的姓名和邮箱 优化查询 User.query.all() # 相当于select *
from sqlalchemy.orm import load_only
data = User.query.options(load_only(User.name, User.email)).all() # flask-sqlalchemy的语法
for item in data:
print(item.name, item.email)
data = db.session.query(User.name, User.email).all() # sqlalchemy本体的语法
for item in data:
print(item.name, item.email)
@app.route("/related_read", methods=["GET"])
def related_read():
# 关联查询步骤: (以主查从为例)
# 先查询主表数据
# 再通过外键字段查询
# 关联的从表数据
# 1.
user1 = User.query.filter_by(name='张三').first()
adrs = Address.query.filter_by(user_id=user1.id).all()
for adr in adrs:
print(adr.detail)
# 2.关系属性
user1 = User.query.filter_by(name='张三').first()
for address in user1.addresses:
print(address.detail)
# 3.join
# 格式: db.session.query(主表模型字段1, 主表模型字段2, 从表模型字段1, xx..).join(从表模型类, 主表模型类.主键 == 从表模型类.外键)
# sqlalchemy的join查询
data = db.session.query(User.id, Address.detail).join(Address, User.id == Address.user_id).filter(
User.name == '张三').all()
for item in data:
print(item.detail, item.id)
# 三者对比
# 无论使用 外键 还是 关系属性 查询关联数据, 都需要查询两次, 一次查询用户数据, 一次查询地址数据
#
# 两次查询就需要发送两次请求给数据库服务器, 如果数据库和web应用不在一台服务器中, 则 网络IO会对查询效率产生一定影响
#
# 可以考虑使用 连接查询 join 使用一条语句就完成关联数据的查询
@app.route("/session", methods=["GET"])
def session():
"""
session声明周期
:return:
"""
# flask-sqlalchemy 对于 sqlalchemy本体 的 Session 进行了一定的封装:
# Session的生命周期和请求相近
#
# 请求中的首次数据操作会创建Session
# 整个请求过程中使用的Session为同一个, 并且线程隔离
# 请求结束时会自动销毁Session(释放内存)
# db.session.flush()
"""事务1"""
try:
user1 = User(name='zs', age=20)
db.session.add(user1)
db.session.commit()
except BaseException:
# 手动回滚 同一个session中, 前一个事务如果失败, 必须手动回滚, 否则无法创建新的事务
db.session.rollback()
"""事务2"""
user1 = User(name='lisi', age=30)
db.session.add(user1)
db.session.commit()
if __name__ == '__main__':
db.drop_all()
db.create_all()
app.run(debug=True)