Python3.x:SQLAlchemy操作数据库
前言
SQLAlchemy是一个ORM框架(Object Rational Mapping,对象关系映射),它可以帮助我们更加优雅、更加高效的实现数据库操作,而且还不限于mysql。
SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python mysql+mysqldb://: @ [: ]/ pymysql mysql+pymysql:// : @ / [? ] MySQL-Connector mysql+mysqlconnector:// : @ [: ]/ cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]pyodbc sybase+pyodbc:// : @ [/ ]Python-Sybase sybase+pysybase:// : @ /[database name]mxODBC sybase+mxodbc:// : @
SQLAlchemy库安装
pip install sqlalchemy#安装mysqlpip install pymysql#安装mysql-connector2.2.3版本会报错:Unable to find Protobuf include directory.#所以我们指定安装的版本pip install mysql-connector==2.1.4
示例代码(mysql数据库)
# python3# author lizm# datetime 2018-01-28 12:00:00''' Demo:sqlalchemy对mysql数据库的操作'''from sqlalchemy import Column,Integer, String, create_enginefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy.ext.declarative import declarative_baseimport pymysql# 创建对象的基类:Base = declarative_base()# 定义Channel对象:class Channel(Base): # 表名 __tablename__ = 'playback' # 表结构 # Column:行声明,可指定主键 Integer:数据类型 String:数据类型,可指定长度 id = Column(Integer,primary_key=True,autoincrement=True) channel_name = Column(String(45),unique=True, nullable=False) address = Column(String(80),unique=True, nullable=False) service_name = Column(String(45),unique=True, nullable=False) def __init__(self,id,channel_name,address,service_name): self.id = id self.channel_name = channel_name self.address = address self.service_name = service_name# 初始化数据库连接,# 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节engine = create_engine('mysql+mysqlconnector://root:lizm@localhost:3306/pythondb',encoding='utf-8')# 创建表Base.metadata.create_all(engine)# 删除表# Base.metadata.drop_all(engine) # 创建DBSession类型:DBSession = sessionmaker(bind=engine)session = DBSession()try: # 增操作 item1 = Channel(id='1',channel_name='cctv8',address='http://10.10.10.1/cctv8',service_name='news') session.add(item1) item2 = Channel(id='2',channel_name='cctv10',address='http://10.10.10.1/cctv10',service_name='sports') session.add(item2) item3 = Channel(id='3',channel_name='cctv12',address='http://10.10.10.1/cctv12',service_name='economics') session.add(item3) #提交数据 session.commit()except Exception as e: session.rollback()finally: #关闭 session.close()# 查操作session1 = DBSession()# 输出sql 语句print("查询sql语句:%s"%session1.query(Channel).filter(Channel.id < '3'))# 返回的是一个类似列表的对象channel = session1.query(Channel).filter(Channel.id < '3').all()for i in range(len(channel)): print(channel[i].id) print(channel[i].channel_name) print(channel[i].address) print(channel[i].service_name)session1.close()# 改操作session2 = DBSession()session2.query(Channel).filter(Channel.id == '2').update({Channel.service_name: 'movie',Channel.address: '127.0.0.1'}, synchronize_session=False)session2.commit()session2.close()## 查看修改结果session3 = DBSession()print(session3.query(Channel).filter(Channel.id == '2').one().service_name)session3.close()# 删操作session4 = DBSession()session4.query(Channel).filter(Channel.id == '3').delete()session4.commit()session4.close()
示例代码(sybase数据库:odbc连接方式需要先配置好odbc连接):
# python3# author lizm# datetime 2018-02-28 12:00:00''' Demo:sqlalchemy对sybase数据库的操作'''from selenium import webdriverfrom sqlalchemy import Column,Integer,String,DateTime,create_enginefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import and_,funcimport pyodbcimport time,datetime# 创建对象的基类:Base = declarative_base()# 定义Channel对象:class Channel(Base): # 表名 __tablename__ = 'shrjj' # 表结构 # Column:行声明,可指定主键 Integer:数据类型 String:数据类型,可指定长度 id = Column(Integer,primary_key=True,autoincrement=True) rpname = Column(String(500), nullable=False) rpdate = Column(String(50)) jjzt = Column(String(255)) fbsjj = Column(String(255)) etf = Column(String(255)) lof = Column(String(255)) fjlof = Column(String(255)) create_date = Column(DateTime,nullable=False) update_date = Column(DateTime,nullable=False) def __init__(self,rpname,rpdate,jjzt,fbsjj,etf,lof,fjlof,create_date,update_date): self.rpname = rpname self.rpdate = rpdate self.jjzt = jjzt self.fbsjj = fbsjj self.etf = etf self.lof = lof self.fjlof = fjlof self.create_date = create_date self.update_date = update_date# 初始化数据库连接,# 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节engine = create_engine('sybase+pyodbc://username:password@dns')# 创建表#Base.metadata.create_all(engine)# 创建DBSession类型:DBSession = sessionmaker(bind=engine)# 查操作session1 = DBSession()# 输出sql 语句print("查询sql语句:%s"%session1.query(Channel).filter(Channel.rpdate == '2018-02-12'))# 返回的是一个类似列表的对象channel = session1.query(Channel).filter(Channel.rpdate == '2018-02-12').all()for i in range(len(channel)): print(channel[i].rpname) print(channel[i].rpdate) print(channel[i].jjzt)session1.close()# 改操作session2 = DBSession()session2.query(Channel).filter(Channel.rpdate == '2018-02-12').update({Channel.jjzt: '0'}, synchronize_session=False)session2.commit()session2.close()# 查看修改结果session3 = DBSession()print(session3.query(Channel).filter(Channel.rpdate == '2018-02-12').one().jjzt)session3.close()#新增数据操作session4 = DBSession()# 增操作item1 = Channel(rpname='市价总值',rpdate='2018-02-29',jjzt='1',fbsjj='1',etf='1',lof='1',fjlof='1',create_date=time.strftime('%Y-%m-%d %H:%M:%S'),update_date=time.strftime('%Y-%m-%d %H:%M:%S'))session4.add(item1)#提交数据session4.commit()#关闭session4.close()
SQLAlchemy中列类型.配置选项和关系选项
常见的列类型: 类型名称 python类型 描述Integer int 常规整形,通常为32位SmallInteger int 短整形,通常为16位BigInteger int或long 精度不受限整形Float float 浮点数Numeric decimal.Decimal 定点数String str 可变长度字符串Text str 可变长度字符串,适合大量文本Unicode unicode 可变长度Unicode字符串Boolean bool 布尔型Date datetime.date 日期类型Time datetime.time 时间类型Interval datetime.timedelta 时间间隔Enum str 字符列表PickleType 任意Python对象 自动Pickle序列化LargeBinary str 二进制 常见的SQLALCHEMY列选项:可选参数 描述primary_key 如果设置为True,则为该列表的主键unique 如果设置为True,该列不允许相同值index 如果设置为True,为该列创建索引,查询效率会更高nullable 如果设置为True,该列允许为空。如果设置为False,该列不允许空值default 定义该列的默认值
常见sqlalchemy查询
#简单查询print(session.query(User).all())print(session.query(User.name, User.fullname).all())print(session.query(User, User.name).all()) #带条件查询print(session.query(User).filter_by(name='user1').all())print(session.query(User).filter(User.name == "user").all())print(session.query(User).filter(User.name.like("user%")).all()) #多条件查询from sqlalchemy import and_print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())from sqlalchemy import or_print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all()) #sql过滤print(session.query(User).filter("id>:id").params(id=1).all()) #关联查询 print(session.query(User, Address).filter(User.id == Address.user_id).all())print(session.query(User).join(User.addresses).all())print(session.query(User).outerjoin(User.addresses).all()) #聚合查询print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all()) #子查询stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all()) #existsprint(session.query(User).filter(exists().where(Address.user_id == User.id)))print(session.query(User).filter(User.addresses.any()))# 限制ret = session.query(Users)[1:2]# 排序ret = session.query(Users).order_by(Users.name.desc()).all()ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# 分组from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.extra).all()ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all()ret = session.query( func.max(Users.id), func.sum(Users.id),func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()# 连表ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()ret = session.query(Person).join(Favor).all()ret = session.query(Person).join(Favor, isouter=True).all()# 组合q1 = session.query(Users.name).filter(Users.id > 2)q2 = session.query(Favor.caption).filter(Favor.nid < 2)ret = q1.union(q2).all()q1 = session.query(Users.name).filter(Users.id > 2)q2 = session.query(Favor.caption).filter(Favor.nid < 2)ret = q1.union_all(q2).all()
注意:SQLAlchemy中的model必须要有主键,才可以使用;
作者:整合侠
链接:来源:博客园著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。