SQLAlchemy

予早 2024-05-26 22:19:36
Categories: Tags:

安装

pip

pip install sqlalchemy

版本

import sqlalchemy

print(sqlalchemy.__version__)
# 2.0.30

Engine

连接池+数据库dialect

Connects a :class:`~sqlalchemy.pool.Pool` and
:class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
source of database connectivity and behavior.
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db")

Connection

调用DBAPI与数据库进行交互

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db")

from sqlalchemy import text

with engine.connect() as conn:
    result = conn.execute(text("select 'hello world'"))
    print(result.all())
    # conn.scalar()
    # conn.scalars()
    
with engine.connect() as conn:
    conn.execute(text("CREATE TABLE some_table (x int, y int)"))
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 1, "y": 1}, {"x": 2, "y": 4}],
    )
    conn.commit()

with engine.begin() as conn:
    conn.execute(
        text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
        [{"x": 6, "y": 8}, {"x": 9, "y": 10}],
    )

Session

auto_flush

若auto_flush为True,所有查询操作执行之前会先执行Session.flush()

基于connection,管理ORM映射对象(ORM-mapped object)的相关操作。

from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db")

# 1. 直接创建session
from sqlalchemy.orm import Session

session = Session(bind=engine)

# 2.通过sessionmaker创建session
# 2.1.sessionmaker绑定engine
from sqlalchemy.orm import sessionmaker

Session = sessionmaker(bind=engine)
session = Session()

# 2.2.Session绑定engine
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()

# 3.线程安全的session
from sqlalchemy.orm import scoped_session

ScopedSession = scoped_session(session_factory=sessionmaker(bind=engine))
session = ScopedSession()


# 分片session,sqlalchemy.ext.horizontal_shard.ShardedSession,可用于读写分离

Model

ORM Mapping Style

# CORE
# Table 对应数据库中的表
# Metadata 定义表时指定metedata

# 指令式
# Table 对应数据库中的表
# Metadata 定义表时指定metedata
# ORM mapped object
# registry   ORM mapped class 与 Table的映射

# 声明式
# 等价于指令式

Imperative Mapping

from sqlalchemy import Table, Column, Integer, String, ForeignKey
from sqlalchemy.orm import registry

mapper_registry = registry()

user_table = Table(
    "user",
    mapper_registry.metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
    Column("fullname", String(50)),
    Column("nickname", String(12)),
)


class User:
    pass


mapper_registry.map_imperatively(User, user_table)

Declarative Mapping

1版本的声明风格

import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, and_, or_, not_, distinct
from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base
from sqlalchemy.sql import func, text

engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db', echo='debug', echo_pool='debug')
ScopedSession = scoped_session(sessionmaker(bind=engine))

Base = declarative_base()


class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer, primary_key=True)
    name = Column(String(length=20))
    age = Column(Integer())
    addresses = relationship('Address')

    def __repr__(self):
        return f"User(id={self.id}, name={self.name}, age={self.age}, addresses={self.addresses})"


class Address(Base):
    __tablename__ = 't_address'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('t_user.id'))
    country = Column(String(length=30))
    province = Column(String(length=30))
    city = Column(String(length=30))
    detail = Column(String(length=150))

    def __repr__(self):
        return f"Address(id={self.id}, user_id={self.user_id}, country={self.country}, province={self.province}, city={self.city}, detail={self.detail})"

2版本风格

from typing import List

from sqlalchemy import ForeignKey, String, Integer, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, sessionmaker, scoped_session

engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db', echo='debug', echo_pool='debug')
ScopedSession = scoped_session(sessionmaker(bind=engine))


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "t_user"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(30))
    age: Mapped[int] = mapped_column(Integer())
    addresses: Mapped[List["Address"]] = relationship(
        back_populates="user", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"User(id={self.id!r}, name={self.name!r}, age={self.age!r})"


class Address(Base):
    __tablename__ = "address"
    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("t_user.id"))
    country: Mapped[str] = mapped_column(String(30))
    province: Mapped[str] = mapped_column(String(30))
    city: Mapped[str] = mapped_column(String(30))
    detail: Mapped[str] = mapped_column(String(100))
    user: Mapped["User"] = relationship(back_populates="addresses")

    def __repr__(self) -> str:
        return (f"Address(id={self.id!r}, user_id={self.user_id!r}, country={self.country!r}, "
                f"province={self.province!r}, city={self.city!r}, detail={self.detail!r})")

通过metadata在数据库创建表和删除表

# 通过metadata创建表或者删除表
# 删除所有模型类创建的表
Base.metadata.drop_all(engine)
# 创建所有模型类的表格,有则不创建
Base.metadata.create_all(engine)

ORM Mapped Object

object state

https://docs.sqlalchemy.org/en/20/orm/session_events.html#object-lifecycle-events

Transient

新创建的对象即为transient状态

user = User(name='davis')

Pending

session.add()会使得对象变为pending状态

session.add(user)

Persistent

将pending状态的对象flush到数据库可以获得persistent状态的对象

session.flush(user)

从数据库中查询可以获得persistent状态的对象


将persistent对象从其他session中启动到当前session


Deleted

事务提交时,该状态对象会转变为detached状态

事务回滚时,该状态对象会转变为persistent状态

Detached

detached状态的对象将包含数据库id等信息,但由于它与会话无关,不能保证这些信息就是当前数据库中的信息,detached状态的对象可以安全地正常使用,除了对象无法加载未加载的属性、懒加载的属性(relationship())或以前标记为“过期”的属性。

inspect object

import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, and_, or_, not_, distinct, inspect
from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base, InstanceState
from sqlalchemy.sql import func, text

engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db', echo='debug', echo_pool='debug')
ScopedSession = scoped_session(sessionmaker(bind=engine))

Base = declarative_base()


class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer, primary_key=True)
    name = Column(String(length=20))
    age = Column(Integer())
    addresses = relationship('Address')

    def __repr__(self):
        return f"User(id={self.id}, name={self.name}, age={self.age}, addresses={self.addresses})"


class Address(Base):
    __tablename__ = 't_address'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('t_user.id'))
    country = Column(String(length=30))
    province = Column(String(length=30))
    city = Column(String(length=30))
    detail = Column(String(length=150))

    def __repr__(self):
        return f"Address(id={self.id}, user_id={self.user_id}, country={self.country}, province={self.province}, city={self.city}, detail={self.detail})"


Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

user = User(name="yuzao")
insp: InstanceState = inspect(user)
print(type(insp), insp.transient)

object state、expired、flush分析

Session.flush()

Session.commit()

Session.rollback()

Session.close()

from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, inspect, select
from sqlalchemy.orm import sessionmaker, relationship, scoped_session, InstanceState, declarative_base

engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/ye_cao_edu?charset=utf8mb4', echo='debug',
                       echo_pool='debug')
Base = declarative_base()
ScopedSession = scoped_session(sessionmaker(bind=engine, autoflush=True))


class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer, primary_key=True)
    name = Column(String(length=20))
    addresses = relationship('Address')


class Address(Base):
    __tablename__ = 't_address'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('t_user.id'))
    detail = Column(String(length=20))


session = ScopedSession()

user1 = User(name="yuzao")
address1 = Address(user_id=1, detail="nothing")
print('in session', user1 in session)

insp: InstanceState = inspect(user1)
print('transient', insp.transient)

session.add(user1)
session.add(user1)  # 多次add同一个对象只会保留最后一次操作,而不会操作两次
session.add_all([address1])
print('in session', user1 in session)
print('pending', insp.pending)

# add的object保存在identity_map中
print("identity_set", session.new)
print("identity_map", session._new)

# 在此处才真正和数据库交互执行SQL
session.flush()

print('persistent', insp.persistent)

# 执行SQL之后可以i获取到id
print(user1.id, address1.id)

user2 = User(name="alice")
session.add(user2)

# 查询会触发autoflush,但由于入参user2.id为None,flush之后user2.id才变为真正地id,所以执行SQL时id依然根据None查询
user2_from_db_by_get = session.get(User, user2.id)
print(user2_from_db_by_get, user2.id)

# 修改值,即使实质上没有修改
user1.name = "davis"
print(insp.persistent)
print("identity_set", session.dirty)

user1.name = "jack"
name_from_db_by_execute = session.execute(select(User.name).where(User.id == user1.id)).scalar_one()
print(user1.name, name_from_db_by_execute)

# delete本身不会触发数据库操作
session.delete(user1)
print('persistent', insp.persistent)
print("identity_set", session.deleted)
print("identity_map", session._deleted)

# 触发数据库操作,然后object state更改
session.flush()
print('deleted', insp.deleted)

print(session.expire_on_commit)
session.commit()
print('detached', insp.detached)
print('expired', insp.expired)

user2 = User(name="frank")
user3 = User(name="jane")
user4 = User(name="black")
session.add_all([user2, user3, user4])
session.commit()

user1 = User(name="eric")
session.add(user1)

user2.name = "llll"

user3_from_db_by_get = session.get(User, user3.id)
print(user3 == user3_from_db_by_get)

session.delete(user4)
session.commit()

print(inspect(user1).expired)  # True
print(inspect(user2).expired)  # True
print(inspect(user3).expired)  # True
print(inspect(user4).expired)  # False

user2 = User(name="frank")
user3 = User(name="jane")
user4 = User(name="black")
session.add_all([user2, user3, user4])
session.commit()

user1 = User(name="eric")
session.add(user1)

user2.name = "llll"

user3_from_db_by_get = session.get(User, user3.id)
print(user3 == user3_from_db_by_get)

session.delete(user4)
session.rollback()

print(inspect(user1).expired)  # False
print(inspect(user2).expired)  # True
print(inspect(user3).expired)  # True
print(inspect(user4).expired)  # True

CRUD

Session.add()
Session.add_all()
Session.get()
Session.delete()
Session.query()
Session.execute()
import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, and_, or_, not_, distinct
from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base
from sqlalchemy.sql import func, text

engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db', echo='debug', echo_pool='debug')
Base = declarative_base()
ScopedSession = scoped_session(sessionmaker(bind=engine))


class User(Base):
    __tablename__ = 't_user'
    id = Column(Integer, primary_key=True)
    name = Column(String(length=20))
    age = Column(Integer())
    addresses = relationship('Address')

    def __repr__(self):
        return f"User(id={self.id}, name={self.name}, age={self.age}, addresses={self.addresses})"


class Address(Base):
    __tablename__ = 't_address'
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('t_user.id'))
    country = Column(String(length=30))
    province = Column(String(length=30))
    city = Column(String(length=30))
    detail = Column(String(length=150))

    def __repr__(self):
        return f"Address(id={self.id}, user_id={self.user_id}, country={self.country}, province={self.province}, city={self.city}, detail={self.detail})"


Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

def add_example():
    session = ScopedSession()
    try:
        user1 = User(name='Jack', age=14)  # update_example、delete_example
        session.add(user1)

        user2 = User(name='Bob', age=17)
        user3 = User(name='Alex', age=26)
        user4 = User(name='Frank', age=23)
        # 并非批量操作,实质为循环调用Session.add()
        session.add_all([user2, user3, user4])

        session.flush()
        address4 = Address(id=user4.id, country="China", province="Beijing", city="Beijing", detail="......")
        session.add(address4)
        session.commit()
    except Exception as e:
        print(e)
        session.rollback()
    finally:
        session.close()


add_example()

def update_example1():
    session = ScopedSession()
    try:
        # 先查询出来再更新,两次数据库操作
        user = session.query(User).filter_by(id=1).first()
        if user:
            user.name = 'update_example1'
        session.commit()
    except Exception as e:
        print(e)
        session.rollback()
    finally:
        session.close()


update_example1()


def update_example2():
    session = ScopedSession()
    try:
        # 直接更新,一次数据库操作
        update_count = session.query(User).filter_by(id=1).update({User.name: 'update_example2'})
        print(f"更新了{update_count}条数据")
        session.commit()
    except Exception as e:
        print(e)
        session.rollback()
    finally:
        session.close()


update_example2()


# select for update ?
 with session.begin():
     query = session.query(User).filter(User.name == 'Alice').with_for_update()
     user = query.first()
     user.age = user.age + 1

def delete_example1():
    session = ScopedSession()
    try:
        # 先查询出来再删除,两次数据库操作
        del_data = session.query(User).filter_by(id=1).first()
        session.delete(del_data)
        session.commit()
    except Exception as e:
        print(e)
        session.rollback()
    finally:
        session.close()


delete_example1()


def delete_example2():
    session = ScopedSession()
    try:
        # 直接删除,一次数据库操作
        del_count = session.query(User).filter_by(id=1).delete()
        print(f"删除了{del_count}条数据")
        session.commit()
    except Exception as e:
        print(e)
        session.rollback()
    finally:
        session.close()


delete_example2()

def get_example1():
    """
    根据id查询数据
    """
    session = ScopedSession()

    # Session.get()获取不到返回None
    print(session.get(User, 2))

    # Session.get_one()获取不到抛出异常
    print(session.get_one(User, 3))

    # 基于Query的get,2.0版本建议使用session.get()
    print(session.query(User).get(4))
    # query().get()不允许查询指定字段、复杂条件等语法
    # session.query(User.name, User.age).get(4)
    # session.query(User).filter(User.id != 4).get(4)


get_example1()


def get_example2():
    """
    通过Query构造查询,指定字段
    """
    session = ScopedSession()

    # 直接session.query(User).all()会返回User的对象列表
    query: sqlalchemy.orm.query.Query = session.query(User)
    users: list = query.all()
    print(users)
    # [User(self.id=2, self.name='Bob', self.age=17, self.addresses=[]), User(self.id=3, self.name='Alex', self.age=26, self.addresses=[]), User(self.id=4, self.name='Frank', self.age=23, self.addresses=[])]

    # session.query(User.name, User.age)返回对应字段的元组列表
    query: sqlalchemy.orm.query.Query = session.query(User.name, User.age)
    users: list = query.all()
    print(users)
    # [('Bob', 17), ('Alex', 26), ('Frank', 23)]
    
    # 使用with_entities可以重新设置要查询的字段,在一些情况下可以复用构建的query
    query: sqlalchemy.orm.query.Query = session.query(User.name)
    query = query.with_entities(User.age)
    users: list = query.all()
    print(users)
    # [(14,), (17,), (26,), (23,)]
    
    # 可以先查询User然后使用with_entitiesz
    query: sqlalchemy.orm.query.Query = session.query(User)
    users: list = query.with_entities(User.name, User.age).all()
    print(users) 
    # [('Jack', 14), ('Bob', 17), ('Alex', 26), ('Frank', 23)]


get_example2()


def get_example3():
    """
    通过Query构造查询,过滤条件
    """
    session = ScopedSession()
    query: sqlalchemy.orm.query.Query = session.query(User)

    # filter_by基于kw进行过滤,仅适用于field=value形式的过滤,没有类似于Django Model中丰富的关键字语法
    print(query.filter_by(name="Bob").all())

    # filter基于model构造查询条件,支持复杂条件
    # 风格1:query.filter(User.name == 'Frank', User.age != 23).all()
    # 风格2:query.filter(User.name.is('Frank'), User.age.isnot(23)).all()

    # sqlalchemy/sql/operators.py

    # 相等与不等
    print(query.filter(User.name == 'Frank', User.age != 23).all())
    print(query.filter(User.name == None).all())
    print(query.filter(User.name.is_(None)).all())
    print(query.filter(User.name != None).all())
    print(query.filter(User.name.isnot(None)).all())

    # 模糊条件
    print(query.filter(User.name.like('B%'), User.name.notlike('%k')).all())

    # in 、 not in
    print(query.filter(User.name.in_(['Bob', 'Frank']), ~User.age.in_([17, 13])).all())

    # 与
    print(query.filter(and_(User.name == 'Bob', User.age == 17)).all())
    print(query.filter(User.name == 'Bob', User.age == 17).all())
    print(query.filter(User.name == 'Bob').filter(User.age == 17).all())

    # 或
    print(query.filter(or_(User.name == 'Bob', User.age == 17)).all())

    # 非
    # not_与~等价
    print(query.filter(not_(User.name == 'Bob')).all())
    print(query.filter(~(User.name == 'Bob')).all())
    print(query.filter(~and_(User.name == 'Bob', User.age == 13)).all())


get_example3()


def get_example4():
    """
    直接执行SQL
    """
    session = ScopedSession()
    print(session.execute(text("select * from t_user where id=:id"), {"id": 4}).fetchall())
    # [(4, 'Frank', 23)]


get_example4()


def get_example5():
    """
    多表查询
    """
    session = ScopedSession()

    # SELECT t_user.name AS t_user_name, t_address.country AS t_address_country  FROM t_user, t_address WHERE t_user.id = t_address.user_id
    print(session.query(User.name, Address.country).filter(User.id == Address.user_id).all())
    # SELECT t_user.name AS t_user_name, t_address.country AS t_address_country FROM t_user INNER JOIN t_address ON t_user.id = t_address.user_id
    print(session.query(User.name, Address.country).join(Address, User.id == Address.user_id).all())

    # SELECT t_user.name AS t_user_name, t_address.country AS t_address_country FROM t_user LEFT OUTER JOIN t_address ON t_user.id = t_address.user_id
    print(session.query(User.name, Address.country).outerjoin(Address, User.id == Address.user_id).all())

    # SELECT anon_1.t_user_name AS anon_1_t_user_name FROM (SELECT t_user.name AS t_user_name FROM t_user UNION SELECT t_address.country AS t_address_country FROM t_address) AS anon_1
    print(session.query(User.name).union(session.query(Address.country)).all())


get_example5()


def get_example6():
    """
    Query的结果集
    """
    session = ScopedSession()
    query: sqlalchemy.orm.query.Query = session.query(User)

    # 结果集中包含满足条件数据中的所有数据,没有数据则为None
    print(query.all())

    # 结果集中包含满足条件数据中的第一条数据,没有数据则为None
    print(query.first())

    # 结果集中包含恰好满足条件的唯一的一条数据,若没有查到数据或者有多条满足条件的数据都会抛出异常
    try:
        print(query.one())
    except Exception as e:
        print(e)

    # 结果集中包含恰好满足条件的唯一的一条数据,若没有查到数据则为None,有多条满足条件的数据会抛出异常
    try:
        print(query.one_or_none())
    except Exception as e:
        print(e)


get_example6()


def get_example7():
    """
    排序
    """
    session = ScopedSession()
    query: sqlalchemy.orm.query.Query = session.query(User)
    # 升序
    print(query.order_by(User.id).all())
    # 降序
    print(query.order_by(-User.id).all())
    # 多条件排序
    print(query.order_by(-User.id, User.name).all())


get_example7()


def get_example8():
    """
    分组
    """
    session = ScopedSession()
    query: sqlalchemy.orm.query.Query = session.query(User.name)
    print(query.group_by(User.name).all())


get_example8()


def get_example9():
    """
    聚合函数
    :return:
    """
    session = ScopedSession()
    print(session.query(
        func.max(User.age),
        func.min(User.age),
        func.sum(User.age),
        func.avg(User.age),
        func.count(User.age),
    ).all())


get_example9()


def get_example10():
    # 去重
    session = ScopedSession()
    print(session.query(User.name, User.age).distinct().all())


get_example10()


def get_example11():
    """
    分页
    """
    session = ScopedSession()
    query: sqlalchemy.orm.query.Query = session.query(User)

    # 偏移分页
    print(query.offset(1).limit(1).all())

    # 使用slice()偏移分页,等价于 limit start, end - start
    print(session.query(User).slice(1, 2).all())

    # 该方法并不能在数据库层面实现分页,而是全部返回数据由Python切片
    # session.query(User).all()[1:2]


get_example11()

事务操作分析

def transaction_example1():
    """
    同一个session不允许开启一个事务的时候再开启另一个事务
    """
    session = ScopedSession()
    session.begin()
    session.add(User(name='Jack'))
    session.begin()
    session.add(User(name='Jack'))
    session.commit()


def transaction_example2():
    """
    两个session开启两个事务
    """
    Session = sessionmaker(bind=engine, autobegin=False)
    session1, session2 = Session(), Session()
    session1.begin()
    session1.add(User(name='Jack'))
    session2.begin()
    session2.add(User(name='Jack'))
    session1.commit()
    session2.commit()


def transaction_example3():
    """
    在同一个session中不允许先尝试开启嵌套事务然后开启一般事务
    """
    session = ScopedSession()
    session.begin(nested=True)
    session.add(User(name='Jack'))
    session.begin()
    session.add(User(name='Jack'))
    session.commit()


def transaction_example4():
    """
    在同一个session中允许先尝试开启一般事务然后开启嵌套事务
    """
    session = ScopedSession()
    session.begin()
    session.add(User(name='Jack'))
    session.begin(nested=True)
    session.add(User(name='Jack'))
    session.commit()


def transaction_example5():
    name = "transaction_example5"
    Session = sessionmaker(bind=engine, autobegin=False)
    session1, session2 = Session(), Session()

    session1.begin()
    user = session1.query(User).filter(User.name == name).first()
    if user:
        raise Exception("测试前提:库中不应当有这个数据")
    with session2.begin():
        # session2添加该数据
        session2.add(User(name=name))
    user = session1.query(User).filter(User.name == name).first()
    if not user:
        raise Exception(
            f'测试结果:由于隔离级别{session1.connection().get_isolation_level()}限制,session1中查不到session2中的数据')
    session1.commit()


def transaction_example6():
    """
    begin并非在数据库层面开启事务
    """
    Session = sessionmaker(bind=engine, autobegin=False)
    session1, session2 = Session(), Session()
    session2.begin()
    # begin之后执行完毕session1事务,session2的事务可以获取到数据
    session1.begin()
    session1.add(User(name='Jane'))
    session1.commit()
    session2.query(User).filter(User.name == "Jane").one()
    session2.commit()

SQL 打印

SQLAlchemy 中打印 SQL 语句,flask-sqlalchemy中同样适用。

方法一,设置配置

SQLALCHEMY_ECHO=True

方法二,直接打印

str(session.query(model.Name).order_by(model.Name.value))

Glossary

unit of work pattern

一种软件架构,持久化系统(例如object relational mapper)维护一系列对象的更改列表,并定期将所有这些待处理的更改刷新到数据库中。

SQLALchemy中的Session实现了工作单元模式,使用相关方法(例如Session.add())将object添加到Session,随后以工作单元模式风格进行持久化。

attached

表明ORM object关联到了某一个Session

expire//expiring/expired

在 SQLAlchemy ORM 中,指persistent对象(有时是detached对象)中的数据被擦除时,当下次访问该对象的属性时,将发出懒加载 SQL 查询,以刷新当前正在进行的事务中存储的该对象的数据。

cascade

SQLAlchemy 中的一个术语,用于描述在特定对象上进行的 ORM 持久化操作如何扩展到与该对象直接关联的其他对象。在 SQLAlchemy 中,这些对象关联是使用 relationship() 结构配置的。 relationship() 包含一个称为 relationship.cascade 的参数,它提供了某些持久化操作如何级联的选项。

ORM mapped class

A mapped class typically refers to a single particular database table, the name of which is indicated by using the __tablename__ class-level attribute.

BEGIN (implicit)

You might have noticed the log line “BEGIN (implicit)” at the start of a transaction block. “implicit” here means that SQLAlchemy did not actually send any command to the database; it just considers this to be the start of the DBAPI’s implicit transaction. You can register event hooks to intercept this event, for example.

SQLAlchemy 使用注意

1、查询结果和数据库数据不一致

现象:数据id==1的数据的name==“test”,事务A 修改 id==1的数据name改为“hello world”并commit,事务B 在事务A执行玩之前 查询 id==1 的name==“test”,等事务A执行完成,再从查询id==1的name还是“test”,而不是“hello world”

原因:由于数据事务机制(参考:彻底搞懂 MySQL 事务的隔离级别-阿里云开发者社区),导致事务B在第一次查询的时候,真正请求数据库进行查询,但后面的查询直接读取的已经缓存在内存中的数据,而没有真正去数据库中查询

解决方案:

在每次query之后执行commit

2、长时间未请求连接自动断开

现象:长时间服务端没有连接数据库,数据库连接自动断开

原因:1、sqlalchemy在create_engine时,使用连接池并没有指定连接池回收时间,则连接池的连接不会自动被回收,并默认使用QueuePool进行连接池管理,调用session.close(),不会断开连接,2、数据库,例如mysql会设置一个wait_timeout(默认8个小时),当连接空闲8个小时,则自动断开

解决方案:

 create_engine重要参数:
   pool_size:连接数,采用了惰性思想,例如:pool_size=10,如果项目中只使用了5个,则连接池中的连接数,只有5个,但当项目同时使用了10个连接,则后续连接池中的连接数为10个
   max_overflow:超出连接数时,允许再新建的连接数,例如:pool_size=10,max_overflow=8,最大连接数18个,但其中8个不在使用时,直接回收,连接池中的连接数为10个
   pool_timeout:等待可用连接时间,超时则报错,默认为30秒
   pool_recycle:连接生存时长,超过则该连接被回收,再生存新连接,可把这个值改成小于wait_timeout;设置-1时,则不回收连接
 from sqlalchemy import create_engine
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.pool import NullPool
 
 
 class DbHandler(object):
     def __init__(self):
         self.db_addr = "XXXXXX"
 
     def create_session(self):
         engine = create_engine(self.db_addr, poolclass=NullPool)
         session = sessionmaker(bind=engine)()
         return session

3、全局session在多进程下被断开

想象:sqlalchemy连接池,经常报错:mysql server has gone away

原因:父进程创建子进程,子进程会使用父进程的数据连接,当子进程执行完成,断开数据库的连接,则全局session和连接被释放,后续要使用连接数据库时,则报错

解决方案:多进程中,尽量没有进程新建连接和session

通过数据库表生成 SQLAlalchemy model

v1

pip install sqlacodegen
pip install sqlalchemy
 
sqlacodegen --table [表名user] mysql+pymysql://[root]:[password]@127.0.0.1/[db_name]> user.py
sqlacodegen --table t_user mysql+pymysql://root:123456@192.168.1.1/db > user_model.py

v1 flask

pip install flask-sqlacodegen
python -m sqlacodegen.main --flask --outfile models.py mysql+pymysql://<username>:<password>@<database-ip>:<port>/<database-name> [--tables <tablenames>] [--notables]
 
python -m sqlacodegen.main --flask --outfile models.py mysql+pymysql://root:123456@192.168.1.1/db

v2

pip install sqlacodegen-v2
 
sqlacodegen_v2 --table t_user mysql+pymysql://root:123456@192.168.1.1/db > user_model.py

sqlalchemy 通过数据库反向生成model

https://www.jiege.tech/extensions/flask-sqlalchemy.html

https://blog.csdn.net/lijianping962464/article/details/125723812

https://blog.51cto.com/u_16175441/8135724

https://blog.csdn.net/qq_45632139/article/details/114286909

https://blog.csdn.net/lijianping962464/article/details/125723812