本篇,我將來講解一下在sqlserver中批量插入數據。
先創建一個用來測試的數據庫和表,為了讓插入數據更快,表中主鍵采用的是guid,表中沒有創建任何索引。guid必然是比自增長要快的,因為你生成一個guid算法所花的時間肯定比你從數據表中重新查詢上一條記錄的id的值然后再進行加1運算要少。而如果存在索引的情況下,每次插入記錄都會進行索引重建,這是非常耗性能的。如果表中無可避免的存在索引,我們可以通過先刪除索引,然后批量插入,最后再重建索引的方式來提高效率。
1
2
3
4
5
6
7
8
9
|
create database carsys; go use carsys; go create table product( id uniqueidentifier primary key , name varchar (50) not null , price decimal (18,2) not null ) |
我們通過sql腳本來插入數據,常見如下四種方式。
方式一:一條一條插入,性能最差,不建議使用。
1
2
3
|
insert into product(id, name ,price) values (newid(), '牛欄1段' ,160); insert into product(id, name ,price) values (newid(), '牛欄2段' ,260); ...... |
方式二:insert bulk
語法如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
bulk insert [ [ 'database_name' .][ 'owner' ].]{ 'table_name' from 'data_file' } with ( [ batchsize [ = batch_size ] ], [ check_constraints ], [ codepage [ = 'acp' | 'oem' | 'raw' | 'code_page' ] ], [ datafiletype [ = 'char' | 'native' | 'widechar' | 'widenative' ] ], [ fieldterminator [ = 'field_terminator' ] ], [ firstrow [ = first_row ] ], [ fire_triggers ], [ formatfile = 'format_file_path' ], [ keepidentity ], [ keepnulls ], [ kilobytes_per_batch [ = kilobytes_per_batch ] ], [ lastrow [ = last_row ] ], [ maxerrors [ = max_errors ] ], [ order ( { column [ asc | desc ] } [ ,...n ] ) ], [ rows_per_batch [ = rows_per_batch ] ], [ rowterminator [ = 'row_terminator' ] ], [ tablock ], ) |
相關參數說明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
bulk insert [ database_name . [ schema_name ] . | schema_name . ] [ table_name | view_name ] from 'data_file' [ with ( [ [ , ] batchsize = batch_size ] --batchsize指令來設置在單個事務中可以插入到表中的記錄的數量 [ [ , ] check_constraints ] --指定在大容量導入操作期間,必須檢查所有對目標表或視圖的約束。若沒有 check_constraints 選項,則所有 check 和 foreign key 約束都將被忽略,并且在此操作之后表的約束將標記為不可信。 [ [ , ] codepage = { 'acp' | 'oem' | 'raw' | 'code_page' } ] --指定該數據文件中數據的代碼頁 [ [ , ] datafiletype = { 'char' | 'native' | 'widechar' | 'widenative' } ] --指定 bulk insert 使用指定的數據文件類型值執行導入操作。 [ [ , ] fieldterminator = 'field_terminator' ] --標識分隔內容的符號 [ [ , ] firstrow = first_row ] --指定要加載的第一行的行號。默認值是指定數據文件中的第一行 [ [ , ] fire_triggers ] --是否啟動觸發器 [ [ , ] formatfile = 'format_file_path' ] [ [ , ] keepidentity ] --指定導入數據文件中的標識值用于標識列 [ [ , ] keepnulls ] --指定在大容量導入操作期間空列應保留一個空值,而不插入用于列的任何默認值 [ [ , ] kilobytes_per_batch = kilobytes_per_batch ] [ [ , ] lastrow = last_row ] --指定要加載的最后一行的行號 [ [ , ] maxerrors = max_errors ] --指定允許在數據中出現的最多語法錯誤數,超過該數量后將取消大容量導入操作。 [ [ , ] order ( { column [ asc | desc ] } [ ,...n ] ) ] --指定數據文件中的數據如何排序 [ [ , ] rows_per_batch = rows_per_batch ] [ [ , ] rowterminator = 'row_terminator' ] --標識分隔行的符號 [ [ , ] tablock ] --指定為大容量導入操作持續時間獲取一個表級鎖 [ [ , ] errorfile = 'file_name' ] --指定用于收集格式有誤且不能轉換為 ole db 行集的行的文件。 )] |
方式三:insert into xx select...
1
2
3
4
5
6
|
insert into product(id, name ,price) select newid(), '牛欄1段' ,160 union all select newid(), '牛欄2段' ,180 union all ...... |
方式四:拼接sql
1
2
3
4
|
insert into product(id, name ,price) values (newid(), '牛欄1段' ,160) ,(newid(), '牛欄2段' ,260) ...... |
在c#中通過ado.net來實現批量操作存在四種與之對應的方式。
方式一:逐條插入
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
#region 方式一 static void insertone() { console.writeline( "采用一條一條插入的方式實現" ); stopwatch sw = new stopwatch(); using (sqlconnection conn = new sqlconnection(strconnmsg)) //using中會自動open和close 連接。 { string sql = "insert into product(id,name,price) values(newid(),@p,@d)" ; conn.open(); for ( int i = 0; i < totalrow; i++) { using (sqlcommand cmd = new sqlcommand(sql, conn)) { cmd.parameters.addwithvalue( "@p" , "商品" + i); cmd.parameters.addwithvalue( "@d" , i); sw.start(); cmd.executenonquery(); console.writeline( string .format( "插入一條記錄,已耗時{0}毫秒" , sw.elapsedmilliseconds)); } if (i == getrow) { sw.stop(); break ; } } } console.writeline( string .format( "插入{0}條記錄,每{4}條的插入時間是{1}毫秒,預估總得插入時間是{2}毫秒,{3}分鐘" , totalrow, sw.elapsedmilliseconds, ((sw.elapsedmilliseconds / getrow) * totalrow), getminute((sw.elapsedmilliseconds / getrow * totalrow)), getrow)); } static int getminute( long l) { return (int32)l / 60000; } #endregion |
運行結果如下:
我們會發現插入100w條記錄,預計需要50分鐘時間,每插入一條記錄大概需要3毫秒左右。
方式二:使用sqlbulk
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
#region 方式二 static void inserttwo() { console.writeline( "使用bulk插入的實現方式" ); stopwatch sw = new stopwatch(); datatable dt = gettableschema(); using (sqlconnection conn = new sqlconnection(strconnmsg)) { sqlbulkcopy bulkcopy = new sqlbulkcopy(conn); bulkcopy.destinationtablename = "product" ; bulkcopy.batchsize = dt.rows.count; conn.open(); sw.start(); for ( int i = 0; i < totalrow;i++ ) { datarow dr = dt.newrow(); dr[0] = guid.newguid(); dr[1] = string .format( "商品" , i); dr[2] = ( decimal )i; dt.rows.add(dr); } if (dt != null && dt.rows.count != 0) { bulkcopy.writetoserver(dt); sw.stop(); } console.writeline( string .format( "插入{0}條記錄共花費{1}毫秒,{2}分鐘" , totalrow, sw.elapsedmilliseconds, getminute(sw.elapsedmilliseconds))); } } static datatable gettableschema() { datatable dt = new datatable(); dt.columns.addrange( new datacolumn[] { new datacolumn( "id" , typeof (guid)), new datacolumn( "name" , typeof ( string )), new datacolumn( "price" , typeof ( decimal ))}); return dt; } #endregion |
運行結果如下:
插入100w條記錄才8s多,是不是很溜。
打開sqlserver profiler跟蹤,會發現執行的是如下語句:
insert bulk product ([id] uniqueidentifier, [name] varchar(50) collate chinese_prc_ci_as, [price] decimal(18,2))
方式三:使用tvps(表值參數)插入數據
從sqlserver 2008起開始支持tvps。創建緩存表producttemp ,執行如下sql。
1
2
3
4
5
|
create type producttemp as table ( id uniqueidentifier primary key , name varchar (50) not null , price decimal (18,2) not null ) |
執行完成之后,會發現在數據庫carsys下面多了一張緩存表producttemp
可見插入100w條記錄共花費了11秒多。
方式四:拼接sql
此種方法在c#中有限制,一次性只能批量插入1000條,所以就得分段進行插入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
#region 方式四 static void insertfour() { console.writeline( "采用拼接批量sql插入的方式實現" ); stopwatch sw = new stopwatch(); using (sqlconnection conn = new sqlconnection(strconnmsg)) //using中會自動open和close 連接。 { conn.open(); sw.start(); for ( int j = 0; j < totalrow / getrow;j++ ) { stringbuilder sb = new stringbuilder(); sb.append( "insert into product(id,name,price) values" ); using (sqlcommand cmd = new sqlcommand()) { for ( int i = 0; i < getrow; i++) { sb.appendformat( "(newid(),'商品{0}',{0})," , j*i+i); } cmd.connection = conn; cmd.commandtext = sb.tostring().trimend( ',' ); cmd.executenonquery(); } } sw.stop(); console.writeline( string .format( "插入{0}條記錄,共耗時{1}毫秒" ,totalrow,sw.elapsedmilliseconds)); } } #endregion |
運行結果如下:
我們可以看到大概花費了10分鐘。雖然在方式一的基礎上,性能有了較大的提升,但是顯然還是不夠快。
總結:大數據批量插入方式一和方式四盡量避免使用,而方式二和方式三都是非常高效的批量插入數據方式。其都是通過構建datatable的方式插入的,而我們知道datatable是存在內存中的,所以當數據量特別特別大,大到內存中無法一次性存儲的時候,可以分段插入。比如需要插入9千萬條數據,可以分成9段進行插入,一次插入1千萬條。而在for循環中直接進行數據庫操作,我們是應該盡量避免的。每一次數據庫的連接、打開和關閉都是比較耗時的,雖然在c#中存在數據庫連接池,也就是當我們使用using或者conn.close(),進行釋放連接時,其實并沒有真正關閉數據庫連接,它只是讓連接以類似于休眠的方式存在,當再次操作的時候,會從連接池中找一個休眠狀態的連接,喚醒它,這樣可以有效的提高并發能力,減少連接損耗。而連接池中的連接數,我們都是可以配置的。
源碼下載
以上就是本文的全部內容,希望本文的內容對大家的學習或者工作能帶來一定的幫助,同時也希望多多支持服務器之家!
原文鏈接:http://www.cnblogs.com/jiekzou/p/6145550.html