| 方案 | 數量 | 時間 |
|---|---|---|
| Insert | 1千條 | 145.4351ms |
| BatchInsert | 1千條 | 103.9061ms |
| SqlBulkCopy | 1千條 | 7.021ms |
| Insert | 1萬條 | 1501.326ms |
| BatchInsert | 1萬條 | 850.6274ms |
| SqlBulkCopy | 1萬條 | 30.5129ms |
| Insert | 10萬條 | 13875.4934ms |
| BatchInsert | 10萬條 | 8278.9056ms |
| SqlBulkCopy | 10萬條 | 314.8402ms |
兩者插入效率對比,Insert明顯比SqlBulkCopy要慢太多,大概20~40倍性能差距,下面我們將SqlBulkCopy封裝一下,讓批量插入更加方便
批量插入擴展方法簽名
| 方法 | 方法參數 | 介紹 |
|---|---|---|
| BulkCopy | 同步的批量插入方法 | |
| SqlConnection connection | sql server 連接對象 | |
| IEnumerableT> source | 需要批量插入的數據源 | |
| string tableName = null | 插入表名稱【為NULL默認為實體名稱】 | |
| int bulkCopyTimeout = 30 | 批量插入超時時間 | |
| int batchSize = 0 | 寫入數據庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決于您的環境,尤其是行數和網絡延遲。就個人而言,我將從BatchSize屬性設置為1000行開始,然后看看其性能如何。如果可行,那么我將使行數加倍(例如增加到2000、4000等),直到性能下降或超時。否則,如果超時發生在1000,那么我將行數減少一半(例如500),直到它起作用為止?!?/td> | |
| SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量復制參數 | |
| SqlTransaction externalTransaction = null | 執行的事務對象 | |
| BulkCopyAsync | 異步的批量插入方法 | |
| SqlConnection connection | sql server 連接對象 | |
| IEnumerableT> source | 需要批量插入的數據源 | |
| string tableName = null | 插入表名稱【為NULL默認為實體名稱】 | |
| int bulkCopyTimeout = 30 | 批量插入超時時間 | |
| int batchSize = 0 | 寫入數據庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決于您的環境,尤其是行數和網絡延遲。就個人而言,我將從BatchSize屬性設置為1000行開始,然后看看其性能如何。如果可行,那么我將使行數加倍(例如增加到2000、4000等),直到性能下降或超時。否則,如果超時發生在1000,那么我將行數減少一半(例如500),直到它起作用為止?!?/td> | |
| SqlBulkCopyOptions options = SqlBulkCopyOptions.Default | 批量復制參數 | |
| SqlTransaction externalTransaction = null | 執行的事務對象 |
這個方法主要解決了兩個問題:
DataTable或者IDataReader接口實現類,手動構建的轉換比較難以維護,如果修改字段就得把這些地方都進行修改,特別是還需要將枚舉類型特殊處理,轉換成他的基礎類型(默認int)SqlBulkCopy對象,和配置數據庫列的映射,和一些屬性的配置此方案也是在我公司中使用,以滿足公司的批量插入數據的需求,例如第三方的對賬數據此方法使用的是Expression動態生成數據轉換函數,其效率和手寫的原生代碼差不多,和原生手寫代碼相比,多余的轉換損失很小【最大的性能損失都是在值類型拆裝箱上】
此方案和其他網上的方案有些不同的是:不是將List先轉換成DataTable,然后寫入SqlBulkCopy的,而是使用一個實現IDataReader的讀取器包裝List,每往SqlBulkCopy插入一行數據才會轉換一行數據
IDataReader方案和DataTable方案相比優點
效率高:DataTable方案需要先完全轉換后,才能交由SqlBulkCopy寫入數據庫,而IDataReader方案可以邊轉換邊交給SqlBulkCopy寫入數據庫(例如:10萬數據插入速度可提升30%)
占用內存少:DataTable方案需要先完全轉換后,才能交由SqlBulkCopy寫入數據庫,需要占用大量內存,而IDataReader方案可以邊轉換邊交給SqlBulkCopy寫入數據庫,無須占用過多內存
強大:因為是邊寫入邊轉換,而且EnumerableReader傳入的是一個迭代器,可以實現持續插入數據的效果
① 實體Model與表映射
數據庫表代碼
CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY]
實體類代碼
public class Person
{
public long Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public DateTime? CreateTime { get; set; }
public Gender Sex { get; set; }
}
public enum Gender
{
Man = 0,
Woman = 1
}
SqlBulkCopy類型的ColumnMappings屬性來完成,數據列與數據庫中列的映射
//創建批量插入對象
using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
{
foreach (var column in ModelToDataTableTModel>.Columns)
{
//創建字段映射
copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
}
② 實體轉換成數據行
將數據轉換成數據行采用的是:反射+Expression來完成
其中反射是用于獲取編寫Expression所需程序類,屬性等信息
其中Expression是用于生成高效轉換函數其中ModelToDataTableTModel>類型利用了靜態泛型類特性,實現泛型參數的緩存效果
在ModelToDataTableTModel>的靜態構造函數中,生成轉換函數,獲取需要轉換的屬性信息,并存入靜態只讀字段中,完成緩存
③ 使用IDataReader插入數據的重載
EnumerableReader是實現了IDataReader接口的讀取類,用于將模型對象,在迭代器中讀取出來,并轉換成數據行,可供SqlBulkCopy讀取
SqlBulkCopy只會調用三個方法:GetOrdinal、Read、GetValue
GetOrdinal只會在首行讀取每個列所代表序號【需要填寫:SqlBulkCopy類型的ColumnMappings屬性】Read方法是迭代到下一行,并調用ModelToDataTableTModel>.ToRowData.Invoke()來將模型對象轉換成數據行object[]GetValue方法是獲取當前行指定下標位置的值擴展方法類
public static class SqlConnectionExtension
{
/// summary>
/// 批量復制
/// /summary>
/// typeparam name="TModel">插入的模型對象/typeparam>
/// param name="source">需要批量插入的數據源/param>
/// param name="connection">數據庫連接對象/param>
/// param name="tableName">插入表名稱【為NULL默認為實體名稱】/param>
/// param name="bulkCopyTimeout">插入超時時間/param>
/// param name="batchSize">寫入數據庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決于您的環境,尤其是行數和網絡延遲。就個人而言,我將從BatchSize屬性設置為1000行開始,然后看看其性能如何。如果可行,那么我將使行數加倍(例如增加到2000、4000等),直到性能下降或超時。否則,如果超時發生在1000,那么我將行數減少一半(例如500),直到它起作用為止?!?param>
/// param name="options">批量復制參數/param>
/// param name="externalTransaction">執行的事務對象/param>
/// returns>插入數量/returns>
public static int BulkCopyTModel>(this SqlConnection connection,
IEnumerableTModel> source,
string tableName = null,
int bulkCopyTimeout = 30,
int batchSize = 0,
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
SqlTransaction externalTransaction = null)
{
//創建讀取器
using (var reader = new EnumerableReaderTModel>(source))
{
//創建批量插入對象
using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
{
//插入的表
copy.DestinationTableName = tableName ?? typeof(TModel).Name;
//寫入數據庫一批數量
copy.BatchSize = batchSize;
//超時時間
copy.BulkCopyTimeout = bulkCopyTimeout;
//創建字段映射【如果沒有此字段映射會導致數據填錯位置,如果類型不對還會導致報錯】【因為:沒有此字段映射默認是按照列序號對應插入的】
foreach (var column in ModelToDataTableTModel>.Columns)
{
//創建字段映射
copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
//將數據批量寫入數據庫
copy.WriteToServer(reader);
//返回插入數據數量
return reader.Depth;
}
}
}
/// summary>
/// 批量復制-異步
/// /summary>
/// typeparam name="TModel">插入的模型對象/typeparam>
/// param name="source">需要批量插入的數據源/param>
/// param name="connection">數據庫連接對象/param>
/// param name="tableName">插入表名稱【為NULL默認為實體名稱】/param>
/// param name="bulkCopyTimeout">插入超時時間/param>
/// param name="batchSize">寫入數據庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決于您的環境,尤其是行數和網絡延遲。就個人而言,我將從BatchSize屬性設置為1000行開始,然后看看其性能如何。如果可行,那么我將使行數加倍(例如增加到2000、4000等),直到性能下降或超時。否則,如果超時發生在1000,那么我將行數減少一半(例如500),直到它起作用為止?!?param>
/// param name="options">批量復制參數/param>
/// param name="externalTransaction">執行的事務對象/param>
/// returns>插入數量/returns>
public static async Taskint> BulkCopyAsyncTModel>(this SqlConnection connection,
IEnumerableTModel> source,
string tableName = null,
int bulkCopyTimeout = 30,
int batchSize = 0,
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,
SqlTransaction externalTransaction = null)
{
//創建讀取器
using (var reader = new EnumerableReaderTModel>(source))
{
//創建批量插入對象
using (var copy = new SqlBulkCopy(connection, options, externalTransaction))
{
//插入的表
copy.DestinationTableName = tableName ?? typeof(TModel).Name;
//寫入數據庫一批數量
copy.BatchSize = batchSize;
//超時時間
copy.BulkCopyTimeout = bulkCopyTimeout;
//創建字段映射【如果沒有此字段映射會導致數據填錯位置,如果類型不對還會導致報錯】【因為:沒有此字段映射默認是按照列序號對應插入的】
foreach (var column in ModelToDataTableTModel>.Columns)
{
//創建字段映射
copy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
//將數據批量寫入數據庫
await copy.WriteToServerAsync(reader);
//返回插入數據數量
return reader.Depth;
}
}
}
}
封裝的迭代器數據讀取器
/// summary>
/// 迭代器數據讀取器
/// /summary>
/// typeparam name="TModel">模型類型/typeparam>
public class EnumerableReaderTModel> : IDataReader
{
/// summary>
/// 實例化迭代器讀取對象
/// /summary>
/// param name="source">模型源/param>
public EnumerableReader(IEnumerableTModel> source)
{
_source = source ?? throw new ArgumentNullException(nameof(source));
_enumerable = source.GetEnumerator();
}
private readonly IEnumerableTModel> _source;
private readonly IEnumeratorTModel> _enumerable;
private object[] _currentDataRow = Array.Emptyobject>();
private int _depth;
private bool _release;
public void Dispose()
{
_release = true;
_enumerable.Dispose();
}
public int GetValues(object[] values)
{
if (values == null) throw new ArgumentNullException(nameof(values));
var length = Math.Min(_currentDataRow.Length, values.Length);
Array.Copy(_currentDataRow, values, length);
return length;
}
public int GetOrdinal(string name)
{
for (int i = 0; i ModelToDataTableTModel>.Columns.Count; i++)
{
if (ModelToDataTableTModel>.Columns[i].ColumnName == name) return i;
}
return -1;
}
public long GetBytes(int ordinal, long dataIndex, byte[] buffer, int bufferIndex, int length)
{
if (dataIndex 0) throw new Exception($"起始下標不能小于0!");
if (bufferIndex 0) throw new Exception("目標緩沖區起始下標不能小于0!");
if (length 0) throw new Exception("讀取長度不能小于0!");
var numArray = (byte[])GetValue(ordinal);
if (buffer == null) return numArray.Length;
if (buffer.Length = bufferIndex) throw new Exception("目標緩沖區起始下標不能大于目標緩沖區范圍!");
var freeLength = Math.Min(numArray.Length - bufferIndex, length);
if (freeLength = 0) return 0;
Array.Copy(numArray, dataIndex, buffer, bufferIndex, length);
return freeLength;
}
public long GetChars(int ordinal, long dataIndex, char[] buffer, int bufferIndex, int length)
{
if (dataIndex 0) throw new Exception($"起始下標不能小于0!");
if (bufferIndex 0) throw new Exception("目標緩沖區起始下標不能小于0!");
if (length 0) throw new Exception("讀取長度不能小于0!");
var numArray = (char[])GetValue(ordinal);
if (buffer == null) return numArray.Length;
if (buffer.Length = bufferIndex) throw new Exception("目標緩沖區起始下標不能大于目標緩沖區范圍!");
var freeLength = Math.Min(numArray.Length - bufferIndex, length);
if (freeLength = 0) return 0;
Array.Copy(numArray, dataIndex, buffer, bufferIndex, length);
return freeLength;
}
public bool IsDBNull(int i)
{
var value = GetValue(i);
return value == null || value is DBNull;
}
public bool NextResult()
{
//移動到下一個元素
if (!_enumerable.MoveNext()) return false;
//行層+1
Interlocked.Increment(ref _depth);
//得到數據行
_currentDataRow = ModelToDataTableTModel>.ToRowData.Invoke(_enumerable.Current);
return true;
}
public byte GetByte(int i) => (byte)GetValue(i);
public string GetName(int i) => ModelToDataTableTModel>.Columns[i].ColumnName;
public string GetDataTypeName(int i) => ModelToDataTableTModel>.Columns[i].DataType.Name;
public Type GetFieldType(int i) => ModelToDataTableTModel>.Columns[i].DataType;
public object GetValue(int i) => _currentDataRow[i];
public bool GetBoolean(int i) => (bool)GetValue(i);
public char GetChar(int i) => (char)GetValue(i);
public Guid GetGuid(int i) => (Guid)GetValue(i);
public short GetInt16(int i) => (short)GetValue(i);
public int GetInt32(int i) => (int)GetValue(i);
public long GetInt64(int i) => (long)GetValue(i);
public float GetFloat(int i) => (float)GetValue(i);
public double GetDouble(int i) => (double)GetValue(i);
public string GetString(int i) => (string)GetValue(i);
public decimal GetDecimal(int i) => (decimal)GetValue(i);
public DateTime GetDateTime(int i) => (DateTime)GetValue(i);
public IDataReader GetData(int i) => throw new NotSupportedException();
public int FieldCount => ModelToDataTableTModel>.Columns.Count;
public object this[int i] => GetValue(i);
public object this[string name] => GetValue(GetOrdinal(name));
public void Close() => Dispose();
public DataTable GetSchemaTable() => ModelToDataTableTModel>.ToDataTable(_source);
public bool Read() => NextResult();
public int Depth => _depth;
public bool IsClosed => _release;
public int RecordsAffected => 0;
}
模型對象轉數據行工具類
/// summary>
/// 對象轉換成DataTable轉換類
/// /summary>
/// typeparam name="TModel">泛型類型/typeparam>
public static class ModelToDataTableTModel>
{
static ModelToDataTable()
{
//如果需要剔除某些列可以修改這段代碼
var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray();
Columns = new ReadOnlyCollectionDataColumn>(propertyList
.Select(pr => new DataColumn(pr.Name, GetDataType(pr.PropertyType))).ToArray());
//生成對象轉數據行委托
ToRowData = BuildToRowDataDelegation(typeof(TModel), propertyList);
}
/// summary>
/// 構建轉換成數據行委托
/// /summary>
/// param name="type">傳入類型/param>
/// param name="propertyList">轉換的屬性/param>
/// returns>轉換數據行委托/returns>
private static FuncTModel, object[]> BuildToRowDataDelegation(Type type, PropertyInfo[] propertyList)
{
var source = Expression.Parameter(type);
var items = propertyList.Select(property => ConvertBindPropertyToData(source, property));
var array = Expression.NewArrayInit(typeof(object), items);
var lambda = Expression.LambdaFuncTModel, object[]>>(array, source);
return lambda.Compile();
}
/// summary>
/// 將屬性轉換成數據
/// /summary>
/// param name="source">源變量/param>
/// param name="property">屬性信息/param>
/// returns>獲取屬性數據表達式/returns>
private static Expression ConvertBindPropertyToData(ParameterExpression source, PropertyInfo property)
{
var propertyType = property.PropertyType;
var expression = (Expression)Expression.Property(source, property);
if (propertyType.IsEnum)
expression = Expression.Convert(expression, propertyType.GetEnumUnderlyingType());
return Expression.Convert(expression, typeof(object));
}
/// summary>
/// 獲取數據類型
/// /summary>
/// param name="type">屬性類型/param>
/// returns>數據類型/returns>
private static Type GetDataType(Type type)
{
//枚舉默認轉換成對應的值類型
if (type.IsEnum)
return type.GetEnumUnderlyingType();
//可空類型
if (type.IsGenericType type.GetGenericTypeDefinition() == typeof(Nullable>))
return GetDataType(type.GetGenericArguments().First());
return type;
}
/// summary>
/// 列集合
/// /summary>
public static IReadOnlyListDataColumn> Columns { get; }
/// summary>
/// 對象轉數據行委托
/// /summary>
public static FuncTModel, object[]> ToRowData { get; }
/// summary>
/// 集合轉換成DataTable
/// /summary>
/// param name="source">集合/param>
/// param name="tableName">表名稱/param>
/// returns>轉換完成的DataTable/returns>
public static DataTable ToDataTable(IEnumerableTModel> source, string tableName = "TempTable")
{
//創建表對象
var table = new DataTable(tableName);
//設置列
foreach (var dataColumn in Columns)
{
table.Columns.Add(new DataColumn(dataColumn.ColumnName, dataColumn.DataType));
}
//循環轉換每一行數據
foreach (var item in source)
{
table.Rows.Add(ToRowData.Invoke(item));
}
//返回表對象
return table;
}
}
創表代碼
CREATE TABLE [dbo].[Person]( [Id] [BIGINT] NOT NULL, [Name] [VARCHAR](64) NOT NULL, [Age] [INT] NOT NULL, [CreateTime] [DATETIME] NULL, [Sex] [INT] NOT NULL, PRIMARY KEY CLUSTERED ( [Id] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY]
實體類代碼
定義的實體的屬性名稱需要和SqlServer列名稱類型對應
public class Person
{
public long Id { get; set; }
public string Name { get; set; }
public int Age { get; set; }
public DateTime? CreateTime { get; set; }
public Gender Sex { get; set; }
}
public enum Gender
{
Man = 0,
Woman = 1
}
測試方法
//生成10萬條數據
var persons = new Person[100000];
var random = new Random();
for (int i = 0; i persons.Length; i++)
{
persons[i] = new Person
{
Id = i + 1,
Name = "張三" + i,
Age = random.Next(1, 128),
Sex = (Gender)random.Next(2),
CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i)
};
}
//創建數據庫連接
using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
{
conn.Open();
var sw = Stopwatch.StartNew();
//批量插入數據
var qty = conn.BulkCopy(persons);
sw.Stop();
Console.WriteLine(sw.Elapsed.TotalMilliseconds + "ms");
}
執行批量插入結果
226.4767ms
請按任意鍵繼續. . .

GitHub代碼地址:https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents