使用SQLAlchemy ORM批量插入

有没有办法让SQLAlchemy做一个批量插入,而不是插入每个单独的对象。 即

这样做的:

INSERT INTO `foo` (`bar`) VALUES (1), (2), (3) 

而不是:

 INSERT INTO `foo` (`bar`) VALUES (1) INSERT INTO `foo` (`bar`) VALUES (2) INSERT INTO `foo` (`bar`) VALUES (3) 

我刚刚转换了一些代码来使用sqlalchemy,而不是原始的sql,虽然现在好多了,但现在看起来比较慢(高达10倍),我想知道这是否是原因。

也许我可以更有效地改善使用会话的情况。 目前我有autoCommit=False ,并添加了一些东西后,做一个session.commit() 。 虽然这似乎导致数据如果数据库在其他地方更改,如果即使我做了一个新的查询,我仍旧得到旧的结果?

谢谢你的帮助!

SQLAlchemy在1.0.0版中引入了:

批量操作 – SQLAlchemy文档

通过这些操作,您现在可以批量插入或更新!

例如,你可以做:

 s = Session() objects = [ User(name="u1"), User(name="u2"), User(name="u3") ] s.bulk_save_objects(objects) 

在这里,将会做一个批量插入。

据我所知,没有办法让ORM发出批量插入。 我相信其根本的原因是SQLAlchemy需要跟踪每个对象的身份(即新的主键),并且批量插入会干扰。 例如,假设你的foo表包含一个id列并映射到Foo类:

 x = Foo(bar=1) print x.id # None session.add(x) session.flush() # BEGIN # INSERT INTO foo (bar) VALUES(1) # COMMIT print x.id # 1 

由于SQLAlchemy在没有发出另一个查询的情况下为x.id了值, x.id我们可以推断它直接从INSERT语句中获得值。 如果您不需要通过相同的实例对创build的对象进行后续访问,则可以跳过插入的ORM层:

 Foo.__table__.insert().execute([{'bar': 1}, {'bar': 2}, {'bar': 3}]) # INSERT INTO foo (bar) VALUES ((1,), (2,), (3,)) 

SQLAlchemy无法将这些新行与任何现有对象进行匹配,因此您必须重新查询它们以进行后续操作。

就陈旧的数据而言,记住会话没有内置的方法可以知道数据库何时在会话之外被更改。 为了通过现有实例访问外部修改的数据,实例必须标记为过期 。 这在session.commit()默认会发生,但可以通过调用session.expire_all()session.expire(instance)手动完成。 一个例子(省略了SQL):

 x = Foo(bar=1) session.add(x) session.commit() print x.bar # 1 foo.update().execute(bar=42) print x.bar # 1 session.expire(x) print x.bar # 42 

session.commit()过期x ,因此第一个打印语句会隐式地打开一个新的事务并重新查询x的属性。 如果您注释掉第一个打印语句,则会注意到第二个打印语句现在选取正确的值,因为直到更新后才会发出新的查询。

从事务隔离的angular度来看,这是有道理的 – 你只能select事务之间的外部修改。 如果这给您造成麻烦,我build议您澄清或重新考虑您的应用程序的事务边界,而不是立即达到session.expire_all()

sqlalchemy文档对可用于批量插入的各种技术的性能有很好的评价:

ORM基本上不适用于高性能批量插入 – 这是SQLAlchemy除了将ORM作为第一级组件外还提供Core的全部原因。

对于快速批量插入的用例,ORM构build在其上的SQL生成和执行系统是Core的一部分。 直接使用这个系统,我们可以产生一个直接使用原始数据库API的INSERT。

或者,SQLAlchemy ORM提供批量操作的方法套件,这些方法提供了工作单元的子部分的钩子,以便通过小程度的基于ORM的自动化来发出核心级别的INSERT和UPDATE结构。

下面的例子说明了插入行的几种不同方法的基于时间的testing,从最自动化到最小化。 用cPython 2.7,运行时观察到:

 classics-MacBook-Pro:sqlalchemy classic$ python test.py SQLAlchemy ORM: Total time for 100000 records 12.0471920967 secs SQLAlchemy ORM pk given: Total time for 100000 records 7.06283402443 secs SQLAlchemy ORM bulk_save_objects(): Total time for 100000 records 0.856323003769 secs SQLAlchemy Core: Total time for 100000 records 0.485800027847 secs sqlite3: Total time for 100000 records 0.487842082977 sec 

脚本:

 import time import sqlite3 from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, create_engine from sqlalchemy.orm import scoped_session, sessionmaker Base = declarative_base() DBSession = scoped_session(sessionmaker()) engine = None class Customer(Base): __tablename__ = "customer" id = Column(Integer, primary_key=True) name = Column(String(255)) def init_sqlalchemy(dbname='sqlite:///sqlalchemy.db'): global engine engine = create_engine(dbname, echo=False) DBSession.remove() DBSession.configure(bind=engine, autoflush=False, expire_on_commit=False) Base.metadata.drop_all(engine) Base.metadata.create_all(engine) def test_sqlalchemy_orm(n=100000): init_sqlalchemy() t0 = time.time() for i in xrange(n): customer = Customer() customer.name = 'NAME ' + str(i) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print( "SQLAlchemy ORM: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_orm_pk_given(n=100000): init_sqlalchemy() t0 = time.time() for i in xrange(n): customer = Customer(id=i+1, name="NAME " + str(i)) DBSession.add(customer) if i % 1000 == 0: DBSession.flush() DBSession.commit() print( "SQLAlchemy ORM pk given: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_orm_bulk_insert(n=100000): init_sqlalchemy() t0 = time.time() n1 = n while n1 > 0: n1 = n1 - 10000 DBSession.bulk_insert_mappings( Customer, [ dict(name="NAME " + str(i)) for i in xrange(min(10000, n1)) ] ) DBSession.commit() print( "SQLAlchemy ORM bulk_save_objects(): Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def test_sqlalchemy_core(n=100000): init_sqlalchemy() t0 = time.time() engine.execute( Customer.__table__.insert(), [{"name": 'NAME ' + str(i)} for i in xrange(n)] ) print( "SQLAlchemy Core: Total time for " + str(n) + " records " + str(time.time() - t0) + " secs") def init_sqlite3(dbname): conn = sqlite3.connect(dbname) c = conn.cursor() c.execute("DROP TABLE IF EXISTS customer") c.execute( "CREATE TABLE customer (id INTEGER NOT NULL, " "name VARCHAR(255), PRIMARY KEY(id))") conn.commit() return conn def test_sqlite3(n=100000, dbname='sqlite3.db'): conn = init_sqlite3(dbname) c = conn.cursor() t0 = time.time() for i in xrange(n): row = ('NAME ' + str(i),) c.execute("INSERT INTO customer (name) VALUES (?)", row) conn.commit() print( "sqlite3: Total time for " + str(n) + " records " + str(time.time() - t0) + " sec") if __name__ == '__main__': test_sqlalchemy_orm(100000) test_sqlalchemy_orm_pk_given(100000) test_sqlalchemy_orm_bulk_insert(100000) test_sqlalchemy_core(100000) test_sqlite3(100000) 

我通常使用add_all来做到这add_all

 from app import session from models import User objects = [User(name="u1"), User(name="u2"), User(name="u3")] session.add_all(objects) session.commit() 

从版本0.8开始直接支持SQLAlchemy

根据文档 , connection.execute(table.insert().values(data))应该做的伎俩。 (请注意,这与connection.execute(table.insert(), data) 一样,通过调用executemany ,会导致许多单独的行插入executemany )。 除了本地连接以外,性能的差异可能是巨大的。

这是一个方法:

 values = [1, 2, 3] Foo.__table__.insert().execute([{'bar': x} for x in values]) 

这将插入像这样:

 INSERT INTO `foo` (`bar`) VALUES (1), (2), (3) 

参考:SQLAlchemy FAQ包含各种提交方法的基准。

SQLAlchemy在1.0.0版中引入了:

批量操作 – SQLAlchemy文档

通过这些操作,您现在可以批量插入或更新!

例如(如果你想简单的表INSERTs的开销最低),你可以使用Session.bulk_insert_mappings()

 loadme = [ (1, 'a') , (2, 'b') , (3, 'c') ] dicts = [] for i in range(len(loadme)): dicts.append(dict(bar=loadme[i][0], fly=loadme[i][1])) s = Session() s.bulk_insert_mappings(Foo, dicts) s.commit() 

或者,如果你愿意的话,可以跳过loadme元组,直接把dicts写成dicts (但是我觉得把所有的字都留在数据之外,并且在一个循环中加载一个字典列表是比较容易的)。

Piere的回答是正确的,但是一个问题是bulk_save_objects在默认情况下不会返回对象的主键,如果这是你所关心的。 将return_defaults设置为True以获得此行为。

文档在这里 。

 foos = [Foo(bar='a',), Foo(bar='b'), Foo(bar='c')] session.bulk_save_objects(foos, return_defaults=True) for foo in foos: assert foo.id is not None session.commit()