删除案例:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 通过Session绑定引擎和数据库建立关系 Session = sessionmaker(bind=engine) # 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离 session = scoped_session(Session) session.query(Users).filter_by(id=2).delete() # 提交 session.commit() # 关闭链接 session.close() 批量增加批量增加:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 通过Session绑定引擎和数据库建立关系 Session = sessionmaker(bind=engine) # 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 批量增加 session.add_all([ Users(name="user002",age=21,phone="13269867233",addr="ShangHai"), Users(name="user003",age=18,phone="13269867234",addr="GuangZhou"), Users(name="user003",age=24,phone="13269867235",addr="ChongQing"), ]) # 提交 session.commit() # 关闭链接 session.close() 单表查询 基本查询基本查询:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 通过Session绑定引擎和数据库建立关系 Session = sessionmaker(bind=engine) # 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 查询 # -- 查所有 -- result_01 = session.query(Users).all() # -- 过滤 -- result_02 = session.query(Users).filter(Users.name == "USER001").all() # Python表达式的形式过滤 result_03 = session.query(Users).filter_by(name="user002").all() # ORM形式过滤 result_04 = session.query(Users).filter_by(name="user003").first() # ORM形式过滤 取第一个 print(result_01) # [<models.Users>,<models.Users>,<models.Users>] print(result_02) print(result_03) print(result_04) # object:<id:3 name:user003> 通过__str__拿到结果 # 提交 session.commit() # 关闭链接 session.close() 其他过滤条件查询:
from sqlalchemy.orm import scoped_session from sqlalchemy.orm import sessionmaker # 导入引擎,模型表等 from models import * # 通过Session绑定引擎和数据库建立关系 Session = sessionmaker(bind=engine) # 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离 session = scoped_session(Session) # 只拿某字段 result_00 = session.query(Users.name,Users.age).first() print(result_00) # and(用逗号或者用and_) result_01 = session.query(Users).filter( Users.id > 1,Users.age < 23).all() print(result_01) from sqlalchemy import and_ result_02 = session.query(Users).filter(and_( Users.id > 1,Users.age < 23)).all() print(result_02) # or from sqlalchemy import or_ result_03 = session.query(Users).filter(or_(Users.id > 3,Users.age < 23)).all() print(result_03) # and与or的组合使用 result_04 = session.query(Users).filter(or_( Users.id > 1, and_(Users.id > 2, Users.age < 24) )).all() print(result_04) # 范围 result_05 = session.query(Users).filter(Users.age.between(18,24)).all() print(result_05) # 包含 result_06 = session.query(Users).filter(Users.age.in_([18,21,24])).all() print(result_06) # 取反 ~ result_07 = session.query(Users).filter(~Users.age.in_([18,21,24])).all() print(result_07) # 通配符 result_08 = session.query(Users).filter(Users.name.like("us%")).all() print(result_08) # 分页 result_09 = session.query(Users).all()[0:1] print(result_09) # 排序 result_10 = session.query(Users).order_by(Users.id.desc()).all() # 倒序 print(result_10) result_11 = session.query(Users).order_by(Users.id.asc()).all() # 正序 print(result_11) # 分组 result_12 = session.query(Users).group_by(Users.id).all() print(result_12) # 聚合函数 from sqlalchemy.sql import func result_13 = session.query( func.max(Users.age), func.sum(Users.age), func.min(Users.age), ).group_by(Users.name).having(func.max(Users.age > 12)).all() print(result_13) # 提交 session.commit() # 关闭链接 session.close() 多表相关 一对多首先是建立一对多的关系,使用relationship做逻辑一对多,不会在物理表中创建关系,但是可以通过该字段进行增删改查:
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import Column, Integer, String, ForeignKey from sqlalchemy.orm import relationship from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base # 基础类 Base = declarative_base() # 创建引擎 engine = create_engine( "mysql+pymysql://root@127.0.0.1:3306/db1?charset=utf8", # "mysql+pymysql://root:123@127.0.0.1:3306/db1?charset=utf8", # 有密码时 max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) class Classes(Base): __tablename__ = "classes" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=False) class Students(Base): __tablename__ = "students" id = Column(Integer, primary_key=True) name = Column(String(32), nullable=False) # 真实约束字段:避免脏数据写入,在物理表中会创建真实字段关系 # 可选级联操作:CASCADE,DELETE、RESTRICT fk_class = Column(Integer, ForeignKey("classes.id",ondelete="CASCADE",onupdate="CASCADE")) # 逻辑关系字段:不会在真实物理表中创建字段,但是可以通过该逻辑字段进行增删改查 # backref:反向查询的名字 re_class = relationship("Classes",backref="students") def create_tb(): """ 创建表 :return: """ Base.metadata.create_all(engine) def drop_tb(): """ 删除表 :return: """ Base.metadata.drop_all(engine) if __name__ == '__main__': drop_tb() create_tb()