批量插入到使用.NET的Oracle

使用.NET批量插入Oracle的最快方法是什么? 我需要使用.NET将大约160Klogging传输到Oracle。 目前,我正在使用插入语句并执行它160K次,大约需要25分钟才能完成。 源数据存储在DataTable中,作为从另一个数据库(MySQL)查询的结果,

有没有更好的方法来做到这一点?

编辑 :我目前正在使用System.Data.OracleClient,但愿意接受使用其他提供商(ODP.NET,DevArt等..)的解决scheme。

我使用ODP.NET中的数组绑定在15秒左右的时间内加载50,000条logging

它通过重复调用指定的存储过程(并且可以在其中执行更新/插入/删除)来工作,但是它将多个参数值从.NET传递到数据库。

为存储过程的每个参数指定一个值,而不是为每个参数指定一个值数组

Oracle一次将参数数组从.NET传递到数据库,然后使用您指定的参数值反复调用您指定的存储过程。

http://www.oracle.com/technetwork/issue-archive/2009/09-sep/o59odpnet-085168.html

/达米安

我最近发现了一个对于批量插入(ODP.NET)来说非常棒的特殊类。 Oracle.DataAccess.Client.OracleBulkCopy! 它需要一个数据表作为参数,然后你打电话WriteTOServer方法…这是非常快速和有效的,祝你好运!

Rob Stevenson-Legget的解决scheme很慢,因为他没有绑定他的值,但他使用了string.Format()。

当你要求Oracle执行一个sql语句时,首先要计算这个语句的has值。 之后,它在一个散列表中查找它是否已经知道这个语句。 如果它已经知道它的声明,它可以从这个哈希表中检索它的执行path,并且执行这个语句真的很快,因为Oracle之前已经执行了这个语句。 这被称为库caching,如果你不绑定你的sql语句,它就不能正常工作。

比如不要这样做:

int n;

for (n = 0; n < 100000; n ++) { mycommand.CommandText = String.Format("INSERT INTO [MyTable] ([MyId]) VALUES({0})", n + 1); mycommand.ExecuteNonQuery(); } 

但是:

  OracleParameter myparam = new OracleParameter(); int n; mycommand.CommandText = "INSERT INTO [MyTable] ([MyId]) VALUES(?)"; mycommand.Parameters.Add(myparam); for (n = 0; n < 100000; n ++) { myparam.Value = n + 1; mycommand.ExecuteNonQuery(); } 

不使用参数也可能导致sql注入。

SQL Server的SQLBulkCopy非常快速。 不幸的是,我发现OracleBulkCopy要慢得多。 也有问题:

  • 如果您打算使用OracleBulkCopy,则必须确保input数据是干净的。 如果发生主键冲突,则会引发一个ORA-26026,它似乎是不可恢复的。 尝试重build索引不起作用,表中的任何后续插入失败,也正常插入。
  • 即使数据是干净的,我发现OracleBulkCopy有时会卡在WriteToServer中。 问题似乎取决于批量大小。 在我的testing数据中,当我重复的时候,这个问题会发生在我testing的同一点。 使用更大或更小的批量,问题不会发生。 我发现在更大批量的情况下速度更不规则,这就意味着与内存pipe理有关的问题。

其实System.Data.OracleClient.OracleDataAdapter比OracleBulkCopy更快,如果你想填充一个小logging,但很多行的表。 您需要调整批量大小,但OracleDataAdapter的最佳BatchSize小于OracleBulkCopy。

我使用x86可执行文件和32位ODP.Net客户端2.112.1.0在Windows 7机器上运行我的testing。 。 OracleDataAdapter是System.Data.OracleClient 2.0.0.0的一部分。 我的testing集大约有60万行,最大logging大小。 102个字节(平均大小43个字符)。 数据源是一个25 MB的文本文件,逐行读取为一个stream。

在我的testing中,我将input数据表build立为固定的表大小,然后使用OracleBulkCopy或OracleDataAdapter将数据块复制到服务器。 我在OracleBulkCopy中将BatchSize保留为0(以便当前表内容被复制为一个批处理),并将其设置为OracleDataAdapter中的表大小(同样应该在内部创build一个批处理)。 最好的结果:

  • OracleBulkCopy:表大小= 500,总持续时间4'22“
  • OracleDataAdapter:表大小= 100,总持续时间3'03“

为了比较:

  • SqlBulkCopy:表大小= 1000,总持续时间0'15“
  • SqlDataAdapter:表大小= 1000,总持续时间8'05“

相同的客户机,testing服务器是SQL Server 2008 R2。 对于SQL Server来说,批量复制显然是最好的select。 它不仅总体上最快,而且服务器负载也低于使用数据适配器时的负载。 OracleBulkCopy不提供相同的体验,这是一个可惜的事情 – BulkCopy API比DataAdapter更容易使用。

解决这个问题的一个非常快速的方法是build立从Oracle数据库到MySQL数据库的数据库链接。 您可以创build数据库链接到非Oracle数据库。 创build数据库链接后,您可以从MySQL数据库检索您的数据…创build表mydata作为select * from …语句。 这被称为异构连接。 这样你就不必在你的.net应用程序中做任何事情来移动数据。

另一种方法是使用ODP.NET。 在ODP.NET中,您可以使用OracleBulkCopy类。

但我不认为用System.Data.OracleClient在Oracle表中插入160klogging需要25分钟。 我觉得你犯太多次了 你是否将你的值绑定到插入语句的参数,或者你连接你的值。 绑定要快得多。

你不应该提交每一个插入,因为提交需要很多时间。

您使用哪个提供程序将您的.NET应用程序连接到Oracle数据库? 您是否使用ODP.NET或Devart提供程序(又名Corelab提供程序),还是使用Oracle的Microsoft提供程序(System.Data.OracleClient)?

甲骨文表示( http://www.oracle.com/technology/products/database/utilities/htdocs/sql_loader_overview.html

SQL * Loader是使用外部文件中的数据快速填充Oracle表的主要方法

我的经验是,他们的加载器加载他们的表比任何其他更快。

跟随西奥的build议与我的研究结果(道歉 – 我目前没有足够的声誉发表这个评论)

首先,这是如何使用几个命名参数:

 String commandString = "INSERT INTO Users (Name, Desk, UpdateTime) VALUES (:Name, :Desk, :UpdateTime)"; using (OracleCommand command = new OracleCommand(commandString, _connection, _transaction)) { command.Parameters.Add("Name", OracleType.VarChar, 50).Value = strategy; command.Parameters.Add("Desk", OracleType.VarChar, 50).Value = deskName ?? OracleString.Null; command.Parameters.Add("UpdateTime", OracleType.DateTime).Value = updated; command.ExecuteNonQuery(); } 

不过,我看到速度之间没有变化:

  • 为每一行构造一个新的commandString(String.Format)
  • 为每一行构造一个现在参数化的commandString
  • 使用一个commandString并更改参数

我正在使用System.Data.OracleClient,在事务中删除和插入2500行

发现链接的例子有点混乱,我找出了一些代码,演示了如何将一个工作数组插入到一个testing表(jkl_test)中。 这是表格:

 create table jkl_test (id number(9)); 

这里是一个简单的控制台应用程序的.Net代码,它使用ODP.Net连接到Oracle,并插入一个由5个整数组成的数组:

 using Oracle.DataAccess.Client; namespace OracleArrayInsertExample { class Program { static void Main(string[] args) { // Open a connection using ODP.Net var connection = new OracleConnection("Data Source=YourDatabase; Password=YourPassword; User Id=YourUser"); connection.Open(); // Create an insert command var command = connection.CreateCommand(); command.CommandText = "insert into jkl_test values (:ids)"; // Set up the parameter and provide values var param = new OracleParameter("ids", OracleDbType.Int32); param.Value = new int[] { 22, 55, 7, 33, 11 }; // This is critical to the process; in order for the command to // recognize and bind arrays, an array bind count must be specified. // Set it to the length of the array. command.ArrayBindCount = 5; command.Parameters.Add(param); command.ExecuteNonQuery(); } } } 

我想OracleBulkCopy是最快的方法之一。 我有一些麻烦学习,我需要一个新的ODAC版本。 参看 在哪里键入[Oracle.DataAccess.Client.OracleBulkCopy]?

这里是完整的PowerShell代码,从查询复制到适合的现有Oracle表。 我试过Sql-Server数据源,但其他有效的OLE-DB源将会去。

 if ($ora_dll -eq $null) { "Load Oracle dll" $ora_dll = [System.Reflection.Assembly]::LoadWithPartialName("Oracle.DataAccess") $ora_dll } # sql-server or Oracle source example is sql-server $ConnectionString ="server=localhost;database=myDatabase;trusted_connection=yes;Provider=SQLNCLI10;" # Oracle destination $oraClientConnString = "Data Source=myTNS;User ID=myUser;Password=myPassword" $tableName = "mytable" $sql = "select * from $tableName" $OLEDBConn = New-Object System.Data.OleDb.OleDbConnection($ConnectionString) $OLEDBConn.open() $readcmd = New-Object system.Data.OleDb.OleDbCommand($sql,$OLEDBConn) $readcmd.CommandTimeout = '300' $da = New-Object system.Data.OleDb.OleDbDataAdapter($readcmd) $dt = New-Object system.Data.datatable [void]$da.fill($dt) $OLEDBConn.close() #Write-Output $dt if ($dt) { try { $bulkCopy = new-object ("Oracle.DataAccess.Client.OracleBulkCopy") $oraClientConnString $bulkCopy.DestinationTableName = $tableName $bulkCopy.BatchSize = 5000 $bulkCopy.BulkCopyTimeout = 10000 $bulkCopy.WriteToServer($dt) $bulkcopy.close() $bulkcopy.Dispose() } catch { $ex = $_.Exception Write-Error "Write-DataTable$($connectionName):$ex.Message" continue } } 

顺便说一句:我用这个CLOB列复制表。 我没有得到这个工作使用链接的服务器比较。 关于dba的问题 。 我没有重新尝试链接服务与新的ODAC。

如果您使用的是非托pipe的Oracle客户端(Oracle.DataAccess),那么最快的方法就是像Tarik指出的那样使用OracleBulkCopy。

如果您使用的是最新的托pipeoracle客户端(Oracle.ManagedDataAccess),那么最快的方法就是使用数组绑定,就像Damien指出的那样。 如果希望保持应用程序代码不受数组绑定的限制,则可以使用数组绑定来编写自己的OracleBulkCopy实现。

以下是来自实际项目的使用示例:

 var bulkWriter = new OracleDbBulkWriter(); bulkWriter.Write( connection, "BULK_WRITE_TEST", Enumerable.Range(1, 10000).Select(v => new TestData { Id = v, StringValue=v.ToString() }).ToList()); 

在500毫秒内插入10K条logging!

这是实现:

 public class OracleDbBulkWriter : IDbBulkWriter { public void Write<T>(IDbConnection connection, string targetTableName, IList<T> data, IList<ColumnToPropertyMapping> mappings = null) { if (connection == null) { throw new ArgumentNullException(nameof(connection)); } if (string.IsNullOrEmpty(targetTableName)) { throw new ArgumentNullException(nameof(targetTableName)); } if (data == null) { throw new ArgumentNullException(nameof(data)); } if (mappings == null) { mappings = GetGenericMappings<T>(); } mappings = GetUniqueMappings<T>(mappings); Dictionary<string, Array> parameterValues = InitializeParameterValues<T>(mappings, data.Count); FillParameterValues(parameterValues, data); using (var command = CreateCommand(connection, targetTableName, mappings, parameterValues)) { command.ExecuteNonQuery(); } } private static IDbCommand CreateCommand(IDbConnection connection, string targetTableName, IList<ColumnToPropertyMapping> mappings, Dictionary<string, Array> parameterValues) { var command = (OracleCommandWrapper)connection.CreateCommand(); command.ArrayBindCount = parameterValues.First().Value.Length; foreach(var mapping in mappings) { var parameter = command.CreateParameter(); parameter.ParameterName = mapping.Column; parameter.Value = parameterValues[mapping.Property]; command.Parameters.Add(parameter); } command.CommandText = $@"insert into {targetTableName} ({string.Join(",", mappings.Select(m => m.Column))}) values ({string.Join(",", mappings.Select(m => $":{m.Column}")) })"; return command; } private IList<ColumnToPropertyMapping> GetGenericMappings<T>() { var accessor = TypeAccessor.Create(typeof(T)); var mappings = accessor.GetMembers() .Select(m => new ColumnToPropertyMapping(m.Name, m.Name)) .ToList(); return mappings; } private static IList<ColumnToPropertyMapping> GetUniqueMappings<T>(IList<ColumnToPropertyMapping> mappings) { var accessor = TypeAccessor.Create(typeof(T)); var members = new HashSet<string>(accessor.GetMembers().Select(m => m.Name)); mappings = mappings .Where(m => m != null && members.Contains(m.Property)) .GroupBy(m => m.Column) .Select(g => g.First()) .ToList(); return mappings; } private static Dictionary<string, Array> InitializeParameterValues<T>(IList<ColumnToPropertyMapping> mappings, int numberOfRows) { var values = new Dictionary<string, Array>(mappings.Count); var accessor = TypeAccessor.Create(typeof(T)); var members = accessor.GetMembers().ToDictionary(m => m.Name); foreach(var mapping in mappings) { var member = members[mapping.Property]; values[mapping.Property] = Array.CreateInstance(member.Type, numberOfRows); } return values; } private static void FillParameterValues<T>(Dictionary<string, Array> parameterValues, IList<T> data) { var accessor = TypeAccessor.Create(typeof(T)); for (var rowNumber = 0; rowNumber < data.Count; rowNumber++) { var row = data[rowNumber]; foreach (var pair in parameterValues) { Array parameterValue = pair.Value; var propertyValue = accessor[row, pair.Key]; parameterValue.SetValue(propertyValue, rowNumber); } } } } 

注意:此实现使用Fastmember包来优化对属性的访问(比reflection快得多)