psycopg2:用一个查询插入多行

我需要插入多行与一个查询(行数不是常量),所以我需要执行这样的查询:

INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6); 

我知道的唯一方法是

 args = [(1,2), (3,4), (5,6)] args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args) cursor.execute("INSERT INTO t (a, b) VALUES "+args_str) 

但我想要一些更简单的方法。

我build立了一个程序,将多行插入位于另一个城市的服务器。

我发现使用这种方法比executemany快10倍。 在我的情况下,tup是一个包含大约2000行的元组。 使用这种方法花了大约10秒钟:

 args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) cur.execute("INSERT INTO table VALUES " + args_str) 

和2分钟时使用这种方法:

 cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup) 

Psycopg 2.7中的新的execute_values方法 :

 data = [(1,'x'), (2,'y')] insert_query = 'insert into t (a, b) values %s' psycopg2.extras.execute_values ( cursor, insert_query, data, template=None, page_size=100 ) 

在Psycopg 2.6中做pythonic的方式:

 data = [(1,'x'), (2,'y')] records_list_template = ','.join(['%s'] * len(data)) insert_query = 'insert into t (a, b) values {}'.format(records_list_template) cursor.execute(insert_query, data) 

说明:如果要插入的数据是以in的列表的forms给出的

 data = [(1,'x'), (2,'y')] 

那么它已经和所需的格式一样

  1. insert子句的values语法需要一个logging列表

    insert into t (a, b) values (1, 'x'),(2, 'y')

  2. Psycopg将Python tuple调整为Postgresql record

唯一必要的工作是提供一个由psycopg填写的logging列表模板

 # We use the data list to be sure of the template length records_list_template = ','.join(['%s'] * len(data)) 

并将其放在insert查询中

 insert_query = 'insert into t (a, b) values {}'.format(records_list_template) 

打印insert_query输出

 insert into t (a, b) values %s,%s 

现在以通常的Psycopg参数替代

 cursor.execute(insert_query, data) 

或者只是testing将发送到服务器

 print (cursor.mogrify(insert_query, data).decode('utf8')) 

输出:

 insert into t (a, b) values (1, 'x'),(2, 'y') 

来自Psycopg2的Postgresql.org教程页面(参见下文) :

我想告诉你的最后一项是如何使用字典插入多行。 如果您有以下情况:

 namedict = ({"first_name":"Joshua", "last_name":"Drake"}, {"first_name":"Steven", "last_name":"Foo"}, {"first_name":"David", "last_name":"Bar"}) 

您可以使用以下方法轻松地在字典中插入所有三行:

 cur = conn.cursor() cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict) 

它不会节省太多的代码,但它确实看起来更好。

cursor.copy_from是迄今为止我发现的用于批量插入的最快解决scheme。 这里有一个我做的包含一个名为IteratorFile的类,它允许一个产生string的迭代器像文件一样被读取。 我们可以使用生成器expression式将每个inputlogging转换为一个string。 所以解决的办法是

 args = [(1,2), (3,4), (5,6)] f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args)) cursor.copy_from(f, 'table_name', columns=('a', 'b')) 

对于这个小小的参数,它不会带来很大的速度差异,但是在处理数以千计的行时,我看到了很大的加速。 它也将比build立一个巨大的查询string更有效率。 迭代器一次只能在内存中保存一个inputlogging,在某些时候,通过构build查询string,您将在Python进程或Postgres中耗尽内存。

[用psycopg2 2.7更新]

经典的executemany()比@ ant32的实现(称为“折叠”)要慢60倍左右,正如这个线程所解释的: https ://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib 。 COM

这个实现在2.7版本中被添加到了psycopg2中,被称为execute_values()

 from psycopg2.extras import execute_values execute_values(cur, "INSERT INTO test (id, v1, v2) VALUES %s", [(1, 2, 3), (4, 5, 6), (7, 8, 9)]) 

[上一个回答]

要插入多行,使用execute() executemany() VALUES语法比使用psycopg2 executemany()要快10倍。 事实上, executemany()只是运行许多单独的INSERT语句。

在Python 3中,@ ant32的代码完美地工作。但是在Python 3中, cursor.mogrify()返回字节, cursor.execute()接受字节或string,而','.join()需要str实例。

因此,在Python 3中,您可能需要通过添加.decode('utf-8')来修改@ ant32的代码:

 args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup) cur.execute("INSERT INTO table VALUES " + args_str) 

或者仅使用字节(使用b''b"" ):

 args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) cur.execute(b"INSERT INTO table VALUES " + args_bytes) 

所有这些技术在Postgres术语中都被称为“扩展插入”,截至2016年11月24日,它的速度仍然比psychopg2的executemany()以及本主题中列出的所有其他方法(我在尝试使用之前回答)。

这里有一些代码,不使用cur.mogrify,很好,只是为了让你的头:

 valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns. sqlrows = [] rowsPerInsert = 3 # more means faster, but with diminishing returns.. for row in getSomeData: # row == [1, 'a', 'yolo', ... ] sqlrows += row if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0: # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ] insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert) cur.execute(insertSQL, sqlrows) con.commit() sqlrows = [] insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows)) cur.execute(insertSQL, sqlrows) con.commit() 

但是应该注意的是,如果你可以使用copy_from(),你应该使用copy_from;)

如果您正在使用SQLAlchemy,则不需要手工处理string,因为SQLAlchemy 支持为单个INSERT语句生成多行VALUES子句 :

 rows = [] for i, name in enumerate(rawdata): row = { 'id': i, 'name': name, 'valid': True, } rows.append(row) if len(rows) > 0: # INSERT fails if no rows insert_query = SQLAlchemyModelName.__table__.insert().values(rows) session.execute(insert_query) 

上面已经使用了ant32的答案好几年了。 不过,我发现这是thorws python 3中的错误,因为mogrify返回一个字节string。

显式转换为bytsestring是一个简单的解决scheme,使代码python 3兼容。

 args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) cur.execute(b"INSERT INTO table VALUES " + args_str) 

另一个不错的和有效的方法是传递行作为1参数,这是json对象的数组。

例如你传递参数:

 [ {id: 18, score: 1}, { id: 19, score: 5} ] 

它是一个数组,里面可能包含任何数量的对象。 然后你的SQL看起来像:

 INSERT INTO links (parent_id, child_id, score) SELECT 123, (r->>'id')::int, (r->>'score')::int FROM unnest($1::json[]) as r 

注意:你的postgress必须足够新,才能支持json

该解决scheme基于JJ解决scheme,但由于遇到问题而具有不同的IF / Else结构。

 def insert_Entries(EntriesWishList): conn = None try: # connect to the PostgreSQL database con = psycopg2.connect(dbname='myDBName', user='postgres', host='localhost', password='myPW') # create a new cursor cur = con.cursor() valueSQL = [ '%s','%s', '%s', '%s', '%s', '%s', '%s' ] # as many as you have columns. sqlrows = [] rowsPerInsert = 3 # more means faster, but with diminishing returns.. units = len(EntriesWishList) print(units) for unit in range(0,units): sqlrows += EntriesWishList[unit] insertSQL ='' if(( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0): insertSQL = 'INSERT INTO DATABASE VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert) cur.execute(insertSQL, sqlrows) con.commit() elif( (units-unit) <= rowsPerInsert): rowsPerInsert = 1 unit = unit-( len(sqlrows)/len(valueSQL) ) else: continue sqlrows = [] cur.close() except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close() 

如果你想在一个插入语句中插入多行(假设你没有使用ORM),那么最简单的方法就是使用词典列表。 这里是一个例子:

  t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6}, {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7}, {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}] conn.execute("insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);", t) 

正如你所看到的,只有一个查询将被执行:

 INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s); INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}] INFO sqlalchemy.engine.base.Engine COMMIT 

使用aiopg – 下面的代码片段工作得很好

  # items = [10, 11, 12, 13] # group = 1 tup = [(gid, pid) for pid in items] args_str = ",".join([str(s) for s in tup]) # insert into group values (1, 10), (1, 11), (1, 12), (1, 13) yield from cur.execute("INSERT INTO group VALUES " + args_str)