博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python3.x:SQLAlchemy操作数据库
阅读量:5147 次
发布时间:2019-06-13

本文共 9142 字,大约阅读时间需要 30 分钟。

Python3.x:SQLAlchemy操作数据库

前言

SQLAlchemy是一个ORM框架(Object Rational Mapping,对象关系映射),它可以帮助我们更加优雅、更加高效的实现数据库操作,而且还不限于mysql。

SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

MySQL-Python    mysql+mysqldb://
:
@
[:
]/
pymysql mysql+pymysql://
:
@
/
[?
] MySQL-Connector mysql+mysqlconnector://
:
@
[:
]/
cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]pyodbc sybase+pyodbc://
:
@
[/
]Python-Sybase sybase+pysybase://
:
@
/[database name]mxODBC sybase+mxodbc://
:
@

SQLAlchemy库安装

pip install sqlalchemy#安装mysqlpip install pymysql#安装mysql-connector2.2.3版本会报错:Unable to find Protobuf include directory.#所以我们指定安装的版本pip install mysql-connector==2.1.4

示例代码(mysql数据库)

# python3# author lizm# datetime 2018-01-28 12:00:00'''    Demo:sqlalchemy对mysql数据库的操作'''from sqlalchemy import Column,Integer, String, create_enginefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy.ext.declarative import declarative_baseimport pymysql# 创建对象的基类:Base = declarative_base()# 定义Channel对象:class Channel(Base):    # 表名    __tablename__ = 'playback'    # 表结构    # Column:行声明,可指定主键 Integer:数据类型 String:数据类型,可指定长度     id = Column(Integer,primary_key=True,autoincrement=True)    channel_name = Column(String(45),unique=True, nullable=False)    address = Column(String(80),unique=True, nullable=False)    service_name = Column(String(45),unique=True, nullable=False)    def __init__(self,id,channel_name,address,service_name):        self.id = id        self.channel_name = channel_name        self.address = address        self.service_name = service_name# 初始化数据库连接,# 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节engine = create_engine('mysql+mysqlconnector://root:lizm@localhost:3306/pythondb',encoding='utf-8')# 创建表Base.metadata.create_all(engine)# 删除表# Base.metadata.drop_all(engine) # 创建DBSession类型:DBSession = sessionmaker(bind=engine)session = DBSession()try:    # 增操作    item1 = Channel(id='1',channel_name='cctv8',address='http://10.10.10.1/cctv8',service_name='news')    session.add(item1)    item2 = Channel(id='2',channel_name='cctv10',address='http://10.10.10.1/cctv10',service_name='sports')    session.add(item2)    item3 = Channel(id='3',channel_name='cctv12',address='http://10.10.10.1/cctv12',service_name='economics')    session.add(item3)    #提交数据    session.commit()except Exception as e:    session.rollback()finally:    #关闭    session.close()# 查操作session1 = DBSession()# 输出sql 语句print("查询sql语句:%s"%session1.query(Channel).filter(Channel.id < '3'))# 返回的是一个类似列表的对象channel = session1.query(Channel).filter(Channel.id < '3').all()for i in range(len(channel)):    print(channel[i].id)    print(channel[i].channel_name)    print(channel[i].address)    print(channel[i].service_name)session1.close()# 改操作session2 = DBSession()session2.query(Channel).filter(Channel.id == '2').update({Channel.service_name: 'movie',Channel.address: '127.0.0.1'}, synchronize_session=False)session2.commit()session2.close()## 查看修改结果session3 = DBSession()print(session3.query(Channel).filter(Channel.id == '2').one().service_name)session3.close()# 删操作session4 = DBSession()session4.query(Channel).filter(Channel.id == '3').delete()session4.commit()session4.close()

 

示例代码(sybase数据库:odbc连接方式需要先配置好odbc连接):

# python3# author lizm# datetime 2018-02-28 12:00:00'''    Demo:sqlalchemy对sybase数据库的操作'''from selenium import webdriverfrom sqlalchemy import Column,Integer,String,DateTime,create_enginefrom sqlalchemy.orm import sessionmakerfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy import and_,funcimport pyodbcimport time,datetime# 创建对象的基类:Base = declarative_base()# 定义Channel对象:class Channel(Base):    # 表名        __tablename__ = 'shrjj'    # 表结构    # Column:行声明,可指定主键 Integer:数据类型 String:数据类型,可指定长度     id = Column(Integer,primary_key=True,autoincrement=True)    rpname = Column(String(500), nullable=False)    rpdate = Column(String(50))    jjzt = Column(String(255))    fbsjj = Column(String(255))    etf = Column(String(255))    lof = Column(String(255))    fjlof = Column(String(255))    create_date = Column(DateTime,nullable=False)    update_date = Column(DateTime,nullable=False)    def __init__(self,rpname,rpdate,jjzt,fbsjj,etf,lof,fjlof,create_date,update_date):        self.rpname = rpname        self.rpdate = rpdate        self.jjzt = jjzt        self.fbsjj = fbsjj        self.etf = etf        self.lof = lof        self.fjlof = fjlof        self.create_date = create_date        self.update_date = update_date# 初始化数据库连接,# 传入参数:数据库类型+连接库+用户名+密码+主机,字符编码,是否打印建表细节engine = create_engine('sybase+pyodbc://username:password@dns')# 创建表#Base.metadata.create_all(engine)# 创建DBSession类型:DBSession = sessionmaker(bind=engine)# 查操作session1 = DBSession()# 输出sql 语句print("查询sql语句:%s"%session1.query(Channel).filter(Channel.rpdate == '2018-02-12'))# 返回的是一个类似列表的对象channel = session1.query(Channel).filter(Channel.rpdate == '2018-02-12').all()for i in range(len(channel)):    print(channel[i].rpname)    print(channel[i].rpdate)    print(channel[i].jjzt)session1.close()# 改操作session2 = DBSession()session2.query(Channel).filter(Channel.rpdate == '2018-02-12').update({Channel.jjzt: '0'}, synchronize_session=False)session2.commit()session2.close()# 查看修改结果session3 = DBSession()print(session3.query(Channel).filter(Channel.rpdate == '2018-02-12').one().jjzt)session3.close()#新增数据操作session4 = DBSession()# 增操作item1 = Channel(rpname='市价总值',rpdate='2018-02-29',jjzt='1',fbsjj='1',etf='1',lof='1',fjlof='1',create_date=time.strftime('%Y-%m-%d %H:%M:%S'),update_date=time.strftime('%Y-%m-%d %H:%M:%S'))session4.add(item1)#提交数据session4.commit()#关闭session4.close()

SQLAlchemy中列类型.配置选项和关系选项

常见的列类型: 类型名称       python类型          描述Integer       int                常规整形,通常为32位SmallInteger  int                短整形,通常为16位BigInteger    int或long          精度不受限整形Float         float              浮点数Numeric       decimal.Decimal    定点数String        str                可变长度字符串Text          str                可变长度字符串,适合大量文本Unicode       unicode            可变长度Unicode字符串Boolean       bool               布尔型Date          datetime.date      日期类型Time          datetime.time      时间类型Interval      datetime.timedelta 时间间隔Enum          str                字符列表PickleType    任意Python对象      自动Pickle序列化LargeBinary   str                二进制 常见的SQLALCHEMY列选项:可选参数       描述primary_key   如果设置为True,则为该列表的主键unique        如果设置为True,该列不允许相同值index         如果设置为True,为该列创建索引,查询效率会更高nullable      如果设置为True,该列允许为空。如果设置为False,该列不允许空值default       定义该列的默认值

 常见sqlalchemy查询

#简单查询print(session.query(User).all())print(session.query(User.name, User.fullname).all())print(session.query(User, User.name).all())    #带条件查询print(session.query(User).filter_by(name='user1').all())print(session.query(User).filter(User.name == "user").all())print(session.query(User).filter(User.name.like("user%")).all())    #多条件查询from sqlalchemy import and_print(session.query(User).filter(and_(User.name.like("user%"), User.fullname.like("first%"))).all())from sqlalchemy import or_print(session.query(User).filter(or_(User.name.like("user%"), User.password != None)).all())    #sql过滤print(session.query(User).filter("id>:id").params(id=1).all())    #关联查询 print(session.query(User, Address).filter(User.id == Address.user_id).all())print(session.query(User).join(User.addresses).all())print(session.query(User).outerjoin(User.addresses).all())    #聚合查询print(session.query(User.name, func.count('*').label("user_count")).group_by(User.name).all())print(session.query(User.name, func.sum(User.id).label("user_id_sum")).group_by(User.name).all())    #子查询stmt = session.query(Address.user_id, func.count('*').label("address_count")).group_by(Address.user_id).subquery()print(session.query(User, stmt.c.address_count).outerjoin((stmt, User.id == stmt.c.user_id)).order_by(User.id).all())    #existsprint(session.query(User).filter(exists().where(Address.user_id == User.id)))print(session.query(User).filter(User.addresses.any()))# 限制ret = session.query(Users)[1:2]# 排序ret = session.query(Users).order_by(Users.name.desc()).all()ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()# 分组from sqlalchemy.sql import funcret = session.query(Users).group_by(Users.extra).all()ret = session.query(    func.max(Users.id),    func.sum(Users.id),    func.min(Users.id)).group_by(Users.name).all()ret = session.query(    func.max(Users.id),    func.sum(Users.id),func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()# 连表ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()ret = session.query(Person).join(Favor).all()ret = session.query(Person).join(Favor, isouter=True).all()# 组合q1 = session.query(Users.name).filter(Users.id > 2)q2 = session.query(Favor.caption).filter(Favor.nid < 2)ret = q1.union(q2).all()q1 = session.query(Users.name).filter(Users.id > 2)q2 = session.query(Favor.caption).filter(Favor.nid < 2)ret = q1.union_all(q2).all()

 注意:SQLAlchemy中的model必须要有主键,才可以使用;

 

作者:整合侠

链接:
来源:博客园
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

 

转载于:https://www.cnblogs.com/lizm166/p/8370633.html

你可能感兴趣的文章
C语言进阶——const 和 volatile 分析09
查看>>
字符串的查找删除
查看>>
NOI2018垫底记
查看>>
快速切题 poj 1002 487-3279 按规则处理 模拟 难度:0
查看>>
判断线段是否相交
查看>>
Codeforces Round #277 (Div. 2)
查看>>
一步步学Mybatis-搭建最简单的开发环境-开篇(1)
查看>>
微信小程序图片上传
查看>>
【更新】智能手机批量添加联系人
查看>>
NYOJ-128前缀式计算
查看>>
centos6.7 配置外网端口映射
查看>>
淡定,啊。数据唯一性
查看>>
java并发编程之lock锁
查看>>
深入理解 JavaScript 事件循环(一)— event loop
查看>>
Hive(7)-基本查询语句
查看>>
Redis快速入门
查看>>
注意java的对象引用
查看>>
C++ 面向对象 类成员函数this指针
查看>>
inline函数的总结
查看>>
SPSS-生存分析
查看>>