如何用SqlAlchemy做一个upsert?
我有一个logging,如果它不存在,如果它已经存在(主键存在),我想要存在数据库中的字段被更新为当前状态。 这通常被称为upsert 。
下面的不完整的代码片段演示了什么可以工作,但似乎过于笨重(特别是如果有更多的列)。 什么是更好/最好的方式?
Base = declarative_base() class Template(Base): __tablename__ = 'templates' id = Column(Integer, primary_key = True) name = Column(String(80), unique = True, index = True) template = Column(String(80), unique = True) description = Column(String(200)) def __init__(self, Name, Template, Desc): self.name = Name self.template = Template self.description = Desc def UpsertDefaultTemplate(): sess = Session() desired_default = Template("default", "AABBCC", "This is the default template") try: q = sess.query(Template).filter_by(name = desiredDefault.name) existing_default = q.one() except sqlalchemy.orm.exc.NoResultFound: #default does not exist yet, so add it... sess.add(desired_default) else: #default already exists. Make sure the values are what we want... assert isinstance(existing_default, Template) existing_default.name = desired_default.name existing_default.template = desired_default.template existing_default.description = desired_default.description sess.flush()
有没有更好或更less的详细的做法呢? 像这样的东西会很棒:
sess.upsert_this(desired_default, unique_key = "name")
虽然unique_key
kwarg显然是不必要的(ORM应该能够很容易地知道这一点),但是我添加它只是因为SQLAlchemy只能使用主键。 例如:我一直在研究Session.merge是否适用,但是这只适用于主键,在这种情况下,这是一个自动增量的id,这对于这个目的来说并不是非常有用。
一个示例用例就是在启动一个可能已经升级了默认预期数据的服务器应用程序时。 即:没有并发关心这个upsert。
SQLAlchemy确实有一个“保存或更新”行为,在最近的版本中已经内置到session.add
,但以前是独立的session.saveorupdate
调用。 这不是一个“upsert”,但它可能足够满足您的需求。
你在问一个有多个独特密钥的课程是好事, 我相信这正是没有单一的正确方法来做到这一点的原因。 主键也是唯一的键。 如果没有唯一的约束,只有主键,这将是一个足够简单的问题:如果没有给定的ID存在,或者如果ID是无,创build一个新的logging; 否则使用该主键更新现有logging中的所有其他字段。
但是,当有其他独特的约束条件时,这种简单的方法就存在一些合理的问题。 如果你想“插入”一个对象,并且你的对象的主键匹配一个现有的logging,但是另一个唯一的列匹配不同的logging,那么你会怎么做? 同样,如果主键不匹配现有logging,但另一个唯一列匹配现有logging,那又如何? 对于你的具体情况可能有一个正确的答案,但总的来说,我认为没有一个正确的答案。
这就是没有内置“upsert”操作的原因。 应用程序必须定义在每个特定情况下这意味着什么。
SQLAlchemy现在使用两个方法on_conflict_do_update()
和on_conflict_do_nothing()
支持ON CONFLICT
:
从文档复制:
from sqlalchemy.dialects.postgresql import insert stmt = insert(my_table).values(user_email='a@b.com', data='inserted data') stmt = stmt.on_conflict_do_update( index_elements=[my_table.c.user_email], index_where=my_table.c.user_email.like('%@gmail.com'), set_=dict(data=stmt.excluded.data) ) conn.execute(stmt)
我用一个“看你跳跃”的方法:
# first get the object from the database if it exists # we're guaranteed to only get one or zero results # because we're filtering by primary key switch_command = session.query(Switch_Command).\ filter(Switch_Command.switch_id == switch.id).\ filter(Switch_Command.command_id == command.id).first() # If we didn't get anything, make one if not switch_command: switch_command = Switch_Command(switch_id=switch.id, command_id=command.id) # update the stuff we care about switch_command.output = 'Hooray!' switch_command.lastseen = datetime.datetime.utcnow() session.add(switch_command) # This will generate either an INSERT or UPDATE # depending on whether we have a new object or not session.commit()
好处是这是数据库中性,我认为这是清楚的阅读。 缺点是在以下情况下有潜在的竞争条件:
- 我们查询数据库的
switch_command
并没有find一个 - 我们创build一个
switch_command
- 另一个进程或线程使用与我们相同的主键创build
switch_command
- 我们尝试提交我们的
switch_command