Flask-SQLAlchemy

予早 2024-05-27 22:50:23
Categories: Tags:

https://flask-sqlalchemy.palletsprojects.com

安装

pip

pip install Flask-SQLAlchemy

版本

# 查看版本,3.2.x废弃
import flask_sqlalchemy
print(flask_sqlalchemy.__version__)
# 3.1.1

# 查看版本
from importlib import metadata
print(metadata.version("flask-sqlalchemy"))
# 3.1.1

示例

flask-sqlalchemy 3.0.x以及之前的风格

https://flask-sqlalchemy.palletsprojects.com/en/3.0.x/

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

from sqlalchemy import ForeignKey, create_engine
from sqlalchemy.orm import relationship, sessionmaker

app = Flask(__name__)
app.config['SQLALCHEMY_ECHO'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db'

db = SQLAlchemy()
db.init_app(app)


class User(db.Model):
    __tablename__ = 't_user'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(length=20))
    age = db.Column(db.Integer())
    addresses = relationship('Address')

    def __repr__(self):
        return f"User(id={self.id}, name={self.name}, age={self.age}, addresses={self.addresses})"


class Address(db.Model):
    __tablename__ = 't_address'
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, ForeignKey('t_user.id'))
    country = db.Column(db.String(length=30))
    province = db.Column(db.String(length=30))
    city = db.Column(db.String(length=30))
    detail = db.Column(db.String(length=150))

    def __repr__(self):
        return (f"Address(id={self.id}, user_id={self.user_id}, country={self.country}, province={self.province}, "
                f"city={self.city}, detail={self.detail})")


with app.app_context():
    # 必须在flask上下文中执行
    db.drop_all()
    db.create_all()


@app.get("/")
def index():
    # flask_sqlalchemy.query.Query是sqlalchemy.orm.query.Query的子类,增强了相关功能,例如分页
    from flask_sqlalchemy.query import Query

    # 风格一:SQLAlchemy风格
    query: Query = db.session.query(User)
    print(query.all())

    # 风格二:Flask-SQLAlchemy风格
    query: Query = User.query
    print(query.all())

    user_info = db.session.query(User).paginate(page=1, per_page=10, error_out=False)
    print(user_info.total)
    return "success"


@app.route('/1')
def example1():
    with db.session.begin():
        print(User.query.filter(User.name == 'Jane').first())

    with db.session.begin_nested():
        print(User.query.filter(User.name == 'Jane').first())

    with db.session.begin_nested():
        print(User.query.filter(User.name == 'Jane').first())

    db.session.commit()
    with db.session.begin_nested():
        print(User.query.filter(User.name == 'Jane').first())

    return "success"


@app.route('/2')
def example2():
    engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
    g_session = sessionmaker(bind=engine, autobegin=False)()

    db.session.begin()
    user = db.session.query(User).filter(User.name == "Jack").first()
    if user:
        raise Exception('此时不应当查到Jack因为没有数据')
    with g_session.begin():
        g_session.add(User(name="Jack"))
    user = db.session.query(User).filter(User.name == "Jack").first()
    if not user:
        raise Exception('查不到Jack是因为事务')
    db.session.commit()

    return "success"


@app.route('/3')
def example3():
    engine = create_engine(app.config['SQLALCHEMY_DATABASE_URI'])
    g_session = sessionmaker(bind=engine, autobegin=False)()

    db.session.begin()
    with g_session.begin():
        g_session.add(User(name="Alex"))
    user = db.session.query(User).filter(User.name == "Alex").first()
    # begin不是在数据库层面开启事务,所以这里能查到数据
    assert user is not None
    db.session.commit()

    return "success"


if __name__ == '__main__':
    app.run(debug=True)

3.1.x的风格

https://flask-sqlalchemy.palletsprojects.com/en/3.1.x/

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 数据库连接地址
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@127.0.0.1:3306/test'

# 追踪修改
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# 打印原始SQL
app.config['SQLALCHEMY_ECHO'] = True

# SQLALCHEMY_ENGINE_OPTIONS

db = SQLAlchemy(app)


class Goods(db.Model):
    __tablename__ = 't_good'  # 设置表名
    id = db.Column(db.Integer, primary_key=True)  # 设置主键
    name = db.Column(db.String(20), unique=True)  # 商品名称
    count = db.Column(db.Integer)  # 剩余数量


def xxx():
    # 重置数据库数据
    db.drop_all()
    db.create_all()

    # 添加一条测试数据
    goods = Goods(name='方便面', count=1)
    db.session.add(goods)
    db.session.commit()
    app.run(debug=True)


if __name__ == '__main__':
    # 删除所有继承自db.Model的表
    db.drop_all()
    # 创建所有继承自db.Model的表
    db.create_all()

    # 添加一条测试数据
    goods = Goods(name='方便面', count=1)
    db.session.add(goods)
    db.session.commit()
    app.run(debug=True)

    # 更新方式1: 先查询后更新
    # 缺点: 并发情况下, 容易出现更新丢失问题 (Lost Update)

    # 1.执行查询语句, 获取目标模型对象
    goods = Goods.query.filter(Goods.name == '方便面').first()
    # 2.对模型对象的属性进行赋值 (更新数据)
    goods.count = goods.count - 1
    # 3.提交会话
    db.session.commit()

    # 更新方式2: update子查询   可以避免更新丢失问题
    # update t_good set count = count - 1 where name = '方便面';
    Goods.query.filter(Goods.name == '方便面').update({'count': Goods.count - 1})
    # 提交会话
    db.session.commit()

# session.commit()  # 直接将数据库内存中的数据提交到数据库,此操作会内部调用session.flush(),其余的事务可以访问最新的数据;
#
# session.rollback()  # 是回滚当前事务的变更数据操作;
#
# session.flush()  # 作用是在事务管理内与数据库发生交互, 对应的实例状态被反映到数据库,比如自增 ID 被填充上值,但是数据库中当前事务的数据值并未更新上;相当于预提交,等于提交到数据库内存,还未写入数据库文件;deletions and modifications to the database as INSERTs, DELETEs, UPDATE;
#
# session.merge(obj)  # 查询更新操作;就是更新之前先查询,如果没有自动插入;



def xxx():
    try:
        user = session.Query(User).first()
        user.name = u'改名字
        session.commit()
    except:
        session.rollback()


def xxx2():
    t1 = Admin(username='test', password='123456')  # 生成admin表要插入的一条数据
    t2 = Admin(username='test1', password='abcdef')  # 生成admin表要插入的一条数据

    session.add(t2)

    try:
        with session.begin_nested():
            session.add(t1)  # 或使用session.merge(t1),表示查询更新操作
    except Exception as e:
        print(e)
        session.rollback()

    session.commit()


def xxx3():
    t1 = Admin(username='test1', password='123456')  # 生成admin表要插入的一条数据
    t2 = Admin(username='test2', password='abcdef')  # 生成admin表要插入的一条数据

    session.add(t2)

    # 创建一个子嵌套事务,第一个commit只是将子事务的数据托管到父事务,并未提交到数据库
    session.begin_nested()
    session.add(t1)
    session.commit()

    # 父事务执行提交,才真正将t1,t2提交到数据库
    session.commit()
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 设置数据库连接地址
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:mysql@127.0.0.1:3306/test31'
# 是否追踪数据库修改(开启后会触发一些钩子函数)  一般不开启, 会影响性能
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 是否显示底层执行的SQL语句
app.config['SQLALCHEMY_ECHO'] = True

db = SQLAlchemy(app)


# 构建模型类  类->表  类属性->字段  实例对象->记录
# 用户表  一   一个用户可以有多个地址
class User(db.Model):
    __tablename__ = 't_user'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20))
    addresses = db.relationship('Address')  # 1.定义关系属性 relationship("关联数据所在的模型类")


# 地址表   多
class Address(db.Model):
    __tablename__ = 't_adr'
    id = db.Column(db.Integer, primary_key=True)

    # 2. 外键字段设置外键参数  db.ForeignKey('主表名.主键')
    user_id = db.Column(db.Integer, db.ForeignKey('t_user.id'))
    detail = db.Column(db.String(20))


@app.route("/add", methods=["GET"])
def add():
    # sqlalchemy会自动创建隐式事务
    user1 = User(name='zs', age=20)
    db.session.add(user1)
    # 添加多条记录
    # db.session.add_all([user1, user2, user3])
    db.session.commit()





@app.route("/update", methods=["GET"])
def update():
    # 更新方式1: 先查询后更新
    # 缺点: 并发情况下, 容易出现更新丢失问题 (Lost Update)

    # 1.执行查询语句, 获取目标模型对象
    user = User.query.filter(User.name == 'zs').first()
    # 2.对模型对象的属性进行赋值 (更新数据)
    user.name = 'jack'
    # 3.提交会话
    db.session.commit()

    # 更新方式2: update子查询   可以避免更新丢失问题
    # update t_good set count = count - 1 where name = '方便面';
    User.query.filter(User.name == 'zs').update({'name': 'jack'})
    # 提交会话
    db.session.commit()


@app.route("/delete", methods=["GET"])
def delete():
    # 方式1: 先查后删除
    users = User.query.filter(User.name == 'jack').first()
    # 删除数据
    db.session.delete(users)
    # 提交会话 增删改都要提交会话
    db.session.commit()

    # 方式2: delete子查询
    User.query.filter(User.name == 'jack').delete()
    # 提交会话
    db.session.commit()


@app.route("/read", methods=["GET"])
def read():
    # 查询所有用户数据
    User.query.all() # 返回列表, 元素为模型对象

    # 查询有多少个用户
    User.query.count()

    # 查询第1个用户
    User.query.first()
    # 返回模型对象 / None

    # 查询id为4的用户[3种方式]
    # 方式1: 根据id查询  返回模型对象/None
    User.query.get(4)
    # 方式2: 等值过滤器 关键字实参设置字段值  返回BaseQuery对象
    # BaseQuery对象可以续接其他过滤器/执行器  如 all/count/first等
    User.query.filter_by(id=4).all()
    # 方式3: 复杂过滤器  参数为比较运算/函数引用等  返回BaseQuery对象
    User.query.filter(User.id == 4).first()

    # 查询名字结尾字符为g的所有用户[开始 / 包含]
    User.query.filter(User.name.endswith("g")).all()
    User.query.filter(User.name.startswith("w")).all()
    User.query.filter(User.name.contains("n")).all()
    User.query.filter(User.name.like("w%n%g")).all()  # 模糊查询

    # 查询名字和邮箱都以li开头的所有用户[2种方式]
    User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all()
    from sqlalchemy import and_
    User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all()

    # 查询age是25 或者 `email`以`itheima.com`结尾的所有用户
    from sqlalchemy import or_
    User.query.filter(or_(User.age == 25, User.email.endswith("itheima.com"))).all()

    # 查询名字不等于wang的所有用户[2种方式]
    from sqlalchemy import not_
    User.query.filter(not_(User.name == 'wang')).all()
    User.query.filter(User.name != 'wang').all()

    # 查询id为[1, 3, 5, 7, 9]的用户
    User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all()

    # 所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个
    User.query.order_by(User.age, User.id.desc()).limit(5).all()

    # 查询年龄从小到大第2-5位的数据   2 3 4 5
    User.query.order_by(User.age).offset(1).limit(4).all()

    # 分页查询, 每页3个, 查询第2页的数据  paginate(页码, 每页条数)
    pn = User.query.paginate(2, 3)
    # pn.pages
    # 总页数
    # pn.page
    # 当前页码
    # pn.items
    # 当前页的数据
    # pn.total
    # 总条数

    # 查询每个年龄的人数    select age, count(name) from t_user group by age  分组聚合
    from sqlalchemy import func
    data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all()
    for item in data:
        # print(item[0], item[1])
        print(item.age, item.count)  # 建议通过label()方法给字段起别名, 以属性方式获取数据

    # 只查询所有人的姓名和邮箱  优化查询   User.query.all()  # 相当于select *
    from sqlalchemy.orm import load_only
    data = User.query.options(load_only(User.name, User.email)).all()  # flask-sqlalchemy的语法
    for item in data:
        print(item.name, item.email)

    data = db.session.query(User.name, User.email).all()  # sqlalchemy本体的语法
    for item in data:
        print(item.name, item.email)


@app.route("/related_read", methods=["GET"])
def related_read():
    # 关联查询步骤: (以主查从为例)
    # 先查询主表数据
    # 再通过外键字段查询
    # 关联的从表数据

    # 1.
    user1 = User.query.filter_by(name='张三').first()
    adrs = Address.query.filter_by(user_id=user1.id).all()
    for adr in adrs:
        print(adr.detail)

    # 2.关系属性
    user1 = User.query.filter_by(name='张三').first()
    for address in user1.addresses:
       print(address.detail)

    # 3.join
    # 格式: db.session.query(主表模型字段1, 主表模型字段2, 从表模型字段1, xx..).join(从表模型类, 主表模型类.主键 == 从表模型类.外键)
    # sqlalchemy的join查询
    data = db.session.query(User.id, Address.detail).join(Address, User.id == Address.user_id).filter(
        User.name == '张三').all()
    for item in data:
        print(item.detail, item.id)


    # 三者对比
    # 无论使用 外键 还是 关系属性 查询关联数据, 都需要查询两次, 一次查询用户数据, 一次查询地址数据
    #
    # 两次查询就需要发送两次请求给数据库服务器, 如果数据库和web应用不在一台服务器中, 则 网络IO会对查询效率产生一定影响
    #
    # 可以考虑使用 连接查询 join 使用一条语句就完成关联数据的查询

@app.route("/session", methods=["GET"])
def session():
    """
    session声明周期
    :return:
    """
    # flask-sqlalchemy 对于 sqlalchemy本体 的 Session 进行了一定的封装:
    # Session的生命周期和请求相近
    #
    # 	请求中的首次数据操作会创建Session
    # 	整个请求过程中使用的Session为同一个, 并且线程隔离
    # 	请求结束时会自动销毁Session(释放内存)



    # db.session.flush()


    """事务1"""
    try:
        user1 = User(name='zs', age=20)
        db.session.add(user1)
        db.session.commit()
    except BaseException:
        # 手动回滚   同一个session中, 前一个事务如果失败, 必须手动回滚, 否则无法创建新的事务
        db.session.rollback()

    """事务2"""
    user1 = User(name='lisi', age=30)
    db.session.add(user1)
    db.session.commit()

if __name__ == '__main__':
    db.drop_all()
    db.create_all()

    app.run(debug=True)