安装
pip
pip install sqlalchemy
版本
import sqlalchemy
print(sqlalchemy.__version__)
# 2.0.30
Engine
连接池+数据库dialect
Connects a :class:`~sqlalchemy.pool.Pool` and
:class:`~sqlalchemy.engine.interfaces.Dialect` together to provide a
source of database connectivity and behavior.
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db")
Connection
调用DBAPI与数据库进行交互
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db")
from sqlalchemy import text
with engine.connect() as conn:
result = conn.execute(text("select 'hello world'"))
print(result.all())
# conn.scalar()
# conn.scalars()
with engine.connect() as conn:
conn.execute(text("CREATE TABLE some_table (x int, y int)"))
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 1, "y": 1}, {"x": 2, "y": 4}],
)
conn.commit()
with engine.begin() as conn:
conn.execute(
text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
[{"x": 6, "y": 8}, {"x": 9, "y": 10}],
)
Session
auto_flush
若auto_flush为True,所有查询操作执行之前会先执行Session.flush()
基于connection,管理ORM映射对象(ORM-mapped object)的相关操作。
from sqlalchemy import create_engine
engine = create_engine("mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db")
# 1. 直接创建session
from sqlalchemy.orm import Session
session = Session(bind=engine)
# 2.通过sessionmaker创建session
# 2.1.sessionmaker绑定engine
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
session = Session()
# 2.2.Session绑定engine
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()
# 3.线程安全的session
from sqlalchemy.orm import scoped_session
ScopedSession = scoped_session(session_factory=sessionmaker(bind=engine))
session = ScopedSession()
# 分片session,sqlalchemy.ext.horizontal_shard.ShardedSession,可用于读写分离
Model
ORM Mapping Style
# CORE
# Table 对应数据库中的表
# Metadata 定义表时指定metedata
# 指令式
# Table 对应数据库中的表
# Metadata 定义表时指定metedata
# ORM mapped object
# registry ORM mapped class 与 Table的映射
# 声明式
# 等价于指令式
Imperative Mapping
from sqlalchemy import Table, Column, Integer, String, ForeignKey
from sqlalchemy.orm import registry
mapper_registry = registry()
user_table = Table(
"user",
mapper_registry.metadata,
Column("id", Integer, primary_key=True),
Column("name", String(50)),
Column("fullname", String(50)),
Column("nickname", String(12)),
)
class User:
pass
mapper_registry.map_imperatively(User, user_table)
Declarative Mapping
1版本的声明风格
import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, and_, or_, not_, distinct
from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base
from sqlalchemy.sql import func, text
engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db', echo='debug', echo_pool='debug')
ScopedSession = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()
class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True)
name = Column(String(length=20))
age = Column(Integer())
addresses = relationship('Address')
def __repr__(self):
return f"User(id={self.id}, name={self.name}, age={self.age}, addresses={self.addresses})"
class Address(Base):
__tablename__ = 't_address'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('t_user.id'))
country = Column(String(length=30))
province = Column(String(length=30))
city = Column(String(length=30))
detail = Column(String(length=150))
def __repr__(self):
return f"Address(id={self.id}, user_id={self.user_id}, country={self.country}, province={self.province}, city={self.city}, detail={self.detail})"
2版本风格
from typing import List
from sqlalchemy import ForeignKey, String, Integer, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, sessionmaker, scoped_session
engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db', echo='debug', echo_pool='debug')
ScopedSession = scoped_session(sessionmaker(bind=engine))
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "t_user"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(30))
age: Mapped[int] = mapped_column(Integer())
addresses: Mapped[List["Address"]] = relationship(
back_populates="user", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"User(id={self.id!r}, name={self.name!r}, age={self.age!r})"
class Address(Base):
__tablename__ = "address"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(ForeignKey("t_user.id"))
country: Mapped[str] = mapped_column(String(30))
province: Mapped[str] = mapped_column(String(30))
city: Mapped[str] = mapped_column(String(30))
detail: Mapped[str] = mapped_column(String(100))
user: Mapped["User"] = relationship(back_populates="addresses")
def __repr__(self) -> str:
return (f"Address(id={self.id!r}, user_id={self.user_id!r}, country={self.country!r}, "
f"province={self.province!r}, city={self.city!r}, detail={self.detail!r})")
通过metadata在数据库创建表和删除表
# 通过metadata创建表或者删除表
# 删除所有模型类创建的表
Base.metadata.drop_all(engine)
# 创建所有模型类的表格,有则不创建
Base.metadata.create_all(engine)
ORM Mapped Object
object state
https://docs.sqlalchemy.org/en/20/orm/session_events.html#object-lifecycle-events
Transient
不在session中
未保存到数据库(例如没有数据库id)
新创建的对象即为transient状态
user = User(name='davis')
Pending
- 在session中
- 有增加或更新操作但未flush到数据库
session.add()会使得对象变为pending状态
session.add(user)
Persistent
- 在session中
- 数据库中有记录
将pending状态的对象flush到数据库可以获得persistent状态的对象
session.flush(user)
从数据库中查询可以获得persistent状态的对象
将persistent对象从其他session中启动到当前session
Deleted
在session中
有删除操作且flush到数据库,但事务未提交
事务提交时,该状态对象会转变为detached状态
事务回滚时,该状态对象会转变为persistent状态
Detached
- 不在session中
- 与数据库中的记录相对应或以前相对应
detached状态的对象将包含数据库id等信息,但由于它与会话无关,不能保证这些信息就是当前数据库中的信息,detached状态的对象可以安全地正常使用,除了对象无法加载未加载的属性、懒加载的属性(relationship())或以前标记为“过期”的属性。
inspect object
import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, and_, or_, not_, distinct, inspect
from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base, InstanceState
from sqlalchemy.sql import func, text
engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db', echo='debug', echo_pool='debug')
ScopedSession = scoped_session(sessionmaker(bind=engine))
Base = declarative_base()
class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True)
name = Column(String(length=20))
age = Column(Integer())
addresses = relationship('Address')
def __repr__(self):
return f"User(id={self.id}, name={self.name}, age={self.age}, addresses={self.addresses})"
class Address(Base):
__tablename__ = 't_address'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('t_user.id'))
country = Column(String(length=30))
province = Column(String(length=30))
city = Column(String(length=30))
detail = Column(String(length=150))
def __repr__(self):
return f"Address(id={self.id}, user_id={self.user_id}, country={self.country}, province={self.province}, city={self.city}, detail={self.detail})"
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
user = User(name="yuzao")
insp: InstanceState = inspect(user)
print(type(insp), insp.transient)
object state、expired、flush分析
Session.flush()
- 与数据库交互,执行SQL
- session中所有object置为persistent状态
Session.commit()
- 当前事务被提交
- session中所有object置为persistent状态,并且置所有object为expired(可以由Session.expire_on_commit)
Session.rollback()
- 当前事务被回滚
- session中所有object被expired(下次访问object属性时会懒加载select查询)
Session.close()
- 若当前事务未提交,则Session.rollback()
- 释放连接资源回到连接池
- session中所有object被清理并置为detached状态,object是否expired取决于Session.commit()还是Session.rollback()
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, inspect, select
from sqlalchemy.orm import sessionmaker, relationship, scoped_session, InstanceState, declarative_base
engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/ye_cao_edu?charset=utf8mb4', echo='debug',
echo_pool='debug')
Base = declarative_base()
ScopedSession = scoped_session(sessionmaker(bind=engine, autoflush=True))
class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True)
name = Column(String(length=20))
addresses = relationship('Address')
class Address(Base):
__tablename__ = 't_address'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('t_user.id'))
detail = Column(String(length=20))
session = ScopedSession()
user1 = User(name="yuzao")
address1 = Address(user_id=1, detail="nothing")
print('in session', user1 in session)
insp: InstanceState = inspect(user1)
print('transient', insp.transient)
session.add(user1)
session.add(user1) # 多次add同一个对象只会保留最后一次操作,而不会操作两次
session.add_all([address1])
print('in session', user1 in session)
print('pending', insp.pending)
# add的object保存在identity_map中
print("identity_set", session.new)
print("identity_map", session._new)
# 在此处才真正和数据库交互执行SQL
session.flush()
print('persistent', insp.persistent)
# 执行SQL之后可以i获取到id
print(user1.id, address1.id)
user2 = User(name="alice")
session.add(user2)
# 查询会触发autoflush,但由于入参user2.id为None,flush之后user2.id才变为真正地id,所以执行SQL时id依然根据None查询
user2_from_db_by_get = session.get(User, user2.id)
print(user2_from_db_by_get, user2.id)
# 修改值,即使实质上没有修改
user1.name = "davis"
print(insp.persistent)
print("identity_set", session.dirty)
user1.name = "jack"
name_from_db_by_execute = session.execute(select(User.name).where(User.id == user1.id)).scalar_one()
print(user1.name, name_from_db_by_execute)
# delete本身不会触发数据库操作
session.delete(user1)
print('persistent', insp.persistent)
print("identity_set", session.deleted)
print("identity_map", session._deleted)
# 触发数据库操作,然后object state更改
session.flush()
print('deleted', insp.deleted)
print(session.expire_on_commit)
session.commit()
print('detached', insp.detached)
print('expired', insp.expired)
user2 = User(name="frank")
user3 = User(name="jane")
user4 = User(name="black")
session.add_all([user2, user3, user4])
session.commit()
user1 = User(name="eric")
session.add(user1)
user2.name = "llll"
user3_from_db_by_get = session.get(User, user3.id)
print(user3 == user3_from_db_by_get)
session.delete(user4)
session.commit()
print(inspect(user1).expired) # True
print(inspect(user2).expired) # True
print(inspect(user3).expired) # True
print(inspect(user4).expired) # False
user2 = User(name="frank")
user3 = User(name="jane")
user4 = User(name="black")
session.add_all([user2, user3, user4])
session.commit()
user1 = User(name="eric")
session.add(user1)
user2.name = "llll"
user3_from_db_by_get = session.get(User, user3.id)
print(user3 == user3_from_db_by_get)
session.delete(user4)
session.rollback()
print(inspect(user1).expired) # False
print(inspect(user2).expired) # True
print(inspect(user3).expired) # True
print(inspect(user4).expired) # True
CRUD
Session.add()
Session.add_all()
Session.get()
Session.delete()
Session.query()
Session.execute()
import sqlalchemy
from sqlalchemy import Column, Integer, String, create_engine, ForeignKey, and_, or_, not_, distinct
from sqlalchemy.orm import sessionmaker, relationship, scoped_session, declarative_base
from sqlalchemy.sql import func, text
engine = create_engine('mysql+pymysql://root:mYsql123456_@192.168.10.128:3306/test_db', echo='debug', echo_pool='debug')
Base = declarative_base()
ScopedSession = scoped_session(sessionmaker(bind=engine))
class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True)
name = Column(String(length=20))
age = Column(Integer())
addresses = relationship('Address')
def __repr__(self):
return f"User(id={self.id}, name={self.name}, age={self.age}, addresses={self.addresses})"
class Address(Base):
__tablename__ = 't_address'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey('t_user.id'))
country = Column(String(length=30))
province = Column(String(length=30))
city = Column(String(length=30))
detail = Column(String(length=150))
def __repr__(self):
return f"Address(id={self.id}, user_id={self.user_id}, country={self.country}, province={self.province}, city={self.city}, detail={self.detail})"
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)
增
def add_example():
session = ScopedSession()
try:
user1 = User(name='Jack', age=14) # update_example、delete_example
session.add(user1)
user2 = User(name='Bob', age=17)
user3 = User(name='Alex', age=26)
user4 = User(name='Frank', age=23)
# 并非批量操作,实质为循环调用Session.add()
session.add_all([user2, user3, user4])
session.flush()
address4 = Address(id=user4.id, country="China", province="Beijing", city="Beijing", detail="......")
session.add(address4)
session.commit()
except Exception as e:
print(e)
session.rollback()
finally:
session.close()
add_example()
改
def update_example1():
session = ScopedSession()
try:
# 先查询出来再更新,两次数据库操作
user = session.query(User).filter_by(id=1).first()
if user:
user.name = 'update_example1'
session.commit()
except Exception as e:
print(e)
session.rollback()
finally:
session.close()
update_example1()
def update_example2():
session = ScopedSession()
try:
# 直接更新,一次数据库操作
update_count = session.query(User).filter_by(id=1).update({User.name: 'update_example2'})
print(f"更新了{update_count}条数据")
session.commit()
except Exception as e:
print(e)
session.rollback()
finally:
session.close()
update_example2()
# select for update ?
with session.begin():
query = session.query(User).filter(User.name == 'Alice').with_for_update()
user = query.first()
user.age = user.age + 1
删
def delete_example1():
session = ScopedSession()
try:
# 先查询出来再删除,两次数据库操作
del_data = session.query(User).filter_by(id=1).first()
session.delete(del_data)
session.commit()
except Exception as e:
print(e)
session.rollback()
finally:
session.close()
delete_example1()
def delete_example2():
session = ScopedSession()
try:
# 直接删除,一次数据库操作
del_count = session.query(User).filter_by(id=1).delete()
print(f"删除了{del_count}条数据")
session.commit()
except Exception as e:
print(e)
session.rollback()
finally:
session.close()
delete_example2()
查
def get_example1():
"""
根据id查询数据
"""
session = ScopedSession()
# Session.get()获取不到返回None
print(session.get(User, 2))
# Session.get_one()获取不到抛出异常
print(session.get_one(User, 3))
# 基于Query的get,2.0版本建议使用session.get()
print(session.query(User).get(4))
# query().get()不允许查询指定字段、复杂条件等语法
# session.query(User.name, User.age).get(4)
# session.query(User).filter(User.id != 4).get(4)
get_example1()
def get_example2():
"""
通过Query构造查询,指定字段
"""
session = ScopedSession()
# 直接session.query(User).all()会返回User的对象列表
query: sqlalchemy.orm.query.Query = session.query(User)
users: list = query.all()
print(users)
# [User(self.id=2, self.name='Bob', self.age=17, self.addresses=[]), User(self.id=3, self.name='Alex', self.age=26, self.addresses=[]), User(self.id=4, self.name='Frank', self.age=23, self.addresses=[])]
# session.query(User.name, User.age)返回对应字段的元组列表
query: sqlalchemy.orm.query.Query = session.query(User.name, User.age)
users: list = query.all()
print(users)
# [('Bob', 17), ('Alex', 26), ('Frank', 23)]
# 使用with_entities可以重新设置要查询的字段,在一些情况下可以复用构建的query
query: sqlalchemy.orm.query.Query = session.query(User.name)
query = query.with_entities(User.age)
users: list = query.all()
print(users)
# [(14,), (17,), (26,), (23,)]
# 可以先查询User然后使用with_entitiesz
query: sqlalchemy.orm.query.Query = session.query(User)
users: list = query.with_entities(User.name, User.age).all()
print(users)
# [('Jack', 14), ('Bob', 17), ('Alex', 26), ('Frank', 23)]
get_example2()
def get_example3():
"""
通过Query构造查询,过滤条件
"""
session = ScopedSession()
query: sqlalchemy.orm.query.Query = session.query(User)
# filter_by基于kw进行过滤,仅适用于field=value形式的过滤,没有类似于Django Model中丰富的关键字语法
print(query.filter_by(name="Bob").all())
# filter基于model构造查询条件,支持复杂条件
# 风格1:query.filter(User.name == 'Frank', User.age != 23).all()
# 风格2:query.filter(User.name.is('Frank'), User.age.isnot(23)).all()
# sqlalchemy/sql/operators.py
# 相等与不等
print(query.filter(User.name == 'Frank', User.age != 23).all())
print(query.filter(User.name == None).all())
print(query.filter(User.name.is_(None)).all())
print(query.filter(User.name != None).all())
print(query.filter(User.name.isnot(None)).all())
# 模糊条件
print(query.filter(User.name.like('B%'), User.name.notlike('%k')).all())
# in 、 not in
print(query.filter(User.name.in_(['Bob', 'Frank']), ~User.age.in_([17, 13])).all())
# 与
print(query.filter(and_(User.name == 'Bob', User.age == 17)).all())
print(query.filter(User.name == 'Bob', User.age == 17).all())
print(query.filter(User.name == 'Bob').filter(User.age == 17).all())
# 或
print(query.filter(or_(User.name == 'Bob', User.age == 17)).all())
# 非
# not_与~等价
print(query.filter(not_(User.name == 'Bob')).all())
print(query.filter(~(User.name == 'Bob')).all())
print(query.filter(~and_(User.name == 'Bob', User.age == 13)).all())
get_example3()
def get_example4():
"""
直接执行SQL
"""
session = ScopedSession()
print(session.execute(text("select * from t_user where id=:id"), {"id": 4}).fetchall())
# [(4, 'Frank', 23)]
get_example4()
def get_example5():
"""
多表查询
"""
session = ScopedSession()
# SELECT t_user.name AS t_user_name, t_address.country AS t_address_country FROM t_user, t_address WHERE t_user.id = t_address.user_id
print(session.query(User.name, Address.country).filter(User.id == Address.user_id).all())
# SELECT t_user.name AS t_user_name, t_address.country AS t_address_country FROM t_user INNER JOIN t_address ON t_user.id = t_address.user_id
print(session.query(User.name, Address.country).join(Address, User.id == Address.user_id).all())
# SELECT t_user.name AS t_user_name, t_address.country AS t_address_country FROM t_user LEFT OUTER JOIN t_address ON t_user.id = t_address.user_id
print(session.query(User.name, Address.country).outerjoin(Address, User.id == Address.user_id).all())
# SELECT anon_1.t_user_name AS anon_1_t_user_name FROM (SELECT t_user.name AS t_user_name FROM t_user UNION SELECT t_address.country AS t_address_country FROM t_address) AS anon_1
print(session.query(User.name).union(session.query(Address.country)).all())
get_example5()
def get_example6():
"""
Query的结果集
"""
session = ScopedSession()
query: sqlalchemy.orm.query.Query = session.query(User)
# 结果集中包含满足条件数据中的所有数据,没有数据则为None
print(query.all())
# 结果集中包含满足条件数据中的第一条数据,没有数据则为None
print(query.first())
# 结果集中包含恰好满足条件的唯一的一条数据,若没有查到数据或者有多条满足条件的数据都会抛出异常
try:
print(query.one())
except Exception as e:
print(e)
# 结果集中包含恰好满足条件的唯一的一条数据,若没有查到数据则为None,有多条满足条件的数据会抛出异常
try:
print(query.one_or_none())
except Exception as e:
print(e)
get_example6()
def get_example7():
"""
排序
"""
session = ScopedSession()
query: sqlalchemy.orm.query.Query = session.query(User)
# 升序
print(query.order_by(User.id).all())
# 降序
print(query.order_by(-User.id).all())
# 多条件排序
print(query.order_by(-User.id, User.name).all())
get_example7()
def get_example8():
"""
分组
"""
session = ScopedSession()
query: sqlalchemy.orm.query.Query = session.query(User.name)
print(query.group_by(User.name).all())
get_example8()
def get_example9():
"""
聚合函数
:return:
"""
session = ScopedSession()
print(session.query(
func.max(User.age),
func.min(User.age),
func.sum(User.age),
func.avg(User.age),
func.count(User.age),
).all())
get_example9()
def get_example10():
# 去重
session = ScopedSession()
print(session.query(User.name, User.age).distinct().all())
get_example10()
def get_example11():
"""
分页
"""
session = ScopedSession()
query: sqlalchemy.orm.query.Query = session.query(User)
# 偏移分页
print(query.offset(1).limit(1).all())
# 使用slice()偏移分页,等价于 limit start, end - start
print(session.query(User).slice(1, 2).all())
# 该方法并不能在数据库层面实现分页,而是全部返回数据由Python切片
# session.query(User).all()[1:2]
get_example11()
事务操作分析
def transaction_example1():
"""
同一个session不允许开启一个事务的时候再开启另一个事务
"""
session = ScopedSession()
session.begin()
session.add(User(name='Jack'))
session.begin()
session.add(User(name='Jack'))
session.commit()
def transaction_example2():
"""
两个session开启两个事务
"""
Session = sessionmaker(bind=engine, autobegin=False)
session1, session2 = Session(), Session()
session1.begin()
session1.add(User(name='Jack'))
session2.begin()
session2.add(User(name='Jack'))
session1.commit()
session2.commit()
def transaction_example3():
"""
在同一个session中不允许先尝试开启嵌套事务然后开启一般事务
"""
session = ScopedSession()
session.begin(nested=True)
session.add(User(name='Jack'))
session.begin()
session.add(User(name='Jack'))
session.commit()
def transaction_example4():
"""
在同一个session中允许先尝试开启一般事务然后开启嵌套事务
"""
session = ScopedSession()
session.begin()
session.add(User(name='Jack'))
session.begin(nested=True)
session.add(User(name='Jack'))
session.commit()
def transaction_example5():
name = "transaction_example5"
Session = sessionmaker(bind=engine, autobegin=False)
session1, session2 = Session(), Session()
session1.begin()
user = session1.query(User).filter(User.name == name).first()
if user:
raise Exception("测试前提:库中不应当有这个数据")
with session2.begin():
# session2添加该数据
session2.add(User(name=name))
user = session1.query(User).filter(User.name == name).first()
if not user:
raise Exception(
f'测试结果:由于隔离级别{session1.connection().get_isolation_level()}限制,session1中查不到session2中的数据')
session1.commit()
def transaction_example6():
"""
begin并非在数据库层面开启事务
"""
Session = sessionmaker(bind=engine, autobegin=False)
session1, session2 = Session(), Session()
session2.begin()
# begin之后执行完毕session1事务,session2的事务可以获取到数据
session1.begin()
session1.add(User(name='Jane'))
session1.commit()
session2.query(User).filter(User.name == "Jane").one()
session2.commit()
SQL 打印
SQLAlchemy 中打印 SQL 语句,flask-sqlalchemy中同样适用。
方法一,设置配置
SQLALCHEMY_ECHO=True
方法二,直接打印
str(session.query(model.Name).order_by(model.Name.value))
Glossary
unit of work pattern
一种软件架构,持久化系统(例如object relational mapper)维护一系列对象的更改列表,并定期将所有这些待处理的更改刷新到数据库中。
SQLALchemy中的Session实现了工作单元模式,使用相关方法(例如Session.add())将object添加到Session,随后以工作单元模式风格进行持久化。
attached
表明ORM object关联到了某一个Session
expire//expiring/expired
在 SQLAlchemy ORM 中,指persistent对象(有时是detached对象)中的数据被擦除时,当下次访问该对象的属性时,将发出懒加载 SQL 查询,以刷新当前正在进行的事务中存储的该对象的数据。
cascade
SQLAlchemy 中的一个术语,用于描述在特定对象上进行的 ORM 持久化操作如何扩展到与该对象直接关联的其他对象。在 SQLAlchemy 中,这些对象关联是使用 relationship() 结构配置的。 relationship() 包含一个称为 relationship.cascade 的参数,它提供了某些持久化操作如何级联的选项。
ORM mapped class
A mapped class typically refers to a single particular database table, the name of which is indicated by using the __tablename__ class-level attribute.
BEGIN (implicit)
You might have noticed the log line “BEGIN (implicit)” at the start of a transaction block. “implicit” here means that SQLAlchemy did not actually send any command to the database; it just considers this to be the start of the DBAPI’s implicit transaction. You can register event hooks to intercept this event, for example.
- 隐式事务:SQLAlchemy 自动处理事务的开始和结束,适用于简单的数据库操作。日志中的“BEGIN (implicit)”表示事务已开始但未发送显式命令。
- 显式事务:开发者通过显式调用事务方法begin来控制事务的生命周期,适用于需要更高控制度的复杂操作。
SQLAlchemy 使用注意
1、查询结果和数据库数据不一致
现象:数据id==1的数据的name==“test”,事务A 修改 id==1的数据name改为“hello world”并commit,事务B 在事务A执行玩之前 查询 id==1 的name==“test”,等事务A执行完成,再从查询id==1的name还是“test”,而不是“hello world”
原因:由于数据事务机制(参考:彻底搞懂 MySQL 事务的隔离级别-阿里云开发者社区),导致事务B在第一次查询的时候,真正请求数据库进行查询,但后面的查询直接读取的已经缓存在内存中的数据,而没有真正去数据库中查询
解决方案:
在每次query之后执行commit
2、长时间未请求连接自动断开
现象:长时间服务端没有连接数据库,数据库连接自动断开
原因:1、sqlalchemy在create_engine时,使用连接池并没有指定连接池回收时间,则连接池的连接不会自动被回收,并默认使用QueuePool进行连接池管理,调用session.close(),不会断开连接,2、数据库,例如mysql会设置一个wait_timeout(默认8个小时),当连接空闲8个小时,则自动断开
解决方案:
- 方案一:修改数据库的设置 wait_timeout(不推荐)
- 方案二:新建连接池时,设置连接回收时间,使这个值小于wait_timeout,sqlchemy的create_engine一些重要参数如下:
create_engine重要参数:
pool_size:连接数,采用了惰性思想,例如:pool_size=10,如果项目中只使用了5个,则连接池中的连接数,只有5个,但当项目同时使用了10个连接,则后续连接池中的连接数为10个
max_overflow:超出连接数时,允许再新建的连接数,例如:pool_size=10,max_overflow=8,最大连接数18个,但其中8个不在使用时,直接回收,连接池中的连接数为10个
pool_timeout:等待可用连接时间,超时则报错,默认为30秒
pool_recycle:连接生存时长,超过则该连接被回收,再生存新连接,可把这个值改成小于wait_timeout;设置-1时,则不回收连接
- 方案三:不使用链接池,在create_engine时指定连接池为NullPool,则使用session.close()后断开数据库链接
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool
class DbHandler(object):
def __init__(self):
self.db_addr = "XXXXXX"
def create_session(self):
engine = create_engine(self.db_addr, poolclass=NullPool)
session = sessionmaker(bind=engine)()
return session
3、全局session在多进程下被断开
想象:sqlalchemy连接池,经常报错:mysql server has gone away
原因:父进程创建子进程,子进程会使用父进程的数据连接,当子进程执行完成,断开数据库的连接,则全局session和连接被释放,后续要使用连接数据库时,则报错
解决方案:多进程中,尽量没有进程新建连接和session
通过数据库表生成 SQLAlalchemy model
v1
pip install sqlacodegen
pip install sqlalchemy
sqlacodegen --table [表名user] mysql+pymysql://[root]:[password]@127.0.0.1/[db_name]> user.py
sqlacodegen --table t_user mysql+pymysql://root:123456@192.168.1.1/db > user_model.py
v1 flask
pip install flask-sqlacodegen
python -m sqlacodegen.main --flask --outfile models.py mysql+pymysql://<username>:<password>@<database-ip>:<port>/<database-name> [--tables <tablenames>] [--notables]
python -m sqlacodegen.main --flask --outfile models.py mysql+pymysql://root:123456@192.168.1.1/db
v2
pip install sqlacodegen-v2
sqlacodegen_v2 --table t_user mysql+pymysql://root:123456@192.168.1.1/db > user_model.py
sqlalchemy 通过数据库反向生成model
https://www.jiege.tech/extensions/flask-sqlalchemy.html
https://blog.csdn.net/lijianping962464/article/details/125723812
https://blog.51cto.com/u_16175441/8135724
https://blog.csdn.net/qq_45632139/article/details/114286909
https://blog.csdn.net/lijianping962464/article/details/125723812