Is there a way to insert data from an array using FORALL?
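I'm running Oracle 19c and want the best possible insert performance. Currently I insert using INSERT /*+APPEND */ ..., which works well, but not at the speed I want.
I've read that using FORALL is much faster, but I can't really find any examples.
Here is the code snippet (Python 3):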
connection = pool.acquire()
cursor = connection.cursor()
cursor.executemany("INSERT /*+APPEND*/ INTO RANDOM VALUES (:1, :2, :3)", list(random))
connection.commit()
cursor.close()
connection.close()
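I was really curious which approach would be faster, so I tested several possible methods to compare them:
- Plain executemany, with no tricks.
- The same with the APPEND_VALUES hint.
- The union all approach you tried in another question. This should be slower than the above because it generates a very large statement (which may need more network traffic than the data itself). The statement then has to be parsed on the DB side, which also takes a lot of time and cancels out any benefit (not to mention potential size limits). I ran it through executemany in chunks rather than building a single statement for 100k records. I did not concatenate values into the statement, because I wanted to keep it safe.
- insert all. The same downsides, but without the unions. Compare it with the union version.
- json_table, with the data serialized as JSON and deserialized on the DB side. Potentially good performance from a single short statement and a single data transfer, with little JSON overhead.
- FORALL in a PL/SQL procedure. Should be about the same as executemany, since it does the same thing but on the database side, plus the overhead of converting the data into a collection.
- FORALL again, but with a columnar approach to passing the data: simple lists of column values are passed instead of a complex type. Should be much faster than FORALL with a collection, since the data does not need to be serialized into the collection's type.

I used an Oracle Autonomous Database in Oracle Cloud on a free account. Each method was executed 10 times in a loop with the same input dataset of 100k records, and the table was recreated before each test. These are the results I got. Preparation time and execution time here are the client-side data transformation and the DB call itself, respectively.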
>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method: exec_many.
    Duration, avg: 2.3083874 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
    Duration, avg: 2.6031369 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method: union_all.
    Duration, avg: 27.9444233 s
    Preparation time, avg: 0.0408773 s
    Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
    Duration, avg: 70.6442494 s
    Preparation time, avg: 0.0289269 s
    Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
    Duration, avg: 10.4648237 s
    Preparation time, avg: 9.7907693 s
    Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method: forall.
    Duration, avg: 5.5622837 s
    Preparation time, avg: 1.8972456000000002 s
    Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
    Duration, avg: 2.6702698000000002 s
    Preparation time, avg: 0.055710800000000005 s
    Execution time, avg: 2.6105702 s
>>>
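The union all and insert all figures above are almost entirely execution time, which fits the expectation that a very large statement is expensive to process on the DB side. To make the statement size concrete, here is a minimal sketch of how a chunked union all statement and its flattened bind values could be produced. The helper names build_union_stmt and flatten_chunks are made up for this illustration and are not the test code itself; nothing is sent to a database.

```python
from itertools import islice


def build_union_stmt(table, columns, n_rows):
    """Build one INSERT ... SELECT ... UNION ALL statement with numbered binds."""
    width = len(columns)
    selects = [
        "select " + ", ".join(
            ":{}".format(r * width + c + 1) for c in range(width)
        ) + " from dual"
        for r in range(n_rows)
    ]
    return "insert into {}({})\n{}".format(
        table, ", ".join(columns), "\nunion all\n".join(selects)
    )


def flatten_chunks(rows, chunk_size):
    """Yield flat tuples of bind values, one tuple per chunk of rows."""
    it = iter(rows)
    while True:
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield tuple(value for row in chunk for value in row)


# Hypothetical sample data shaped like the (id, str, val) rows of the test.
rows = [(i, "Test {}".format(i), i / 10) for i in range(5)]
stmt = build_union_stmt("rand", ["id", "str", "val"], 2)
chunks = list(flatten_chunks(rows, 2))
print(stmt)
print(len(chunks), "chunks; last chunk holds", len(chunks[-1]) // 3, "row(s)")
```

The statement text grows by one select per row, so a chunk of 10000 rows means 30000 bind placeholders in a single statement. A final, shorter chunk needs its own statement built for its row count.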
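The fastest way is plain executemany, which is not much of a surprise. Interestingly, the APPEND_VALUES hint did not improve the insert and took more time on average, so this needs more investigation.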
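To put the averages side by side, the following sketch divides each reported duration by the executemany baseline. The numbers are copied from the test output above; only the dictionary layout and variable names are mine.

```python
# Average durations in seconds, copied from the test output above.
durations = {
    "exec_many": 2.3083874,
    "exec_many_append": 2.6031369,
    "union_all": 27.9444233,
    "insert_all": 70.6442494,
    "json_table": 10.4648237,
    "forall": 5.5622837,
    "forall_columnar": 2.6702698,
}

baseline = durations["exec_many"]
ratios = {name: secs / baseline for name, secs in durations.items()}

# Print the methods from fastest to slowest with their slowdown factor.
for name, ratio in sorted(ratios.items(), key=lambda kv: kv[1]):
    print("{:<18} {:5.1f}x".format(name, ratio))
```

On these numbers the APPEND_VALUES and columnar FORALL variants stay within about 16 percent of plain executemany, while insert all is roughly 30 times slower.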
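About FORALL: as expected, a separate array per column takes less time, because no data has to be prepared for it. It is more or less comparable to executemany, but I think the PL/SQL overhead plays some role here.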
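The columnar variant needs almost no client-side preparation because the row tuples only have to be transposed into one list per column. A minimal sketch of that step, using made-up sample rows and no database call:

```python
# Hypothetical sample rows shaped like the (id, str, val) tuples of the test.
rows = [(1, "Test 1", 0.25), (2, "Test 2", 0.5), (3, "Test 3", 0.75)]

# zip(*rows) regroups the values by position: all ids, all strs, all vals.
ids, strs, vals = map(list, zip(*rows))

print(ids)
print(strs)
print(vals)

# Zipping the columns back together restores the original row tuples.
assert list(zip(ids, strs, vals)) == rows
```

Note that the unpacking fails for an empty input, so a real caller would probably want to guard against that. The three lists are what the test code passes to pkg_test.write_data_columnar.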
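Another interesting part for me is JSON: most of the time was spent on serialization and on writing the LOB to the database, while the query itself was very fast. Perhaps the write could be improved somehow with a chunk size, or the LOB data could be passed to the select statement in some other way, but as my code stands it is far from the simple and straightforward executemany approach.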
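As one way to picture a chunked write, the sketch below serializes rows to JSON and splits the text into pieces with 1-based offsets. The helper iter_lob_pieces and the chunk size of 32 are assumptions for illustration; whether piecewise writes actually speed up the LOB transfer was not measured here.

```python
import json


def iter_lob_pieces(text, chunk_size):
    """Yield (offset, piece) pairs; offsets are 1-based like Oracle LOB offsets."""
    for start in range(0, len(text), chunk_size):
        yield start + 1, text[start:start + chunk_size]


# Hypothetical sample rows shaped like the (id, str, val) tuples of the test.
rows = [(i, "Test {}".format(i), i / 4) for i in range(4)]
payload = json.dumps([{"id": r[0], "str": r[1], "val": r[2]} for r in rows])

pieces = list(iter_lob_pieces(payload, 32))

# With a real cx_Oracle LOB the pieces could be written one by one, e.g.
#     for offset, piece in pieces:
#         lob_var.write(piece, offset)
# (assumed usage; it needs a live connection and is not run here).
print(len(payload), len(pieces))
```

Joining the pieces in order gives back the original JSON text, so the server-side json_table query would not need to change.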
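There may also be approaches without Python that should be faster, being native tools for external data, but I did not test them.
Below is the code I used for testing.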
import cx_Oracle as db
import os, random, json
import datetime as dt


class PerfTest:

    def __init__(self, size):
        self._con = db.connect(
            os.environ["ora_cloud_usr"],
            os.environ["ora_cloud_pwd"],
            "test_low",
            encoding="UTF-8"
        )
        self._cur = self._con.cursor()
        # Input dataset: a list of (id, str, val) tuples
        self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]

    def __del__(self):
        if self._con:
            self._con.rollback()
            self._con.close()

    # Create objects
    def setup(self):
        try:
            self._cur.execute("drop table rand")
            #print("table dropped")
        except db.DatabaseError:
            # The table does not exist yet on the first run
            pass

        self._cur.execute("""create table rand(
            id int,
            str varchar2(100),
            val number
        )""")

        self._cur.execute("""create or replace package pkg_test as
    type ts_test is record (
        id rand.id%type,
        str rand.str%type,
        val rand.val%type
    );
    type tt_test is table of ts_test index by pls_integer;

    type tt_ids is table of rand.id%type index by pls_integer;
    type tt_strs is table of rand.str%type index by pls_integer;
    type tt_vals is table of rand.val%type index by pls_integer;

    procedure write_data(p_data in tt_test);
    procedure write_data_columnar(
        p_ids in tt_ids,
        p_strs in tt_strs,
        p_vals in tt_vals
    );
end;""")

        self._cur.execute("""create or replace package body pkg_test as
    procedure write_data(p_data in tt_test)
    as
    begin
        forall i in indices of p_data
            insert into rand(id, str, val)
            values (p_data(i).id, p_data(i).str, p_data(i).val)
        ;
        commit;
    end;

    procedure write_data_columnar(
        p_ids in tt_ids,
        p_strs in tt_strs,
        p_vals in tt_vals
    ) as
    begin
        forall i in indices of p_ids
            insert into rand(id, str, val)
            values (p_ids(i), p_strs(i), p_vals(i))
        ;
        commit;
    end;
end;
""")
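
    # Build "insert ... select ... union all ..." with numbered binds.
    # The first branch returns no rows and only fixes the column datatypes.
    def build_union(self, size):
        return """insert into rand(id, str, val)
        select id, str, val from rand where 1 = 0 union all
        """ + """ union all """.join(
            ["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
             for i in range(size)]
        )

    # Build the list of "into ..." clauses for an "insert all" statement
    def build_insert_all(self, size):
        return """
        """.join(
            ["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
             for i in range(size)]
        )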

    # Test case with executemany
    def exec_many(self):
        start = dt.datetime.now()
        self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
        self._con.commit()
        return (dt.timedelta(0), dt.datetime.now() - start)

    # The same as above but with the APPEND_VALUES hint
    def exec_many_append(self):
        start = dt.datetime.now()
        self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
        self._con.commit()
        return (dt.timedelta(0), dt.datetime.now() - start)

    # union all approach (chunked). Should have large parse time
    def union_all(self, size):
        # Chunked list of big tuples: each tuple holds the binds of `size` rows
        start_prepare = dt.datetime.now()
        new_inp = [
            tuple([item for t in r for item in t])
            for r in list(zip(*[iter(self.inp)]*size))
        ]
        new_stmt = self.build_union(size)
        dur_prepare = dt.datetime.now() - start_prepare

        # Execute unions
        start_exec = dt.datetime.now()
        self._cur.executemany(new_stmt, new_inp)
        dur_exec = dt.datetime.now() - start_exec

        # In case the size is not a divisor, insert the leftover rows
        remainder = len(self.inp) % size
        if remainder > 0:
            start_prepare = dt.datetime.now()
            new_stmt = self.build_union(remainder)
            new_inp = tuple([
                item for t in self.inp[-remainder:] for item in t
            ])
            dur_prepare += dt.datetime.now() - start_prepare

            start_exec = dt.datetime.now()
            self._cur.execute(new_stmt, new_inp)
            dur_exec += dt.datetime.now() - start_exec

        self._con.commit()
        return (dur_prepare, dur_exec)

    # The same as union all, but with no need to union something
    def insert_all(self, size):
        # Chunked list of big tuples: each tuple holds the binds of `size` rows
        start_prepare = dt.datetime.now()
        new_inp = [
            tuple([item for t in r for item in t])
            for r in list(zip(*[iter(self.inp)]*size))
        ]
        new_stmt = """insert all
        {}
        select * from dual"""
        dur_prepare = dt.datetime.now() - start_prepare

        # Execute
        start_exec = dt.datetime.now()
        self._cur.executemany(
            new_stmt.format(self.build_insert_all(size)), new_inp
        )
        dur_exec = dt.datetime.now() - start_exec

        # In case the size is not a divisor, insert the leftover rows
        remainder = len(self.inp) % size
        if remainder > 0:
            start_prepare = dt.datetime.now()
            new_inp = tuple([
                item for t in self.inp[-remainder:] for item in t
            ])
            dur_prepare += dt.datetime.now() - start_prepare

            start_exec = dt.datetime.now()
            self._cur.execute(
                new_stmt.format(self.build_insert_all(remainder)), new_inp
            )
            dur_exec += dt.datetime.now() - start_exec

        self._con.commit()
        return (dur_prepare, dur_exec)

    # Serialize on the client side and deserialize on the DB side
    def json_table(self):
        start_prepare = dt.datetime.now()
        new_inp = json.dumps([
            {"id": t[0], "str": t[1], "val": t[2]} for t in self.inp
        ])

        # Write the JSON text into a temporary CLOB
        lob_var = self._con.createlob(db.DB_TYPE_CLOB)
        lob_var.write(new_inp)

        start_exec = dt.datetime.now()
        self._cur.execute("""
        insert into rand(id, str, val)
        select id, str, val
        from json_table(
            to_clob(:json), '$[*]'
            columns
                id int,
                str varchar2(100),
                val number
        )
        """, json=lob_var)
        dur_exec = dt.datetime.now() - start_exec

        self._con.commit()
        return (start_exec - start_prepare, dur_exec)

    # PL/SQL with FORALL
    def forall(self):
        start_prepare = dt.datetime.now()
        collection_type = self._con.gettype("PKG_TEST.TT_TEST")
        record_type = self._con.gettype("PKG_TEST.TS_TEST")

        # Convert one input tuple into a PL/SQL record object
        def recBuilder(x):
            rec = record_type.newobject()
            rec.ID = x[0]
            rec.STR = x[1]
            rec.VAL = x[2]
            return rec

        inp_collection = collection_type.newobject([
            recBuilder(i) for i in self.inp
        ])

        start_exec = dt.datetime.now()
        self._cur.callproc("pkg_test.write_data", [inp_collection])
        dur_exec = dt.datetime.now() - start_exec

        return (start_exec - start_prepare, dur_exec)

    # PL/SQL with FORALL and plain collections
    def forall_columnar(self):
        start_prepare = dt.datetime.now()
        # Transpose the rows into one list per column
        ids, strs, vals = map(list, zip(*self.inp))

        start_exec = dt.datetime.now()
        self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
        dur_exec = dt.datetime.now() - start_exec

        return (start_exec - start_prepare, dur_exec)

    # Run test
    def run(self, method, iterations, *args):
        # Cleanup schema
        self.setup()

        start = dt.datetime.now()
        runtime = []
        for i in range(iterations):
            single_run = getattr(self, method)(*args)
            runtime.append(single_run)

        dur = dt.datetime.now() - start
        dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
        dur_exec_total = sum([i.total_seconds() for _, i in runtime])

        print("""Method: {meth}.
    Duration, avg: {run_dur} s
    Preparation time, avg: {prep} s
    Execution time, avg: {ex} s""".format(
            meth=method,
            run_dur=dur.total_seconds() / iterations,
            prep=dur_prep_total / iterations,
            ex=dur_exec_total / iterations
        ))