在之前只知道SqlServer支持數據批量插入,殊不知道Oracle、SQLite和MySql也是支持的,不過Oracle需要使用Oracle.DataAccess驅動,今天就貼出幾種數據庫的批量插入解決方法。
首先說一下,IProvider里有一個用于實現批量插入的插件服務接口IBatcherProvider,此接口在前一篇文章中已經提到過了。
1
2
3
4
5
6
7
8
9
10
11
12
|
/// <summary>
/// Provides methods for processing data in batches.
/// </summary>
public interface IBatcherProvider : IProviderService
{
    /// <summary>
    /// Bulk-inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk-insert.</param>
    /// <param name="batchSize">The number of rows written per batch.</param>
    void Insert(DataTable dataTable, int batchSize = 10000);
}
一、SqlServer數據批量插入
SqlServer的批量插入很簡單,使用SqlBulkCopy就可以,以下是該類的實現:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
/// <summary>
/// Bulk-insert support for System.Data.SqlClient, implemented with <see cref="SqlBulkCopy"/>.
/// </summary>
public sealed class MsSqlBatcher : IBatcherProvider
{
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Bulk-inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk-insert. Its
    /// <see cref="DataTable.TableName"/> (quoted) is used as the destination table.</param>
    /// <param name="batchSize">The number of rows written per batch; assigned to
    /// <see cref="SqlBulkCopy.BatchSize"/>.</param>
    /// <exception cref="BatcherException">Wraps any exception raised after the connection
    /// has been created.</exception>
    public void Insert(DataTable dataTable, int batchSize = 10000)
    {
        Checker.ArgumentNull(dataTable, "dataTable");

        // Nothing to write for an empty table.
        if (dataTable.Rows.Count == 0)
        {
            return;
        }

        using (var sqlConnection = (SqlConnection)ServiceContext.Database.CreateConnection())
        {
            try
            {
                sqlConnection.TryOpen();

                // Surround the table name with the dialect's quote characters.
                var syntaxProvider = ServiceContext.Database.Provider.GetService<ISyntaxProvider>();
                var quotedTableName = DbUtility.FormatByQuote(syntaxProvider, dataTable.TableName);

                using (var bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.KeepIdentity, null)
                    {
                        DestinationTableName = quotedTableName,
                        BatchSize = batchSize
                    })
                {
                    // Map each source column to the destination column of the same name.
                    // The second lambda is presumably a filter that leaves out
                    // auto-increment columns (EachColumn is defined elsewhere).
                    dataTable.EachColumn(
                        column => bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName),
                        column => !column.AutoIncrement);

                    bulkCopy.WriteToServer(dataTable);
                    bulkCopy.Close();
                }
            }
            catch (Exception error)
            {
                throw new BatcherException(error);
            }
            finally
            {
                sqlConnection.TryClose();
            }
        }
    }
}
以上沒有使用事務,使用事務在性能上會有一定的影響,如果要使用事務,可以設置SqlBulkCopyOptions.UseInternalTransaction。
二、Oracle數據批量插入
System.Data.OracleClient不支持批量插入,因此只能使用Oracle.DataAccess組件來作為提供者。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
|
/// <summary>
/// Bulk-insert support for the Oracle.DataAccess component, implemented with array binding.
/// </summary>
public sealed class OracleAccessBatcher : IBatcherProvider
{
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Bulk-inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk-insert.</param>
    /// <param name="batchSize">The number of rows written per batch.
    /// NOTE: this implementation does not read the value; all rows are bound to a single
    /// command execution.</param>
    /// <exception cref="BatcherException">Wraps any exception raised while inserting.</exception>
    public void Insert(DataTable dataTable, int batchSize = 10000)
    {
        Checker.ArgumentNull(dataTable, "dataTable");

        // Nothing to write for an empty table.
        if (dataTable.Rows.Count == 0)
        {
            return;
        }

        using (var connection = ServiceContext.Database.CreateConnection())
        {
            try
            {
                connection.TryOpen();

                using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                {
                    if (command == null)
                    {
                        throw new BatcherException(new ArgumentException("command"));
                    }

                    command.Connection = connection;
                    command.CommandText = GenerateInsertSql(ServiceContext.Database, command, dataTable);
                    command.ExecuteNonQuery();
                }
            }
            catch (BatcherException)
            {
                // Already the exception type callers expect; do not wrap it a second time.
                throw;
            }
            catch (Exception exp)
            {
                throw new BatcherException(exp);
            }
            finally
            {
                connection.TryClose();
            }
        }
    }

    /// <summary>
    /// Builds the INSERT statement and adds one parameter per column to
    /// <paramref name="command"/>; each parameter's value is the entry of the converted
    /// table data at that column's index.
    /// </summary>
    /// <param name="database">The database whose provider supplies the syntax service and the parameter factory.</param>
    /// <param name="command">The command that receives the parameters and the array bind count.</param>
    /// <param name="table">The table whose columns and rows are inserted.</param>
    /// <returns>The parameterized INSERT statement.</returns>
    /// <exception cref="NotSupportedException">The command type has no <c>ArrayBindCount</c> property.</exception>
    private string GenerateInsertSql(IDatabase database, DbCommand command, DataTable table)
    {
        var names = new StringBuilder();
        var values = new StringBuilder();

        // Convert the DataTable into an array of arrays. ToArray is defined elsewhere;
        // judging from the loop below, data[i] holds the values of column i — TODO confirm.
        var data = table.ToArray();

        // ArrayBindCount is set through reflection, so fail with a clear message
        // when the command type does not expose it.
        var arrayBindCount = command.GetType().GetProperty("ArrayBindCount");
        if (arrayBindCount == null)
        {
            throw new NotSupportedException(string.Format(
                "The command type '{0}' has no ArrayBindCount property; array binding requires an Oracle.DataAccess command.",
                command.GetType().FullName));
        }

        arrayBindCount.SetValue(command, table.Rows.Count, null);

        var syntax = database.Provider.GetService<ISyntaxProvider>();

        for (var i = 0; i < table.Columns.Count; i++)
        {
            var column = table.Columns[i];

            var parameter = database.Provider.DbProviderFactory.CreateParameter();
            if (parameter == null)
            {
                // Kept from the original: a column without a parameter is left out of the statement.
                continue;
            }

            parameter.ParameterName = column.ColumnName;
            parameter.Direction = ParameterDirection.Input;
            parameter.DbType = column.DataType.GetDbType();
            parameter.Value = data[i];

            if (names.Length > 0)
            {
                names.Append(",");
                values.Append(",");
            }

            names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName));
            values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);

            command.Parameters.Add(parameter);
        }

        return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
    }
}
以上最重要的一步,就是將DataTable轉為數組的數組表示,即object[][],外層數組的長度是列的個數,內層數組的長度是行的個數,因此循環Columns將內層數組作為Parameter的值,也就是說,參數的值是一個數組。而insert語句與一般的插入語句沒有什么不一樣。
三、SQLite數據批量插入
SQLite的批量插入只需開啟事務就可以了。其原理是:不顯式開啟事務時,SQLite會把每條INSERT語句都當作一個獨立的事務自動提交,每次提交都要等待數據寫入磁盤;把所有插入放進同一個事務后,只需在最后提交一次。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
/// <summary>
/// Bulk-insert support for SQLite: every row is inserted with the same parameterized
/// command inside a single transaction.
/// </summary>
public sealed class SQLiteBatcher : IBatcherProvider
{
    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Bulk-inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk-insert.</param>
    /// <param name="batchSize">The number of rows written per batch.
    /// NOTE: this implementation does not read the value; all rows are written in one transaction.</param>
    /// <exception cref="BatcherException">Wraps any exception raised while inserting;
    /// the transaction is rolled back before the exception is rethrown.</exception>
    public void Insert(DataTable dataTable, int batchSize = 10000)
    {
        Checker.ArgumentNull(dataTable, "dataTable");

        // Nothing to write for an empty table.
        if (dataTable.Rows.Count == 0)
        {
            return;
        }

        using (var connection = ServiceContext.Database.CreateConnection())
        {
            try
            {
                connection.TryOpen();

                using (DbTransaction transaction = connection.BeginTransaction())
                using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                {
                    try
                    {
                        if (command == null)
                        {
                            throw new BatcherException(new ArgumentException("command"));
                        }

                        command.Connection = connection;
                        // Enlist the command explicitly instead of relying on the provider to do it.
                        command.Transaction = transaction;
                        command.CommandText = GenerateInsertSql(ServiceContext.Database, dataTable);

                        // Create the parameters once; only their values change per row.
                        CreateCommandParameters(dataTable, command);

                        dataTable.EachRow(row =>
                            {
                                for (var c = 0; c < dataTable.Columns.Count; c++)
                                {
                                    command.Parameters[c].Value = row[c];
                                }

                                command.ExecuteNonQuery();
                            });

                        transaction.Commit();
                    }
                    catch
                    {
                        TryRollback(transaction);
                        throw;
                    }
                }
            }
            catch (BatcherException)
            {
                // Already the exception type callers expect; do not wrap it a second time.
                throw;
            }
            catch (Exception exp)
            {
                throw new BatcherException(exp);
            }
            finally
            {
                connection.TryClose();
            }
        }
    }

    /// <summary>
    /// Adds one parameter per column of <paramref name="dataTable"/> to
    /// <paramref name="command"/>, in column order and named after the column.
    /// </summary>
    /// <param name="dataTable">The table whose columns define the parameters.</param>
    /// <param name="command">The command that receives the parameters.</param>
    /// <exception cref="InvalidOperationException">The provider factory returned no parameter.</exception>
    private void CreateCommandParameters(DataTable dataTable, DbCommand command)
    {
        for (var c = 0; c < dataTable.Columns.Count; c++)
        {
            var parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();
            if (parameter == null)
            {
                throw new InvalidOperationException("The provider factory did not create a parameter.");
            }

            parameter.ParameterName = dataTable.Columns[c].ColumnName;
            command.Parameters.Add(parameter);
        }
    }

    /// <summary>
    /// Rolls the transaction back, ignoring a failure of the rollback itself so that the
    /// exception which triggered it is the one that reaches the caller.
    /// </summary>
    /// <param name="transaction">The transaction to roll back.</param>
    private static void TryRollback(DbTransaction transaction)
    {
        try
        {
            transaction.Rollback();
        }
        catch (Exception)
        {
            // Deliberately ignored: the original failure is more useful than the rollback failure.
        }
    }

    /// <summary>
    /// Builds the parameterized INSERT statement for <paramref name="table"/>.
    /// </summary>
    /// <param name="database">The database whose provider supplies the syntax service.</param>
    /// <param name="table">The table whose name and columns are used.</param>
    /// <returns>The INSERT statement with one prefixed parameter placeholder per column.</returns>
    private string GenerateInsertSql(IDatabase database, DataTable table)
    {
        var syntax = database.Provider.GetService<ISyntaxProvider>();
        var names = new StringBuilder();
        var values = new StringBuilder();

        table.EachColumn(column =>
            {
                if (names.Length > 0)
                {
                    names.Append(",");
                    values.Append(",");
                }

                names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));
                values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);
            });

        return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
    }
}
四、MySql數據批量插入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
|
/// <summary>
/// Bulk-insert support for the MySql.Data component, implemented as one multi-row
/// INSERT statement.
/// </summary>
public sealed class MySqlBatcher : IBatcherProvider
{
    // Characters that make a string unsafe to inline as a quoted literal:
    // the quote itself and the backslash, which MySQL treats as an escape character.
    private static readonly char[] UnsafeStringChars = { '\'', '\\' };

    /// <summary>
    /// Gets or sets the context of the provider service.
    /// </summary>
    public ServiceContext ServiceContext { get; set; }

    /// <summary>
    /// Bulk-inserts the data of a <see cref="DataTable"/> into the database.
    /// </summary>
    /// <param name="dataTable">The <see cref="DataTable"/> to bulk-insert.</param>
    /// <param name="batchSize">The number of rows written per batch.
    /// NOTE: this implementation does not read the value; all rows go into one statement.</param>
    /// <exception cref="BatcherException">Wraps any exception raised while inserting.</exception>
    public void Insert(DataTable dataTable, int batchSize = 10000)
    {
        Checker.ArgumentNull(dataTable, "dataTable");

        // Nothing to write for an empty table.
        if (dataTable.Rows.Count == 0)
        {
            return;
        }

        using (var connection = ServiceContext.Database.CreateConnection())
        {
            try
            {
                connection.TryOpen();

                using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand())
                {
                    if (command == null)
                    {
                        throw new BatcherException(new ArgumentException("command"));
                    }

                    command.Connection = connection;
                    command.CommandText = GenerateInsertSql(ServiceContext.Database, command, dataTable);
                    command.ExecuteNonQuery();
                }
            }
            catch (BatcherException)
            {
                // Already the exception type callers expect; do not wrap it a second time.
                throw;
            }
            catch (Exception exp)
            {
                throw new BatcherException(exp);
            }
            finally
            {
                connection.TryClose();
            }
        }
    }

    /// <summary>
    /// Builds a multi-row INSERT statement. Values that can be written safely as SQL
    /// literals are inlined; every other value is added to <paramref name="command"/>
    /// as a parameter and referenced by name.
    /// </summary>
    /// <param name="database">The database whose provider supplies the syntax service and the parameter factory.</param>
    /// <param name="command">The command that receives the generated parameters.</param>
    /// <param name="table">The table whose columns and rows are inserted.</param>
    /// <returns>The INSERT statement containing one parenthesised value tuple per row.</returns>
    private string GenerateInsertSql(IDatabase database, DbCommand command, DataTable table)
    {
        var names = new StringBuilder();
        var values = new StringBuilder();
        var types = new List<DbType>();
        var count = table.Columns.Count;
        var syntax = database.Provider.GetService<ISyntaxProvider>();

        table.EachColumn(c =>
            {
                if (names.Length > 0)
                {
                    names.Append(",");
                }

                names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName));
                types.Add(c.DataType.GetDbType());
            });

        var i = 0;
        foreach (DataRow row in table.Rows)
        {
            if (i > 0)
            {
                values.Append(",");
            }

            values.Append("(");

            for (var j = 0; j < count; j++)
            {
                if (j > 0)
                {
                    values.Append(", ");
                }

                var value = row[j];
                string literal;
                if (TryFormatLiteral(types[j], value, out literal))
                {
                    values.Append(literal);
                }
                else
                {
                    var parameter = CreateParameter(database.Provider, types[j], value, syntax.ParameterPrefix, i, j);
                    values.Append(parameter.ParameterName);
                    command.Parameters.Add(parameter);
                }
            }

            values.Append(")");
            i++;
        }

        return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values);
    }

    /// <summary>
    /// Determines whether the type is one of the string types.
    /// </summary>
    /// <param name="dbType">The type to test.</param>
    /// <returns><c>true</c> for the ANSI/Unicode, variable/fixed-length string types.</returns>
    private static bool IsStringType(DbType dbType)
    {
        return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength;
    }

    /// <summary>
    /// Tries to render <paramref name="value"/> as a SQL literal that is safe to embed
    /// in the statement text.
    /// </summary>
    /// <param name="dbType">The column's database type.</param>
    /// <param name="value">The cell value.</param>
    /// <param name="literal">Receives the literal text when the method returns <c>true</c>.</param>
    /// <returns><c>false</c> when the value must be passed as a parameter instead.</returns>
    private static bool TryFormatLiteral(DbType dbType, object value, out string literal)
    {
        literal = null;

        // A missing value must become NULL; formatting DBNull yields an empty token.
        if (value == null || value == DBNull.Value)
        {
            literal = "NULL";
            return true;
        }

        if (IsStringType(dbType))
        {
            var text = value.ToString();
            if (text.IndexOfAny(UnsafeStringChars) != -1)
            {
                return false;
            }

            literal = "'" + text + "'";
            return true;
        }

        // Enum values would format as their names, so they are parameterized.
        if (value is Enum)
        {
            return false;
        }

        var invariant = System.Globalization.CultureInfo.InvariantCulture;

        switch (Type.GetTypeCode(value.GetType()))
        {
            case TypeCode.Boolean:
                literal = (bool)value ? "1" : "0";
                return true;

            case TypeCode.SByte:
            case TypeCode.Byte:
            case TypeCode.Int16:
            case TypeCode.UInt16:
            case TypeCode.Int32:
            case TypeCode.UInt32:
            case TypeCode.Int64:
            case TypeCode.UInt64:
            case TypeCode.Decimal:
                // Invariant culture keeps '.' as the decimal separator regardless of the thread culture.
                literal = ((IFormattable)value).ToString(null, invariant);
                return true;

            case TypeCode.Single:
                var single = (float)value;
                if (float.IsNaN(single) || float.IsInfinity(single))
                {
                    return false;
                }

                literal = single.ToString("R", invariant);
                return true;

            case TypeCode.Double:
                var number = (double)value;
                if (double.IsNaN(number) || double.IsInfinity(number))
                {
                    return false;
                }

                literal = number.ToString("R", invariant);
                return true;

            default:
                // Dates, GUIDs, binary data and anything else cannot be written as a bare token.
                return false;
        }
    }

    /// <summary>
    /// Creates an input parameter for the value at the given row and column.
    /// </summary>
    /// <param name="provider">The provider whose factory creates the parameter.</param>
    /// <param name="dbType">The column's database type.</param>
    /// <param name="value">The cell value.</param>
    /// <param name="parPrefix">The parameter prefix of the SQL dialect.</param>
    /// <param name="row">The zero-based row index, used in the parameter name.</param>
    /// <param name="col">The zero-based column index, used in the parameter name.</param>
    /// <returns>The parameter, named <c>{prefix}p_{row}_{col}</c>.</returns>
    /// <exception cref="InvalidOperationException">The provider factory returned no parameter.</exception>
    private static DbParameter CreateParameter(IProvider provider, DbType dbType, object value, char parPrefix, int row, int col)
    {
        var parameter = provider.DbProviderFactory.CreateParameter();
        if (parameter == null)
        {
            throw new InvalidOperationException("The provider factory did not create a parameter.");
        }

        parameter.ParameterName = string.Format("{0}p_{1}_{2}", parPrefix, row, col);
        parameter.Direction = ParameterDirection.Input;
        parameter.DbType = dbType;
        parameter.Value = value;
        return parameter;
    }
}
MySql的批量插入,是將所有行的值全部寫在同一條語句的values里,每一行的值用一對括號括起來,例如,insert into batcher(id, name) values (1, '1'), (2, '2'), (3, '3'), ........ (10, '10')。
五、測試
接下來寫一個測試用例來看一下使用批量插入的效果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
/// <summary>
/// Builds a five-column "Batcher" table in memory, inserts it through the database's
/// <see cref="IBatcherProvider"/> (when one is available), then prints the row count of
/// the Batcher table and the elapsed time reported by TimeWatcher.
/// </summary>
public void TestBatchInsert()
{
    // Number of rows generated for the test.
    const int rowCount = 100000;

    Console.WriteLine(TimeWatcher.Watch(() => InvokeTest(database =>
        {
            var table = new DataTable("Batcher");
            table.Columns.Add("Id", typeof(int));
            table.Columns.Add("Name1", typeof(string));
            table.Columns.Add("Name2", typeof(string));
            table.Columns.Add("Name3", typeof(string));
            table.Columns.Add("Name4", typeof(string));

            // Build the test rows; every name column holds the row's id as text.
            for (var i = 0; i < rowCount; i++)
            {
                var text = i.ToString();
                table.Rows.Add(i, text, text, text, text);
            }

            // Look up the bulk-insert service of the current provider.
            var batcher = database.Provider.GetService<IBatcherProvider>();
            if (batcher == null)
            {
                Console.WriteLine("不支持批量插入。");
            }
            else
            {
                batcher.Insert(table);
            }

            // Print the number of rows now in the Batcher table.
            var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher");
            Console.WriteLine("當前共有 {0} 條數據", database.ExecuteScalar(sql));
        })));
}
以下表中列出了四種數據庫生成10萬條數據各耗用的時間
數據庫 |
耗用時間 |
MsSql | 00:00:02.9376300 |
Oracle | 00:00:01.5155959 |
SQLite | 00:00:01.6275634 |
MySql | 00:00:05.4166891 |