如何在PostgreSQL中UPSERT(MERGE,INSERT … ON DUPLICATE UPDATE)?

这里一个非常常见的问题是如何做一个upsert,这就是MySQL调用INSERT ... ON DUPLICATE UPDATE和标准支持作为MERGE操作的一部分。

鉴于PostgreSQL不直接支持(在第9.5页之前),你如何做到这一点? 考虑以下几点:

 CREATE TABLE testtable ( id integer PRIMARY KEY, somedata text NOT NULL ); INSERT INTO testtable (id, somedata) VALUES (1, 'fred'), (2, 'bob'); 

现在设想你想要“插入”元组(2, 'Joe')(3, 'Alan') ,所以新的表格内容是:

 (1, 'fred'), (2, 'Joe'), -- Changed value of existing tuple (3, 'Alan') -- Added new tuple 

这就是人们在讨论一个upsert时候正在讨论的问题。 至关重要的是,任何方法在同一个表上进行多个事务的情况下都必须是安全的 – 无论是通过使用显式locking还是防御由此产生的竞争条件。

这个话题在Insert中进行了广泛的讨论,PostgreSQL中的重复更新? ,但这是关于MySQL语法的替代方法,而且随着时间的推移,它会产生一些无关细节。 我正在做出明确的答案。

这些技术对于“插入如果不存在,否则什么都不做”也是有用的,即“插入…在重复键忽略”。

9.5和更新:

PostgreSQL 9.5和更新的支持INSERT ... ON CONFLICT UPDATE (和ON CONFLICT DO NOTHING ),即upsert。

ON DUPLICATE KEY UPDATE比较 。

快速解释 。

有关用法,请参阅手册 – 特别是语法图中的conflict_action子句和说明文本 。

与以下给出的9.4和更早版本的解决scheme不同,此function可以处理多个冲突的行,并且不需要独占locking或重试循环。

添加这个特性的提交就在这里 , 关于它的发展的讨论就在这里 。


如果你在9.5,不需要向下兼容,你现在可以停止阅读


9.4岁以上:

PostgreSQL没有任何内置的UPSERT (或MERGE )工具,并且在并发使用时高效地执行它是非常困难的。

本文讨论有用的细节问题 。

一般来说,您必须在两个选项中进行select

  • 在重试循环中单独插入/更新操作; 要么
  • locking表格并进行批量合并

单独的行重试循环

如果您想要许多连接同时尝试执行插入操作,则在重试循环中使用单独的行注释是合理的select。

PostgreSQL文档包含了一个有用的过程,可以让你在数据库的循环中完成这个过程 。 它防止丢失的更新和插入比赛,不像大多数天真的解决scheme。 它只能在READ COMMITTED模式下工作,并且只有在事务中唯一的事情是安全的。 如果触发器或辅助唯一键导致唯一的违规,该function将无法正常工作。

这个策略是非常低效的。 每当实际时,你应该排队工作,并做一个批量upsert,如下所述。

许多尝试解决此问题的方法都不考虑回滚,所以导致更新不完整。 两笔交易相互竞争; 其中一人成功INSERT s; 另一个得到一个重复的键错误,并做一个UPDATEUPDATE块等待INSERT回滚或提交。 当它回滚时, UPDATE条件重新检查匹配零行,所以即使UPDATE提交它实际上没有做你想要的upsert。 您必须检查结果行数,并在必要时重新尝试。

一些尝试解决scheme也不考虑SELECT比赛。 如果你尝试一下显而易见的简单:

 -- THIS IS WRONG. DO NOT COPY IT. It's an EXAMPLE. BEGIN; UPDATE testtable SET somedata = 'blah' WHERE id = 2; -- Remember, this is WRONG. Do NOT COPY IT. INSERT INTO testtable (id, somedata) SELECT 2, 'blah' WHERE NOT EXISTS (SELECT 1 FROM testtable WHERE testtable.id = 2); COMMIT; 

那么当两次运行时有几种失效模式。 一个是已经讨论过的更新重新检查问题。 另一个是在同时UPDATE地方,匹配零行和继续。 然后他们都进行EXISTStesting,这 INSERT 之前发生。 两者都得到零行,所以都做INSERT 。 一个失败,重复键错误。

这就是为什么你需要重新尝试循环。 你可能会认为你可以通过聪明的SQL来防止重复的键错误或丢失的更新,但是你不能。 您需要检查行数或处理重复键错误(取决于所选方法)并重试。

请不要为此推出自己的解决scheme。 像消息排队一样,这可能是错误的。

大量upsert与锁

有时你想做一个批量upsert,在那里你有一个新的数据集,你想合并到一个旧的现有数据集。 这比单独的排列插入更有效率,并且在实际中应该是优选的。

在这种情况下,您通常会遵循以下过程:

  • CREATE一个TEMPORARY

  • 将新数据COPY或批量插入临时表

  • LOCK目标表IN EXCLUSIVE MODE 。 这允许其他事务SELECT ,但不对表进行任何更改。

  • 使用临时表中的值对现有logging执行UPDATE ... FROM ;

  • 对目标表中不存在的行进行INSERT ;

  • COMMIT ,释放locking。

例如,对于问题中给出的示例,使用多值INSERT来填充临时表:

 BEGIN; CREATE TEMPORARY TABLE newvals(id integer, somedata text); INSERT INTO newvals(id, somedata) VALUES (2, 'Joe'), (3, 'Alan'); LOCK TABLE testtable IN EXCLUSIVE MODE; UPDATE testtable SET somedata = newvals.somedata FROM newvals WHERE newvals.id = testtable.id; INSERT INTO testtable SELECT newvals.id, newvals.somedata FROM newvals LEFT OUTER JOIN testtable ON (testtable.id = newvals.id) WHERE testtable.id IS NULL; COMMIT; 

相关阅读

  • UPSERT wiki页面
  • Postgres中的UPSERTisms
  • 在PostgreSQL中插入重复更新?
  • http://petereisentraut.blogspot.com/2010/05/merge-syntax.html
  • 用事务处理Upsert
  • SELECT或INSERT在一个容易出现竞争条件的函数中?
  • PostgreSQL wiki上的SQL MERGE
  • 现在在Postgresql中实现UPSERT的最常用的方法是

MERGE呢?

SQL标准的MERGE实际上定义的并发语义很差,不适合首先不locking表。

这是一个非常有用的OLAP语句,用于数据合并,但对于并发安全的upsert而言,它实际上并不是一个有用的解决scheme。 对于使用其他DBMS来使用MERGE的人来说,有很多build议,但实际上是错误的。

其他数据库:

  • INSERT ... ON DUPLICATE KEY UPDATE在MySQL中的INSERT ... ON DUPLICATE KEY UPDATE
  • MERGE从MS SQL Server (但请参阅以上关于MERGE问题)
  • 来自Oracle的MERGE (但请参阅以上关于MERGE问题)

我正在尝试为PostgreSQL 9.5之前版本的单一插入问题提供另一个解决scheme。 这个想法只是简单地尝试执行插入,并且如果logging已经存在,则更新它:

 do $$ begin insert into testtable(id, somedata) values(2,'Joe'); exception when unique_violation then update testtable set somedata = 'Joe' where id = 2; end $$; 

请注意, 只有在表格行没有删除的情况下,才能应用此解决scheme。

我不知道这个解决scheme的效率,但在我看来,足够合理。

 WITH UPD AS (UPDATE TEST_TABLE SET SOME_DATA = 'Joe' WHERE ID = 2 RETURNING ID), INS AS (SELECT '2', 'Joe' WHERE NOT EXISTS (SELECT * FROM UPD)) INSERT INTO TEST_TABLE(ID, SOME_DATA) SELECT * FROM INS 

在Postgresql 9.3上testing

由于这个问题已经结束,我在这里发表你如何使用SQLAlchemy来做这件事。 通过recursion,它重试批量插入或更新以对抗竞态条件和validation错误。

首先是import

 import itertools as it from functools import partial from operator import itemgetter from sqlalchemy.exc import IntegrityError from app import session from models import Posts 

现在有一些帮手function

 def chunk(content, chunksize=None): """Groups data into chunks each with (at most) `chunksize` items. https://stackoverflow.com/a/22919323/408556 """ if chunksize: i = iter(content) generator = (list(it.islice(i, chunksize)) for _ in it.count()) else: generator = iter([content]) return it.takewhile(bool, generator) def gen_resources(records): """Yields a dictionary if the record's id already exists, a row object otherwise. """ ids = {item[0] for item in session.query(Posts.id)} for record in records: is_row = hasattr(record, 'to_dict') if is_row and record.id in ids: # It's a row but the id already exists, so we need to convert it # to a dict that updates the existing record. Since it is duplicate, # also yield True yield record.to_dict(), True elif is_row: # It's a row and the id doesn't exist, so no conversion needed. # Since it's not a duplicate, also yield False yield record, False elif record['id'] in ids: # It's a dict and the id already exists, so no conversion needed. # Since it is duplicate, also yield True yield record, True else: # It's a dict and the id doesn't exist, so we need to convert it. # Since it's not a duplicate, also yield False yield Posts(**record), False 

最后是upsert函数

 def upsert(data, chunksize=None): for records in chunk(data, chunksize): resources = gen_resources(records) sorted_resources = sorted(resources, key=itemgetter(1)) for dupe, group in it.groupby(sorted_resources, itemgetter(1)): items = [g[0] for g in group] if dupe: _upsert = partial(session.bulk_update_mappings, Posts) else: _upsert = session.add_all try: _upsert(items) session.commit() except IntegrityError: # A record was added or deleted after we checked, so retry # # modify accordingly by adding additional exceptions, eg, # except (IntegrityError, ValidationError, ValueError) db.session.rollback() upsert(items) except Exception as e: # Some other error occurred so reduce chunksize to isolate the # offending row(s) db.session.rollback() num_items = len(items) if num_items > 1: upsert(items, num_items // 2) else: print('Error adding record {}'.format(items[0])) 

这是你如何使用它

 >>> data = [ ... {'id': 1, 'text': 'updated post1'}, ... {'id': 5, 'text': 'updated post5'}, ... {'id': 1000, 'text': 'new post1000'}] ... >>> upsert(data) 

bulk_save_objects ,它的bulk_save_objects是它可以处理关系,错误检查等插入操作 (不像批量操作 )。

SQLAlchemy upsert for Postgres> = 9.5

由于上面的大post涵盖了Postgres版本的许多不同的SQL方法(不仅在问题中不是9.5),我想在SQLAlchemy中添加如何使用Postgres 9.5。 而不是实现自己的upsert,你也可以使用SQLAlchemy的函数(在SQLAlchemy 1.1中添加)。 就个人而言,如果可能,我会build议使用这些。 不仅是因为方便,还因为它让PostgreSQL处理可能发生的任何竞态条件。

从我昨天给出的另一个答案交叉发布( https://stackoverflow.com/a/44395983/2156909

SQLAlchemy现在使用两个方法on_conflict_do_update()on_conflict_do_nothing()支持ON CONFLICT

从文档复制:

 from sqlalchemy.dialects.postgresql import insert stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') stmt = stmt.on_conflict_do_update( index_elements=[my_table.c.user_email], index_where=my_table.c.user_email.like('%@gmail.com'), set_=dict(data=stmt.excluded.data) ) conn.execute(stmt) 

http://docs.sqlalchemy.org/en/latest/dialects/postgresql.html?highlight=conflict#insert-on-conflict-upsert