程序问答   发布时间:2022-06-01  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了有没有办法使用 FORALL 从数组中插入数据?大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

如何解决有没有办法使用 FORALL 从数组中插入数据??

开发过程中遇到有没有办法使用 FORALL 从数组中插入数据?的问题如何解决?下面主要结合日常开发的经验,给出你关于有没有办法使用 FORALL 从数组中插入数据?的解决方法建议,希望对你解决有没有办法使用 FORALL 从数组中插入数据?有所启发或帮助;

我正在运行 Oracle 19c,我想获得最佳的插入性能。目前,我使用 INSERT /*+APPEND */ ... 插入,这很好,但不是我想要的速度。
我读到使用 FORALL 快得多,但我真的找不到任何例子。 这是代码片段(python 3):

connection = pool.acquire()
cursor = connection.cursor()
cursor.executemany("INSERT /*+APPEND*/ INTO RANDOM VALUES (:1,:2,:3)",List(random))
connection.commit()
cursor.close()
connection.close()

解决方法

我真的对什么会更快很感兴趣,所以我测试了一些可能的方法来比较它们:

  • 简单的executemany,没有技巧。
  • 与语句中的 APPEND_VALUES 提示相同。
  • union all 您在另一个问题中尝试过的方法。这应该比上面慢,因为它生成一个非常大的语句(这可能需要比数据本身更多的网络)。然后应该在 DB 端解析它,这也会消耗大量时间并忽略所有好处(不谈论潜在的大小限制)。然后我已经 executemany 用块来测试它,而不是为 100k 记录构建单个语句。我没有在语句中使用值的串联,因为想保证它的安全。
  • insert all。同样的缺点,但没有工会。将其与 union 版本进行比较。
  • 序列化 JSON 中的数据,并使用 json_table 在 DB 端进行反序列化。单个短语句和单个数据传输具有潜在的良好性能,而 JSON 开销很小。
  • 您在 pl/sqL 包装程序中建议的 FORALL。应该与 executemany 相同,因为相同,但在数据库端。将数据转换为集合的开销。
  • 相同的 FORALL,但使用列式方法来传递数据:传递列值的简单列表而不是复杂类型。应该比带有集合的 FORALL 快得多,因为不需要将数据序列化为集合的类型。

我使用免费帐户在 Oracle 云中使用了 Oracle 自治数据库。每个方法循环执行 10 次,输入数据集相同,包含 100k 条记录,每次测试前重新创建表。这是我得到的结果。这里的准备时间和执行时间分别是客户端DB调用本身的数据转换。

>>> t = PerfTest(100000)
>>> t.run("exec_many",10)
Method:  exec_many.
    Duration,avg: 2.3083874 s
    Preparation time,avg: 0.0 s
    Execution time,avg: 2.3083874 s
>>> t.run("exec_many_append",10)
Method: exec_many_append.
    Duration,avg: 2.6031369 s
    Preparation time,avg: 2.6031369 s
>>> t.run("union_all",10,10000)
Method:  union_all.
    Duration,avg: 27.9444233 s
    Preparation time,avg: 0.0408773 s
    Execution time,avg: 27.8457551 s
>>> t.run("insert_all",10000)
Method: insert_all.
    Duration,avg: 70.6442494 s
    Preparation time,avg: 0.0289269 s
    Execution time,avg: 70.5541995 s
>>> t.run("json_table",10)
Method: json_table.
    Duration,avg: 10.4648237 s
    Preparation time,avg: 9.7907693 s
    Execution time,avg: 0.621006 s
>>> t.run("forall",10)
Method:     forall.
    Duration,avg: 5.5622837 s
    Preparation time,avg: 1.8972456000000002 s
    Execution time,avg: 3.6650380999999994 s
>>> t.run("forall_columnar",10)
Method: forall_columnar.
    Duration,avg: 2.6702698000000002 s
    Preparation time,avg: 0.055710800000000005 s
    Execution time,avg: 2.6105702 s
>>> 

最快的方式就是executemany,没什么好惊讶的。有趣的是,APPEND_VALUES 并没有改进查询并且平均获得更多时间,因此需要更多调查。

关于FORALL:正如预期的那样,每列的单个数组花费的时间更少,因为没有为其准备数据。它或多或少与 executemany 相当,但我认为 pl/sqL 开销在这里起到了一定的作用。

对我来说另一个有趣的部分是 JSON:大部分时间都花在将 LOB 写入数据库和序列化上,但查询本身非常快。也许可以使用 chuncsize 以某种方式改进写入操作,或者可以通过其他方式将 LOB 数据传递到 SELEct 语句中,但是就我的代码而言,使用 executemany 远非非常简单和直接的方法。

也有可能在没有 Python 的情况下应该作为外部数据的本地工具更快,但我没有测试它们:

以下是我用于测试的代码。

import cx_Oracle as db
import os,random,json
import datetiR_721_11845@e as dt


class PerfTest:
  
  def __init__(self,sizE):
    self._con = db.connect(
      os.environ["ora_cloud_usr"],os.environ["ora_cloud_pwd"],"test_low",encoding="UTF-8"
    )
    self._cur = self._con.cursor()
    self.inp = [(i,"Test {i}".format(i=i),random.random()) for i in range(sizE)]
  
  def __del__(self):
    if self._con:
      self._con.rollBACk()
      self._con.close()
 
#Create objets
  def setup(self):
    try:
      self._cur.execute("drop table rand")
      #print("table dropped")
    except:
      pass
  
    self._cur.execute("""create table rand(
      id int,str varchar2(100),val number
    )""")
    
    self._cur.execute("""create or replace package pkg_test as
  type ts_test is record (
    id rand.id%type,str rand.str%type,val rand.val%type
  );
  type tt_test is table of ts_test index by pls_Integer;
  
  type tt_ids is table of rand.id%type index by pls_Integer;
  type tt_strs is table of rand.str%type index by pls_Integer;
  type tt_vals is table of rand.val%type index by pls_Integer;
  
  procedure write_data(p_data in tt_test);
  procedure write_data_columnar(
    p_ids in tt_ids,p_strs in tt_strs,p_vals in tt_vals
  );

end;""")
    self._cur.execute("""create or replace package body pkg_test as
  procedure write_data(p_data in tt_test)
  as
  begin
    forall i in inDices of p_data
      insert into rand(id,str,val)
      values (p_data(i).id,p_data(i).str,p_data(i).val)
    ;
    
    commit;

  end;
  
  procedure write_data_columnar(
    p_ids in tt_ids,p_vals in tt_vals
  ) as
  begin
    forall i in inDices of p_ids
      insert into rand(id,val)
      values (p_ids(i),p_strs(i),p_vals(i))
    ;
    
    commit;
    
  end;

end;
""")

 
  def build_union(self,sizE):
      return """insert into rand(id,val)
    SELEct id,val from rand where 1 = 0 union all
    """ + """ union all """.join(
      ["SELEct :{},:{},:{} from dual".format(i*3+1,i*3+2,i*3+3)
        for i in range(sizE)]
    )
 
 
  def build_insert_all(self,sizE):
      return """
      """.join(
      ["into rand(id,val) values (:{},:{})".format(i*3+1,i*3+3)
        for i in range(sizE)]
    )


#Test case with executemany
  def exec_many(self):
    start = dt.datetiR_721_11845@e.now()
    self._cur.executemany("insert into rand(id,val) values (:1,:2,:3)",self.inp)
    self._con.commit()
    
    return (dt.timedelta(0),dt.datetiR_721_11845@e.now() - start)
 
 
#The same as above but with prepared statement (no parsing)
  def exec_many_append(self):
    start = dt.datetiR_721_11845@e.now()
    self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id,dt.datetiR_721_11845@e.now() - start)


#union all approach (chunked). Should have large parse time
  def union_all(self,sizE):
##Chunked list of big tuples
    start_prepare = dt.datetiR_721_11845@e.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*sizE))
    ]
    new_stmt = self.build_union(sizE)
    
    dur_prepare = dt.datetiR_721_11845@e.now() - start_prepare
    
    #Execute unions
    start_exec = dt.datetiR_721_11845@e.now()
    self._cur.executemany(new_stmt,new_inp)
    dur_exec = dt.datetiR_721_11845@e.now() - start_exec

##In case the size is not a divisor
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetiR_721_11845@e.now()
      new_stmt = self.build_union(remainder)
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetiR_721_11845@e.now() - start_prepare
      
      start_exec = dt.datetiR_721_11845@e.now()
      self._cur.execute(new_stmt,new_inp)
      dur_exec += dt.datetiR_721_11845@e.now() - start_exec

    self._con.commit()
    
    return (dur_prepare,dur_exeC)


#The same as union all,but with no need to union something
  def insert_all(self,sizE):
##Chunked list of big tuples
    start_prepare = dt.datetiR_721_11845@e.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*sizE))
    ]
    new_stmt = """insert all
    {}
    SELEct * from dual"""
    dur_prepare = dt.datetiR_721_11845@e.now() - start_prepare
    
    #Execute
    start_exec = dt.datetiR_721_11845@e.now()
    self._cur.executemany(
      new_stmt.format(self.build_insert_all(sizE)),new_inp
    )
    dur_exec = dt.datetiR_721_11845@e.now() - start_exec

##In case the size is not a divisor
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetiR_721_11845@e.now()
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetiR_721_11845@e.now() - start_prepare
      
      start_exec = dt.datetiR_721_11845@e.now()
      self._cur.execute(
        new_stmt.format(self.build_insert_all(remainder)),new_inp
      )
      dur_exec += dt.datetiR_721_11845@e.now() - start_exec

    self._con.commit()
    
    return (dur_prepare,dur_exeC)

    
#serialize at server side and do deserialization at DB side
  def json_table(self):
    start_prepare = dt.datetiR_721_11845@e.now()
    new_inp = json.dumps([
      { "id":t[0],"str":t[1],"val":t[2]} for t in self.inp
    ])
    
    lob_var = self._con.createlob(db.DB_TYPE_CLOB)
    lob_var.write(new_inp)
    
    start_exec = dt.datetiR_721_11845@e.now()
    self._cur.execute("""
    insert into rand(id,val
    from json_table(
      to_clob(:json),'$[*]'
      columns
        id int,val number
    )
    """,json=lob_var)
    dur_exec = dt.datetiR_721_11845@e.now() - start_exec
    
    self._con.commit()
    
    return (start_exec - start_prepare,dur_exeC)


#pl/sqL with FORALL
  def forall(self):
    start_prepare = dt.datetiR_721_11845@e.now()
    collection_type = self._con.gettype("PKG_TEST.TT_TEST")
    record_type = self._con.gettype("PKG_TEST.TS_TEST")
    
    def recBuilder(X):
      rec = record_type.newobject()
      rec.ID = x[0]
      rec.STR = x[1]
      rec.VAL = x[2]
      
      return rec

    inp_collection = collection_type.newobject([
      recBuilder(i) for i in self.inp
    ])
    
    start_exec = dt.datetiR_721_11845@e.now()
    self._cur.callproc("pkg_test.write_data",[inp_collection])
    dur_exec = dt.datetiR_721_11845@e.now() - start_exec
    
    return (start_exec - start_prepare,dur_exeC)


#pl/sqL with FORALL and plain collections
  def forall_columnar(self):
    start_prepare = dt.datetiR_721_11845@e.now()
    ids,strs,vals = map(list,zip(*self.inp))
    start_exec = dt.datetiR_721_11845@e.now()
    self._cur.callproc("pkg_test.write_data_columnar",[ids,vals])
    dur_exec = dt.datetiR_721_11845@e.now() - start_exec
    
    return (start_exec - start_prepare,dur_exeC)

  
#Run test
  def run(self,method,iterations,*args):
    #Cleanup scheR_721_11845@a
    self.setup()

    start = dt.datetiR_721_11845@e.now()
    runtime = []
    for i in range(iterations):
      single_run = getattr(self,method)(*args)
      runtime.append(single_run)
    
    dur = dt.datetiR_721_11845@e.now() - start
    dur_prep_@R_54_10586@l = sum([i.@R_54_10586@l_seconds() for i,_ in runtime])
    dur_exec_@R_54_10586@l = sum([i.@R_54_10586@l_seconds() for _,i in runtime])
    
    print("""Method: {meth}.
    Duration,avg: {run_dur} s
    Preparation time,avg: {prep} s
    Execution time,avg: {ex} s""".format(
      inp_s=len(self.inp),meth=method,run_dur=dur.@R_54_10586@l_seconds() / iterations,prep=dur_prep_@R_54_10586@l / iterations,ex=dur_exec_@R_54_10586@l / iterations
    ))

大佬总结

以上是大佬教程为你收集整理的有没有办法使用 FORALL 从数组中插入数据?全部内容,希望文章能够帮你解决有没有办法使用 FORALL 从数组中插入数据?所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。