为什么SQLAlchemy插入sqlite比使用sqlite3直接慢25倍?

为什么使用SQLAlchemy插入100,000行的简单testing用例比使用sqlite3驱动程序直接慢25倍呢? 我在实际应用中看到类似的减速。 我做错了什么?

#!/usr/bin/env python # Why is SQLAlchemy with SQLite so slow? # Output from this program: # SqlAlchemy: Total time for 100000 records 10.74 secs # sqlite3: Total time for 100000 records 0.40 secs import time import sqlite3 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.orm import scoped_session, sessionmaker Base = declarative_base() DBSession = scoped_session(sessionmaker()) class Customer(Base): __tablename__ = "customer" id = Column(Integer, primary_key=True) name = Column(String(255)) def init_sqlalchemy(dbname = 'sqlite:///sqlalchemy.db'): engine = create_engine(dbname, echo=False) DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) def test_sqlalchemy(n=100000): init_sqlalchemy() t0 = time.time() for i in range(n): customer = Customer() customer.name = 'NAME ' + str(i) DBSession.add(customer) DBSession.commit() print "SqlAlchemy: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs" def init_sqlite3(dbname): conn = sqlite3.connect(dbname) c = conn.cursor() c.execute("DROP TABLE IF EXISTS customer") c.execute("CREATE TABLE customer (id INTEGER NOT NULL, name VARCHAR(255), PRIMARY KEY(id))") conn.commit() return conn def test_sqlite3(n=100000, dbname = 'sqlite3.db'): conn = init_sqlite3(dbname) c = conn.cursor() t0 = time.time() for i in range(n): row = ('NAME ' + str(i),) c.execute("INSERT INTO customer (name) VALUES (?)", row) conn.commit() print "sqlite3: Total time for " + str(n) + " records " + str(time.time() - t0) + " sec" if __name__ == '__main__': test_sqlalchemy(100000) test_sqlite3(100000) 

我尝试了很多变化(请参阅http://pastebin.com/zCmzDraU )

同步对数据库的更改时,SQLAlchemy ORM使用工作单元模式。 这种模式远远超出了简单的“插入”数据。 它包括使用属性检测系统接收在对象上分配的属性,该属性检测系统在对象作出时跟踪对象上的更改,包括插入的所有行都在一个标识映射中进行跟踪,该标识映射对每行SQLAlchemy必须检索其“最后插入的ID“,如果还没有给出,还涉及要插入的行被扫描并根据需要进行sorting以获得依赖关系。 对象还需要有相当程度的簿记才能保持所有这些运行,对于大量的行来说,一次可以在大数据结构上花费过多的时间,因此最好将这些花费在大量的数据结构上。

基本上,工作单元是一个很大程度上的自动化,以便自动执行将复杂的对象图保存到没有明确的持久性代码的关系数据库中的任务,并且这种自动化是有代价的。

所以ORM基本上不适用于高性能批量插入。 这就是为什么SQLAlchemy有两个单独的库的原因,如果你看http://docs.sqlalchemy.org/en/latest/index.html ,你会注意到你会看到两个不同的一半到索引页 -一个用于ORM,一个用于Core。 如果不同时理解SQLAlchemy,则无法使用SQLAlchemy。

对于快速批量插入的用例,SQLAlchemy提供了核心 ,它是ORM构build在其上的SQL生成和执行系统。 有效地使用这个系统,我们可以产生一个与原始SQLite版本相竞争的INSERT。 下面的脚本举例说明了这一点,以及预先分配主键标识符的ORM版本,以便ORM可以使用executemany()插入行。 两个ORM版本同时对1000个logging进行刷新,这对性能有显着的影响。

这里观察的运行时间是:

 SqlAlchemy ORM: Total time for 100000 records 16.4133379459 secs SqlAlchemy ORM pk given: Total time for 100000 records 9.77570986748 secs SqlAlchemy Core: Total time for 100000 records 0.568737983704 secs sqlite3: Total time for 100000 records 0.595796823502 sec 

脚本:

 import time import sqlite3 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.orm import scoped_session, sessionmaker Base = declarative_base() DBSession = scoped_session(sessionmaker()) class Customer(Base): __tablename__ = "customer" id = Column(Integer, primary_key=True) name = Column(String(255)) def init_sqlalchemy(dbname = 'sqlite:///sqlalchemy.db'): global engine engine = create_engine(dbname, echo=False) DBSession.remove() DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) def test_sqlalchemy_orm(n=100000): init_sqlalchemy() t0 = time.time() for i in range(n): customer = Customer() customer.name = 'NAME ' + str(i) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print "SqlAlchemy ORM: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs" def test_sqlalchemy_orm_pk_given(n=100000): init_sqlalchemy() t0 = time.time() for i in range(n): customer = Customer(id=i+1, name="NAME " + str(i)) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print "SqlAlchemy ORM pk given: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs" def test_sqlalchemy_core(n=100000): init_sqlalchemy() t0 = time.time() engine.execute( Customer.__table__.insert(), [{"name":'NAME ' + str(i)} for i in range(n)] ) print "SqlAlchemy Core: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs" def init_sqlite3(dbname): conn = sqlite3.connect(dbname) c = conn.cursor() c.execute("DROP TABLE IF EXISTS customer") c.execute("CREATE TABLE customer (id INTEGER NOT NULL, name VARCHAR(255), PRIMARY KEY(id))") conn.commit() return conn def test_sqlite3(n=100000, dbname = 'sqlite3.db'): conn = init_sqlite3(dbname) c = conn.cursor() t0 = time.time() for i in range(n): row = ('NAME ' + str(i),) c.execute("INSERT INTO customer (name) VALUES (?)", row) conn.commit() print "sqlite3: Total time for " + str(n) + " records " + str(time.time() - t0) + " sec" if __name__ == '__main__': test_sqlalchemy_orm(100000) test_sqlalchemy_orm_pk_given(100000) test_sqlalchemy_core(100000) test_sqlite3(100000) 

另见: http : //docs.sqlalchemy.org/en/latest/faq/performance.html

来自@zzzeek的优秀答案。 对于那些想查询相同统计信息的人,我已经稍微修改了@zzzeek代码,在插入这些logging后马上查询这些logging,然后将这些logging转换为一系列的logging。

结果如下

 SqlAlchemy ORM: Total time for 100000 records 11.9210000038 secs SqlAlchemy ORM query: Total time for 100000 records 2.94099998474 secs SqlAlchemy ORM pk given: Total time for 100000 records 7.51800012589 secs SqlAlchemy ORM pk given query: Total time for 100000 records 3.07699990273 secs SqlAlchemy Core: Total time for 100000 records 0.431999921799 secs SqlAlchemy Core query: Total time for 100000 records 0.389000177383 secs sqlite3: Total time for 100000 records 0.459000110626 sec sqlite3 query: Total time for 100000 records 0.103999853134 secs 

有趣的是,使用裸sqlite3查询仍然比使用SQLAlchemy Core快3倍。 我想这是你有一个ResultProxy返回而不是一个裸的sqlite3行支付的代价。

SQLAlchemy Core比使用ORM快8倍。 所以,无论如何查询使用ORM都会慢很多。

这是我使用的代码:

 import time import sqlite3 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.sql import select Base = declarative_base() DBSession = scoped_session(sessionmaker()) class Customer(Base): __tablename__ = "customer" id = Column(Integer, primary_key=True) name = Column(String(255)) def init_sqlalchemy(dbname = 'sqlite:///sqlalchemy.db'): global engine engine = create_engine(dbname, echo=False) DBSession.remove() DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) def test_sqlalchemy_orm(n=100000): init_sqlalchemy() t0 = time.time() for i in range(n): customer = Customer() customer.name = 'NAME ' + str(i) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print "SqlAlchemy ORM: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs" t0 = time.time() q = DBSession.query(Customer) dict = [{'id':r.id, 'name':r.name} for r in q] print "SqlAlchemy ORM query: Total time for " + str(len(dict)) + " records " + str(time.time() - t0) + " secs" def test_sqlalchemy_orm_pk_given(n=100000): init_sqlalchemy() t0 = time.time() for i in range(n): customer = Customer(id=i+1, name="NAME " + str(i)) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print "SqlAlchemy ORM pk given: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs" t0 = time.time() q = DBSession.query(Customer) dict = [{'id':r.id, 'name':r.name} for r in q] print "SqlAlchemy ORM pk given query: Total time for " + str(len(dict)) + " records " + str(time.time() - t0) + " secs" def test_sqlalchemy_core(n=100000): init_sqlalchemy() t0 = time.time() engine.execute( Customer.__table__.insert(), [{"name":'NAME ' + str(i)} for i in range(n)] ) print "SqlAlchemy Core: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs" conn = engine.connect() t0 = time.time() sql = select([Customer.__table__]) q = conn.execute(sql) dict = [{'id':r[0], 'name':r[0]} for r in q] print "SqlAlchemy Core query: Total time for " + str(len(dict)) + " records " + str(time.time() - t0) + " secs" def init_sqlite3(dbname): conn = sqlite3.connect(dbname) c = conn.cursor() c.execute("DROP TABLE IF EXISTS customer") c.execute("CREATE TABLE customer (id INTEGER NOT NULL, name VARCHAR(255), PRIMARY KEY(id))") conn.commit() return conn def test_sqlite3(n=100000, dbname = 'sqlite3.db'): conn = init_sqlite3(dbname) c = conn.cursor() t0 = time.time() for i in range(n): row = ('NAME ' + str(i),) c.execute("INSERT INTO customer (name) VALUES (?)", row) conn.commit() print "sqlite3: Total time for " + str(n) + " records " + str(time.time() - t0) + " sec" t0 = time.time() q = conn.execute("SELECT * FROM customer").fetchall() dict = [{'id':r[0], 'name':r[0]} for r in q] print "sqlite3 query: Total time for " + str(len(dict)) + " records " + str(time.time() - t0) + " secs" if __name__ == '__main__': test_sqlalchemy_orm(100000) test_sqlalchemy_orm_pk_given(100000) test_sqlalchemy_core(100000) test_sqlite3(100000) 

我也testing没有将查询结果转换为字典和统计类似:

 SqlAlchemy ORM: Total time for 100000 records 11.9189999104 secs SqlAlchemy ORM query: Total time for 100000 records 2.78500008583 secs SqlAlchemy ORM pk given: Total time for 100000 records 7.67199993134 secs SqlAlchemy ORM pk given query: Total time for 100000 records 2.94000005722 secs SqlAlchemy Core: Total time for 100000 records 0.43700003624 secs SqlAlchemy Core query: Total time for 100000 records 0.131000041962 secs sqlite3: Total time for 100000 records 0.500999927521 sec sqlite3 query: Total time for 100000 records 0.0859999656677 secs 

使用SQLAlchemy Core查询比ORM快20倍。

重要的是要注意,这些testing是非常肤浅的,不应该太过重视。 我可能会错过一些可以完全改变统计数据的明显技巧。

衡量性能改进的最佳方法是直接在您自己的应用程序中。 不要把我的数据看成是理所当然的。

我会尝试插入expression式testing,然后基准。

由于OR映射器的开销,它可能仍然会变慢,但我希望不会太慢。

你介意尝试和发布结果吗? 这是非常有趣的东西。