- 原文链接: https://chowyi.com/SQLAlchemy初体验/
- 版权声明: 文章采用 CC BY-NC-SA 4.0 协议进行授权,转载请注明出处!
使用Django已经有一段时间了,对Django的ORM印象深刻,它简单易用,也足够灵活。但是在自己做一些小实验小例子时,仅为了ORM而使用Django这么一个web框架显然不太合适。于是我找到了SQLAlchemy。从官方介绍来看,SQLAlchemy是Python下的一款强大且灵活的ORM框架及SQL工具集。那么我就从官方教程入手,一边学习,一边感受一下吧。
安装
使用pip或easy_install即可安装最新版本的SQLAlchemy。我安装的是当前的最新版本1.0.12。
1 | pip install sqlalchemy |
版本检查 Install
1 | import sqlalchemy |
连接数据库 Connecting
教程中使用了仅在内存中的SQLite数据库(in-memory-only SQLite database)。使用create_engine()
来连接:
1 | from sqlalchemy import create_engine |
echo
参数用来设置是否开启SQLAlchemy日志,此功能基于标准的Pythonlogging
模块。开启日志,我们可以看到SQLAlchemy生成的SQL语句。如果你不想看到那么多输出,把它设置为False
就好了。
create_engine()
的返回值是Engine
的一个实例,它代表了到数据库的核心接口,适配不同的数据库和DBAPI。
懒连接 (Lazy Connecting)
需要注意,使用create_engine()
创建出Engine
时,并没有真正的连接数据库,直到调用Engine.execute()
或Engine.connect()
等方法时,Engine
才会真正建立一个连接到数据库,然后发送SQL脚本。
数据库连接字符串
create_engine()
方法基于一个URL来创建Engine
对象。URL遵循RFC-1738,通常包含username,password,hostname,database name等可选参数。典型的URL是这样的:
1 | dialect+driver://username:password@host:port/database |
使用create_engine()
连接几种不同数据库的例子:
Postgresql
Postgresql dialect使用psycopg2作为默认DBAPI,也可以用纯python的pg8000作为替代:
1 | # default |
MySQL
MySQL dialect使用mysql-python作为默认DBAPI。MySQL有许多可用的DBAPI,包括MySQL-connector-python和OurSQL:
1 | # default |
Oracle
Oracle dialect使用cx_oracle作为默认的DBAPI:
1 | engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname') |
Microsoft SQL Server
SQL Server dialect使用pyodbc作为默认的DBAPI,pymssql也是可用的:
1 | # pyodbc |
SQLite
SQLite连接文件数据,默认使用Python内建的sqlite3
。
由于SQLite连接本地文件,所以URL格式略有不同。
相对路径:
1 | # sqlite://<nohostname>/<path> |
绝对路径:
1 | #Unix/Mac - 4 initial slashes in total |
使用SQLite内存数据库:
1 | engine = create_engine('sqlite://') |
声明映射 Declare a Mapping
使用ORM时,配置过程从描述数据表开始,然后定义我们将要映射到这些表的类。在现代的SQLAlchemy中,这两个任务通常被放在一起。使用Declarative系统,可以让我们创建包含了表描述的类。
使用Declarative System的类映射被定义在declarative base class中,它包含了类和表的关系。我们的项目中通常只需要一个该类的实例。我们通过declarative_base()
来创建base类,像下面这样:
1 | from sqlalchemy.ext.declarative import declarative_base |
有了”base”,我们就可以基于它定义其他需要映射的类了。
下面的示例展示了把User
类映射到users
数据表,类的定义包含了表明,列名及数据类型:
1 | from sqlalchemy import Column, Integer, String |
Tip__repr__()
方法的定义是可选的,我们实现它仅仅是为了以友好的格式展示User
对象。`
Session
Session
用来处理ORM与数据库之间的交流。当我们第一次建立起项目时,我们需要定义一个Session
类用来创建新的Session
对象:
1 | from sqlalchemy.orm import sessionmaker |
如果你还没有Engine
对象,你也可以在创建Engine
对象后把它连接到Session
:
1 | Session = sessionmaker() |
通过Session
类可以创建出绑定到我们数据库的Session
对象。
1 | >>> session = Session() |
这个session
对象关联了我们的engine
,但它还没有打开任何连接。当它第一次被使用的时候,它会从engine
维护的连接池中获取一个连接,它保持连接直到我们提交了(commit)所有的更改或者关闭了session对象。
添加和更新对象
为了持久化我们的User
对象,我们使用add()
方法把它加入到Session
中:
1 | >>> ed_user = User(name='ed', fullname='Ed Jones', password='edspassword') |
这时,我们称这个对象为pending状态。现在还没有SQL脚本被发送到数据库,这个对象也没有被存到数据库中。Session
会在需要的时候发出SQL把对象保存到数据库中,这一过程叫做flush。当我们在数据库中查询Ed Jones
时,所有pending状态的信息会首先flushed,然后立刻执行查询。
举个例子,我们创建一个查询,查到的结果与我们添加的对象是相等的:
1 | >>> our_user = session.query(User).filter_by(name='ed').first() # doctest:+NORMALIZE_WHITESPACE |
我们还可以使用add_all()
方法一次添加多个User
对象:
1 | >>> session.add_all([ |
现在我们觉得Ed的密码不够安全,我们来改一下:
1 | ed_user.password = 'f8s7ccs' |
当你做出了修改,Session
是知道的:
1 | >>> session.dirty |
还有三个新的User
对象处于pending状态:
1 | >>> session.new # doctest: +SKIP |
通过commit()
方法告知Session
我们想要保存修改并提交此次transaction。Session
会使用UPDATE
语句更改”ed”的密码,并用INSERT
语句添加我们新创建的三个User
对象:
1 | >>> session.commit() |
commit()
会flush所有的改变保存到数据库并提交此次transaction。现在session所引用的连接资源重新回到了连接池中。这个session接下来的操作会维持在一个新的transaction中,在需要的时候会重新获取连接资源。
我们查看一下Ed的id
属性,之前还是None
,现在已经有值了:
1 | ed_user.id |
回滚 Rolling Back
因为Session
的工作使用了transaction,所以我们可以回滚所做的改变。
现在我们改变ed_user
的name,然后再添加一个错误的user对象:
1 | >>> ed_user.name = 'Edwardo' |
查询session,我们可以看到刚才的改变已经被flushed到当前的transaction中了:
1 | >>> session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all() |
现在回滚,我们可以看到ed_user
的name已经改回了ed
,fake_user
也从session中剔除了:
1 | session.rollback() |
使用SELECT来看看我们到底对数据库做了哪些改变:
1 | >>> session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all() |
查询 Querying
使用Session
的query()
方法可以创建一个Query
的对象。这个方法可以接受多个参数,可以是classes和class的描述符。下面我们指定一个加载了User
实例的Query
对象。查询结果会返回User
列表:
1 | >>> for instance in session.query(User).order_by(User.id): |
query()
还可以接受基于列的实体作为参数,返回值是tuples的列表:
1 | for name, fullname in session.query(User.name, User.fullname): |
还可以这样:
1 | >>> for row in session.query(User, User.name).all(): |
官方教程这么介绍:
The tuples returned by
Query
are named tuples, supplied by theKeyedTuple
class, and can be treated much like an ordinary Python object. The names are the same as the attribute’s name for an attribute, and the class name for a class
我的疑问
上面列子中的row
用法的确同官方教程中介绍的KeyedTuple
一样,但我在试验中得出的是下面的结果:
1 | from sqlalchemy.util._collections import KeyedTuple |
而且我在源码中并没有找到result这个类,对此表示疑惑。希望知道原因的朋友能指点一下!
你还可以使用label()
来控制查询结果每一列的名字:
1 | for row in session.query(User.name.label('name_label')).all(): |
看了SQL就会发现其实就是AS语句:
1 |
|
还可以给整个实体例如User
起别名,使用aliased()
:
1 | >>> from sqlalchemy.orm import aliased |
Query
的基本操作包含了LIMIT和OFFSET,但更简单的方法是使用python的切片slices:
1 | session.query(User.name).offset(1).limit(2) |
这两种语句的查询结果是一样的,不同的是前者返回的是一个Query
对象,后者是一个list。
毫无疑问,前者使用了SQL中的LIMIT和OFFSET语句。那么后者是从数据库中取出全部结果后再做切片的吗?这还有待考证。
过滤有两种方法,使用filter_by()
或者filter()
。filter_by()
使用关键字参数:
1 | for name, in session.query(User.name).filter_by(fullname='Ed Jones'): |
filter()
使用Python表达式:
1 | for name, in session.query(User.name).filter(User.fullname=='Ed Jones'): |
Query
对象上的大部分方法调用返回的仍是Query
对象,这样你可以添加更多条件,比如过滤两次或更多,它们使用AND
关系连接:
1 | >>> for user in session.query(User).filter(User.name=='ed').filter(User.fullname=='Ed Jones'): |
但是如果你在offset()
和limit()
之后调用filter()
,会引发错误:
1 | session.query(User.name).offset(1).limit(2).filter(User.name=="wendy") |
错误信息解释的也很明白了,你只需要先使用from_self()
方法,像这样:
1 | session.query(User.name).offset(1).limit(2).from_self().filter(User.name=="wendy") |
常见过滤操作符
下面是用在filter()
中的一些常见操作符:
- equals
1
query.filter(User.name == 'ed')
- not equals
1
query.filter(User.name != 'ed')
- LIKE
1
query.filter(User.name.like('%ed%'))
- IN
1
2
3
4
5
6query.filter(User.name.in_(['ed', 'wendy', 'jack']))
# works with query objects too:
query.filter(User.name.in_(
session.query(User.name).filter(User.name.like('%ed%'))
)) - NOT IN
1
query.filter(~User.name.in_(['ed', 'wendy', 'jack']))
- IS NULL
1
2
3
4query.filter(User.name == None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.is_(None)) - IS NOT NULL
1
2
3
4query.filter(User.name != None)
# alternatively, if pep8/linters are a concern
query.filter(User.name.isnot(None)) - AND
1
2
3
4
5
6
7
8
9# use and_()
from sqlalchemy import and_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones'))
# or send multiple expressions to .filter()
query.filter(User.name == 'ed', User.fullname == 'Ed Jones')
# or chain multiple filter()/filter_by() calls
query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones') - OR
1
2from sqlalchemy import or_
query.filter(or_(User.name == 'ed', User.name == 'wendy')) - MATCH 注意
1
query.filter(User.name.match('wendy'))
match()
使用数据库指定的MATCH
或CONTAINS
函数。它的行为因数据库后端不同而不同。在某些数据库中是不能用的,比如SQLite。
返回Lists和Scalars
Query
对象上有一些方法会立刻执行SQL并返回结果。
all()
returns a list:1
2
3
4>>> query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
>>> query.all()
[<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>,
<User(name='fred', fullname='Fred Flinstone', password='blah')>]first()
返回查询结果中的第一项:1
2>>> query.first()
<User(name='ed', fullname='Ed Jones', password='f8s7ccs')>one()
取所有的行(rows),如果结果不止一个,会报错MultipleResultsFound
。如果结果一个也没有,会报错NoResultFound
。one_or_none()
同one()
相似,只是没有结果是返回None
而不报错。scalar()
会先调用one()
,如果没有报错,会返回行的第一列:1
2
3query = session.query(User.id).filter(User.name == 'ed').order_by(User.id)
query.scalar()
1
使用SQL字符串
Query
的大多数方法还可以通过使用text()
来接受字符串作为参数。例如filter()
和order_by()
:
1 | from sqlalchemy import text |
SQL字符串还可以通过params()
方法指定参数:
1 | >>> session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(User.id).one() |
要使用完整的SQL字符串表达式,使用from_statement()
方法:
1 | >>> session.query(User).from_statement( |
计数 Counting
Query
有个便捷的计数方法count()
:
1 | >>> session.query(User).filter(User.name.like('%ed')).count() |
来看看这行代码产生的SQL:
1 | SELECT count(*) AS count_1 |
在某些情况下,使用SELECT count(*) FROM table
会更简单。但现在新版本的SQLAlchemy总是用更加精确的SQL来表达更加准确的含义。
有时我们需要指定对哪些项来计数,可以使用func.count()
表达式:
1 | from sqlalchemy import func |
要产生SELECT count(*) FROM table
这样简单的SQL,我们可以这样做:
1 | >>> session.query(func.count('*')).select_from(User).scalar() |
更简单的做法,可以不使用select_from()
:
1 | >>> session.query(func.count(User.id)).scalar() |
建立关系 Building a Relationship
现在来想想我们的第二张表(table)吧,它与User
关联,可以被映射和查询。假设在我们的系统中用户可以保存多个email地址,那么,我们要将users
表通过一对多的关系关联到一个叫做addresses
的新表上。我们通过定义Address
类来映射出这张表:
1 | from sqlalchemy import ForeignKey |
注意
relationship.back_populates
参数是SQLAlchemy中relationship.backref
的更新版本。relationship.backref
仍然可以使用。relationship.back_populates
除了一点冗长和更易操作,其实是一样的。
与Django的模型定义相比,SQLAlchemy中的写法似乎稍微麻烦了一些,我想应该有它的道理吧,再深入学习也许能体会到。用过Django的同学应该不难看懂。这里官方教程介绍的很详细,我觉得是有点啰嗦了,有兴趣的同学可以仔细看看。
现在我们需要在数据库中创建出addresses
表,因此我们会从metadata中发起一个新的CREATE语句。不必担心,已经创建的表将会被跳过:
1 | >>> Base.metadata.create_all(engine) |
使用关联对象 Working with Related Objects
现在当我们创建一个User
对象,它会有一个空的addresses
容器(collection)。collection可以有多种类型,比如集合set或是字典dictionary,默认情况下是Python的列表list。
1 | >>> jack = User(name='jack', fullname='Jack Bean', password='gjffdd') |
我们可以自由的对User
对象添加Address
对象。
1 | jack.addresses = [ |
添加了关系的元素可以向下面这样双向引用,这种行为通过Python实现而不需要任何SQL:
1 | >>> jack.addresses[1] |
我们把User
对象Jack Bean
commit到数据库,通过叫做级联(cascading)的过程,两个Address
对象也会被保存到数据库中。
1 | >>> session.add(jack) |
现在查询Jack,我们仅仅取回了Jack,没有任何有关Jack的addresses的SQL:
1 | >>> jack = session.query(User).filter_by(name='jack').one() |
查看addresses
容器时,才会发出查询addresses
的SQL:
1 | >>> jack.addresses |
这是关系懒加载(lazy loading relationship)的一个例子。
关联查询 Querying with Joins
现在我们有两张表了,可以演示Query
更多的特性了。
下面我们同时加载User
和Address
,使用Query.filter()
时可以看作它们的列已经关联在一起了:
1 | >>> for u, a in session.query(User, Address).\ |
使用Query.join()
时最简单的办法来达到标准的SQL JOIN语法:
1 | >>> session.query(User).join(Address).filter(Address.email_address=='jack@google.com').all() |
Query.join()
知道该如何连接User
和Address
因为它们之间有且仅有一个外键关系。如果没有外键或是有多个,用下面的几种形式之一,Query.join()
会更好的工作:
1 | query.join(Address, User.id==Address.user_id) # explicit condition |
正如你所期望的,使用outerjoin()
可以达到外关联(outer join):
1 | query.outerjoin(User.addresses) # LEFT OUTER JOIN |
对数据库的各种关联关系我了解的很少,也是需要加强的地方。更详细的文档可以看这里。
使用别名 Using Aliases
多表查询时,有时同一张表会被引用不止一次,这时就需要使用别名(Aliases)。下面我们把Address
关联了两次,来查询两个email地址不一样的用户:
1 | from sqlalchemy.orm import aliased |
Using EXISTS
在SQL中EXISTS关键字是一个布尔运算符,当给定的表达式包含任何行时,就会返回True。在许多情况下它可以用来代替joins。
下面是EXISTS的一个使用实例:
1 | from sqlalchemy.sql import exists |
Query
的几个操作符也会自动的调用EXISTS。上面的例子也可以用下面的方式表达:
1 | for name, in session.query(User.name).\ |
any()
也可以用条件来做限制:
1 | for name, in session.query(User.name).\ |
has()
和any()
类似,它是多对一关系的操作符(注意~
代表NOT):
1 | session.query(Address).\ |
常用关系运算符 Common Relationship Operators
eq() (many-to-one “equals” comparison):
1
query.filter(Address.user == someuser)
ne() (many-to-one “not equals” comparison):
1
query.filter(Address.user != someuser)
IS NULL (many-to-one comparison, also uses eq()):
1
query.filter(Address.user == None)
contains() (used for one-to-many collections):
1
query.filter(User.addresses.contains(someaddress))
any() (used for collections):
1
2
3
4query.filter(User.addresses.any(Address.email_address == 'bar'))
# also takes keyword arguments:
query.filter(User.addresses.any(email_address='bar'))has() (used for scalar references):
1
query.filter(Address.user.has(name='ed'))
Query.with_parent() (used for any relationship):
1
session.query(Address).with_parent(someuser, 'addresses')
删除 Deleting
我们来试着删除jack
看看会发生什么。
1 | >>> session.delete(jack) |
目前为止,一切正常。那Jack的那些Address
对象呢?
1 | session.query(Address).filter( |
看,它们还在!分析SQL我们可以看到每个address的user_id
列被设置为NULL,而此address行并没有删除。除非你明确的指定,SQLAlchemy不会做级联删除。
配置级联删除 Configuring delete/delete-orphan Cascade
我们可以配置User.addresses
关系上的cascade选项来改变这种行为。虽然SQLAlchemy允许你在任何时候添加新的属性或是关系,但是这样需要删除已经存在的关系。所以我们需要推翻之前的关系映射重来。现在关闭Session
:
1 | session.close() |
现在我们重新声明User
和Address
类:
1 | class User(Base): |
注意,这次我们把关系定义在User
类中。
现在,删除Jack的同时也会删除与这个user关联的Address
对象:
1 | session.delete(jack) |
创建多对多的关系 Building a Many To Many Relationship
现在来建立一个多对多的关系。假设我们有一个博客应用,文章BlogPost
和关键词Keyword
就是多对多关系。
我们需要创建一个没有映射的Table
作为中间表。
1 | from sqlalchemy import Table, Text |
然后我们定义BlogPost
和Keyword
:
1 | class BlogPost(Base): |
现在我们还希望BlogPost
类能有一个author
字段:
1 | BlogPost.author = relationship(User, back_populates="posts") |
创建新表:
1 | Base.metadata.create_all(engine) |
添加一些数据:
1 | wendy = session.query(User).filter_by(name='wendy').one() |
下面是几种不同的查询示例:
1 | >>> session.query(BlogPost).filter(BlogPost.keywords.any(keyword='firstpost')).all() |
1 | >>> session.query(BlogPost).\ |
1 | >>> wendy.posts.filter(BlogPost.keywords.any(keyword='firstpost')).all() |
更多参考 Further Reference
后记
这是我第一次接触SQLAlchemy,文章是我一边学习一边记录下来的。文章中的代码大部分来自SQLAlchemy的官方文档,我都亲自做了实验并运行成功。我尽力保证文章的准确性,既是对自己的要求,也不希望因我的错误而误导读者。但因个人技术水平有限,文章中难免会有不准确的地方,希望发现错误的朋友能慷慨向我指出。学习SQLAlchemy还是请以官方文档为准。
- 原文链接: https://chowyi.com/SQLAlchemy初体验/
- 版权声明: 文章采用 CC BY-NC-SA 4.0 协议进行授权,转载请注明出处!