psycopg2:用一个查询插入多行
我需要插入多行与一个查询(行数不是常量),所以我需要执行这样的查询:
INSERT INTO t (a, b) VALUES (1, 2), (3, 4), (5, 6);
我知道的唯一方法是
args = [(1,2), (3,4), (5,6)] args_str = ','.join(cursor.mogrify("%s", (x, )) for x in args) cursor.execute("INSERT INTO t (a, b) VALUES "+args_str)
但我想要一些更简单的方法。
我build立了一个程序,将多行插入位于另一个城市的服务器。
我发现使用这种方法比executemany
快10倍。 在我的情况下,tup是一个包含大约2000行的元组。 使用这种方法花了大约10秒钟:
args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) cur.execute("INSERT INTO table VALUES " + args_str)
和2分钟时使用这种方法:
cur.executemany("INSERT INTO table VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s)", tup)
Psycopg 2.7中的新的execute_values
方法 :
data = [(1,'x'), (2,'y')] insert_query = 'insert into t (a, b) values %s' psycopg2.extras.execute_values ( cursor, insert_query, data, template=None, page_size=100 )
在Psycopg 2.6中做pythonic的方式:
data = [(1,'x'), (2,'y')] records_list_template = ','.join(['%s'] * len(data)) insert_query = 'insert into t (a, b) values {}'.format(records_list_template) cursor.execute(insert_query, data)
说明:如果要插入的数据是以in的列表的forms给出的
data = [(1,'x'), (2,'y')]
那么它已经和所需的格式一样
-
insert
子句的values
语法需要一个logging列表insert into t (a, b) values (1, 'x'),(2, 'y')
-
Psycopg
将Pythontuple
调整为Postgresqlrecord
。
唯一必要的工作是提供一个由psycopg填写的logging列表模板
# We use the data list to be sure of the template length records_list_template = ','.join(['%s'] * len(data))
并将其放在insert
查询中
insert_query = 'insert into t (a, b) values {}'.format(records_list_template)
打印insert_query
输出
insert into t (a, b) values %s,%s
现在以通常的Psycopg
参数替代
cursor.execute(insert_query, data)
或者只是testing将发送到服务器
print (cursor.mogrify(insert_query, data).decode('utf8'))
输出:
insert into t (a, b) values (1, 'x'),(2, 'y')
来自Psycopg2的Postgresql.org教程页面(参见下文) :
我想告诉你的最后一项是如何使用字典插入多行。 如果您有以下情况:
namedict = ({"first_name":"Joshua", "last_name":"Drake"}, {"first_name":"Steven", "last_name":"Foo"}, {"first_name":"David", "last_name":"Bar"})
您可以使用以下方法轻松地在字典中插入所有三行:
cur = conn.cursor() cur.executemany("""INSERT INTO bar(first_name,last_name) VALUES (%(first_name)s, %(last_name)s)""", namedict)
它不会节省太多的代码,但它确实看起来更好。
cursor.copy_from是迄今为止我发现的用于批量插入的最快解决scheme。 这里有一个我做的包含一个名为IteratorFile的类,它允许一个产生string的迭代器像文件一样被读取。 我们可以使用生成器expression式将每个inputlogging转换为一个string。 所以解决的办法是
args = [(1,2), (3,4), (5,6)] f = IteratorFile(("{}\t{}".format(x[0], x[1]) for x in args)) cursor.copy_from(f, 'table_name', columns=('a', 'b'))
对于这个小小的参数,它不会带来很大的速度差异,但是在处理数以千计的行时,我看到了很大的加速。 它也将比build立一个巨大的查询string更有效率。 迭代器一次只能在内存中保存一个inputlogging,在某些时候,通过构build查询string,您将在Python进程或Postgres中耗尽内存。
[用psycopg2 2.7更新]
经典的executemany()
比@ ant32的实现(称为“折叠”)要慢60倍左右,正如这个线程所解释的: https ://www.postgresql.org/message-id/20170130215151.GA7081%40deb76.aryehleib 。 COM
这个实现在2.7版本中被添加到了psycopg2中,被称为execute_values()
:
from psycopg2.extras import execute_values execute_values(cur, "INSERT INTO test (id, v1, v2) VALUES %s", [(1, 2, 3), (4, 5, 6), (7, 8, 9)])
[上一个回答]
要插入多行,使用execute()
executemany()
VALUES
语法比使用psycopg2 executemany()
要快10倍。 事实上, executemany()
只是运行许多单独的INSERT
语句。
在Python 3中,@ ant32的代码完美地工作。但是在Python 3中, cursor.mogrify()
返回字节, cursor.execute()
接受字节或string,而','.join()
需要str
实例。
因此,在Python 3中,您可能需要通过添加.decode('utf-8')
来修改@ ant32的代码:
args_str = ','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x).decode('utf-8') for x in tup) cur.execute("INSERT INTO table VALUES " + args_str)
或者仅使用字节(使用b''
或b""
):
args_bytes = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) cur.execute(b"INSERT INTO table VALUES " + args_bytes)
所有这些技术在Postgres术语中都被称为“扩展插入”,截至2016年11月24日,它的速度仍然比psychopg2的executemany()以及本主题中列出的所有其他方法(我在尝试使用之前回答)。
这里有一些代码,不使用cur.mogrify,很好,只是为了让你的头:
valueSQL = [ '%s', '%s', '%s', ... ] # as many as you have columns. sqlrows = [] rowsPerInsert = 3 # more means faster, but with diminishing returns.. for row in getSomeData: # row == [1, 'a', 'yolo', ... ] sqlrows += row if ( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0: # sqlrows == [ 1, 'a', 'yolo', 2, 'b', 'swag', 3, 'c', 'selfie' ] insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert) cur.execute(insertSQL, sqlrows) con.commit() sqlrows = [] insertSQL = 'INSERT INTO "twitter" VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*len(sqlrows)) cur.execute(insertSQL, sqlrows) con.commit()
但是应该注意的是,如果你可以使用copy_from(),你应该使用copy_from;)
如果您正在使用SQLAlchemy,则不需要手工处理string,因为SQLAlchemy 支持为单个INSERT
语句生成多行VALUES
子句 :
rows = [] for i, name in enumerate(rawdata): row = { 'id': i, 'name': name, 'valid': True, } rows.append(row) if len(rows) > 0: # INSERT fails if no rows insert_query = SQLAlchemyModelName.__table__.insert().values(rows) session.execute(insert_query)
上面已经使用了ant32的答案好几年了。 不过,我发现这是thorws python 3中的错误,因为mogrify
返回一个字节string。
显式转换为bytsestring是一个简单的解决scheme,使代码python 3兼容。
args_str = b','.join(cur.mogrify("(%s,%s,%s,%s,%s,%s,%s,%s,%s)", x) for x in tup) cur.execute(b"INSERT INTO table VALUES " + args_str)
另一个不错的和有效的方法是传递行作为1参数,这是json对象的数组。
例如你传递参数:
[ {id: 18, score: 1}, { id: 19, score: 5} ]
它是一个数组,里面可能包含任何数量的对象。 然后你的SQL看起来像:
INSERT INTO links (parent_id, child_id, score) SELECT 123, (r->>'id')::int, (r->>'score')::int FROM unnest($1::json[]) as r
注意:你的postgress必须足够新,才能支持json
该解决scheme基于JJ解决scheme,但由于遇到问题而具有不同的IF / Else结构。
def insert_Entries(EntriesWishList): conn = None try: # connect to the PostgreSQL database con = psycopg2.connect(dbname='myDBName', user='postgres', host='localhost', password='myPW') # create a new cursor cur = con.cursor() valueSQL = [ '%s','%s', '%s', '%s', '%s', '%s', '%s' ] # as many as you have columns. sqlrows = [] rowsPerInsert = 3 # more means faster, but with diminishing returns.. units = len(EntriesWishList) print(units) for unit in range(0,units): sqlrows += EntriesWishList[unit] insertSQL ='' if(( len(sqlrows)/len(valueSQL) ) % rowsPerInsert == 0): insertSQL = 'INSERT INTO DATABASE VALUES ' + ','.join(['(' + ','.join(valueSQL) + ')']*rowsPerInsert) cur.execute(insertSQL, sqlrows) con.commit() elif( (units-unit) <= rowsPerInsert): rowsPerInsert = 1 unit = unit-( len(sqlrows)/len(valueSQL) ) else: continue sqlrows = [] cur.close() except (Exception, psycopg2.DatabaseError) as error: print(error) finally: if conn is not None: conn.close()
如果你想在一个插入语句中插入多行(假设你没有使用ORM),那么最简单的方法就是使用词典列表。 这里是一个例子:
t = [{'id':1, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 6}, {'id':2, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 7}, {'id':3, 'start_date': '2015-07-19 00:00:00', 'end_date': '2015-07-20 00:00:00', 'campaignid': 8}] conn.execute("insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s);", t)
正如你所看到的,只有一个查询将被执行:
INFO sqlalchemy.engine.base.Engine insert into campaign_dates (id, start_date, end_date, campaignid) values (%(id)s, %(start_date)s, %(end_date)s, %(campaignid)s); INFO sqlalchemy.engine.base.Engine [{'campaignid': 6, 'id': 1, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 7, 'id': 2, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}, {'campaignid': 8, 'id': 3, 'end_date': '2015-07-20 00:00:00', 'start_date': '2015-07-19 00:00:00'}] INFO sqlalchemy.engine.base.Engine COMMIT
使用aiopg – 下面的代码片段工作得很好
# items = [10, 11, 12, 13] # group = 1 tup = [(gid, pid) for pid in items] args_str = ",".join([str(s) for s in tup]) # insert into group values (1, 10), (1, 11), (1, 12), (1, 13) yield from cur.execute("INSERT INTO group VALUES " + args_str)