在微服務架構中,我們將一個項目拆分成很多個獨立的模塊,這些獨立的模塊通過遠程調用來互相配合工作,但是,在高并發情況下,通信次數的增加會導致總的通信時間增加,同時,線程池的資源也是有限的,高并發環境會導致有大量的線程處于等待狀態,進而導致響應延遲,為了解決這些問題,我們需要來了解hystrix的請求合并。
hystrix中的請求合并,就是利用一個合并處理器,將對同一個服務發起的連續請求合并成一個請求進行處理(這些連續請求的時間窗默認為10ms),在這個過程中涉及到的一個核心類就是hystrixcollapser,ok,接下來我們就來看看如何實現hystrix的請求合并。
服務提供者接口
我需在在服務提供者中提供兩個接口供服務消費者調用,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@requestmapping ( "/getbook6" ) public list<book> book6(string ids) { system.out.println( "ids>>>>>>>>>>>>>>>>>>>>>" + ids); arraylist<book> books = new arraylist<>(); books.add( new book( "《李自成》" , 55 , "姚雪垠" , "人民文學出版社" )); books.add( new book( "中國文學簡史" , 33 , "林庚" , "清華大學出版社" )); books.add( new book( "文學改良芻議" , 33 , "胡適" , "無" )); books.add( new book( "ids" , 22 , "helloworld" , "haha" )); return books; } @requestmapping ( "/getbook6/{id}" ) public book book61( @pathvariable integer id) { book book = new book( "《李自成》2" , 55 , "姚雪垠2" , "人民文學出版社2" ); return book; } |
第一個接口是一個批處理接口,第二個接口是一個處理單個請求的接口。在批處理接口中,服務消費者傳來的ids參數格式是1,2,3,4…這種格式,正常情況下我們需要根據ids查詢到對應的數據,然后組裝成一個集合返回,我這里為了處理方便,不管什么樣的請求統統都返回一樣的數據集;處理單個請求的接口就比較簡單了,不再贅述。
服務消費者
ok,服務提供者處理好之后,接下來我們來看看服務消費者要怎么處理。
bookservice
首先在bookservice中添加兩個方法用來調用服務提供者提供的接口,如下:
1
2
3
4
5
6
7
8
9
|
public book test8( long id) { return resttemplate.getforobject( "http://hello-service/getbook6/{1}" , book. class , id); } public list<book> test9(list< long > ids) { system.out.println( "test9---------" +ids+ "thread.currentthread().getname():" + thread.currentthread().getname()); book[] books = resttemplate.getforobject( "http://hello-service/getbook6?ids={1}" , book[]. class , stringutils.join(ids, "," )); return arrays.aslist(books); } |
test8用來調用提供單個id的接口,test9用來調用批處理的接口,在test9中,我將test9執行時所處的線程打印出來,方便我們觀察執行結果,另外,在resttemplate中,如果返回值是一個集合,我們得先用一個數組接收,然后再轉為集合(或許也有其他辦法,小伙伴們有更好的建議可以提)。
bookbatchcommand
ok,bookservice中的方法準備好了后,我們就可以來創建一個bookbatchcommand,這是一個批處理命令,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public class bookbatchcommand extends hystrixcommand<list<book>> { private list< long > ids; private bookservice bookservice; public bookbatchcommand(list< long > ids, bookservice bookservice) { super (setter.withgroupkey(hystrixcommandgroupkey.factory.askey( "collapsinggroup" )) .andcommandkey(hystrixcommandkey.factory.askey( "collapsingkey" ))); this .ids = ids; this .bookservice = bookservice; } @override protected list<book> run() throws exception { return bookservice.test9(ids); } } |
這個類實際上和我們在上篇博客中介紹的類差不多,都是繼承自hystrixcommand,用來處理合并之后的請求,在run方法中調用bookservice中的test9方法。
bookcollapsecommand
接下來我們需要創建bookcollapsecommand繼承自hystrixcollapser來實現請求合并。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public class bookcollapsecommand extends hystrixcollapser<list<book>, book, long > { private bookservice bookservice; private long id; public bookcollapsecommand(bookservice bookservice, long id) { super (setter.withcollapserkey(hystrixcollapserkey.factory.askey( "bookcollapsecommand" )).andcollapserpropertiesdefaults(hystrixcollapserproperties.setter().withtimerdelayinmilliseconds( 100 ))); this .bookservice = bookservice; this .id = id; } @override public long getrequestargument() { return id; } @override protected hystrixcommand<list<book>> createcommand(collection<collapsedrequest<book, long >> collapsedrequests) { list< long > ids = new arraylist<>(collapsedrequests.size()); ids.addall(collapsedrequests.stream().map(collapsedrequest::getargument).collect(collectors.tolist())); bookbatchcommand bookbatchcommand = new bookbatchcommand(ids, bookservice); return bookbatchcommand; } @override protected void mapresponsetorequests(list<book> batchresponse, collection<collapsedrequest<book, long >> collapsedrequests) { system.out.println( "mapresponsetorequests" ); int count = 0 ; for (collapsedrequest<book, long > collapsedrequest : collapsedrequests) { book book = batchresponse.get(count++); collapsedrequest.setresponse(book); } } } |
關于這個類,我說如下幾點:
1.首先在構造方法中,我們設置了請求時間窗為100ms,即請求時間間隔在100ms之內的請求會被合并為一個請求。
2.createcommand方法主要用來合并請求,在這里獲取到各個單個請求的id,將這些單個的id放到一個集合中,然后再創建出一個bookbatchcommand對象,用該對象去發起一個批量請求。
3.mapresponsetorequests方法主要用來為每個請求設置請求結果。該方法的第一個參數batchresponse表示批處理請求的結果,第二個參數collapsedrequests則代表了每一個被合并的請求,然后我們通過遍歷batchresponse來為collapsedrequests設置請求結果。
ok,所有的這些操作完成后,我們就可以來測試啦。
測試
我們在服務消費者端創建訪問接口,來測試合并請求,測試接口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@requestmapping ( "/test7" ) @responsebody public void test7() throws executionexception, interruptedexception { hystrixrequestcontext context = hystrixrequestcontext.initializecontext(); bookcollapsecommand bc1 = new bookcollapsecommand(bookservice, 1l); bookcollapsecommand bc2 = new bookcollapsecommand(bookservice, 2l); bookcollapsecommand bc3 = new bookcollapsecommand(bookservice, 3l); bookcollapsecommand bc4 = new bookcollapsecommand(bookservice, 4l); future<book> q1 = bc1.queue(); future<book> q2 = bc2.queue(); future<book> q3 = bc3.queue(); book book1 = q1.get(); book book2 = q2.get(); book book3 = q3.get(); thread.sleep( 3000 ); future<book> q4 = bc4.queue(); book book4 = q4.get(); system.out.println( "book1>>>" +book1); system.out.println( "book2>>>" +book2); system.out.println( "book3>>>" +book3); system.out.println( "book4>>>" +book4); context.close(); } |
關于這個測試接口我說如下兩點:
1.首先要初始化hystrixrequestcontext
2.創建bookcollapsecommand類的實例來發起請求,先發送3個請求,然后睡眠3秒鐘,再發起1個請求,這樣,前3個請求就會被合并為一個請求,第四個請求因為間隔的時間比較久,所以不會被合并,而是單獨創建一個線程去處理。
ok,我們來看看執行結果,如下:
通過注解實現請求合并
ok,上面這種請求合并方式寫起來稍微有一點麻煩,我們可以使用注解來更優雅的實現這一功能。首先在bookservice中添加兩個方法,如下:
1
2
3
4
5
6
7
8
9
10
11
|
@hystrixcollapser (batchmethod = "test11" ,collapserproperties = { @hystrixproperty (name = "timerdelayinmilliseconds" ,value = "100" )}) public future<book> test10( long id) { return null ; } @hystrixcommand public list<book> test11(list< long > ids) { system.out.println( "test9---------" +ids+ "thread.currentthread().getname():" + thread.currentthread().getname()); book[] books = resttemplate.getforobject( "http://hello-service/getbook6?ids={1}" , book[]. class , stringutils.join(ids, "," )); return arrays.aslist(books); } |
在test10方法上添加@hystrixcollapser注解實現請求合并,用batchmethod屬性指明請求合并后的處理方法,collapserproperties屬性指定其他屬性。
ok,在bookservice中寫好之后,直接調用就可以了,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@requestmapping ( "/test8" ) @responsebody public void test8() throws executionexception, interruptedexception { hystrixrequestcontext context = hystrixrequestcontext.initializecontext(); future<book> f1 = bookservice.test10(1l); future<book> f2 = bookservice.test10(2l); future<book> f3 = bookservice.test10(3l); book b1 = f1.get(); book b2 = f2.get(); book b3 = f3.get(); thread.sleep( 3000 ); future<book> f4 = bookservice.test10(4l); book b4 = f4.get(); system.out.println( "b1>>>" +b1); system.out.println( "b2>>>" +b2); system.out.println( "b3>>>" +b3); system.out.println( "b4>>>" +b4); context.close(); } |
和前面的一樣,前三個請求會進行合并,第四個請求會單獨執行,ok,執行結果如下:
總結
請求合并的優點小伙伴們已經看到了,多個請求被合并為一個請求進行一次性處理,可以有效節省網絡帶寬和線程池資源,但是,有優點必然也有缺點,設置請求合并之后,本來一個請求可能5ms就搞定了,但是現在必須再等10ms看看還有沒有其他的請求一起的,這樣一個請求的耗時就從5ms增加到15ms了,不過,如果我們要發起的命令本身就是一個高延遲的命令,那么這個時候就可以使用請求合并了,因為這個時候時間窗的時間消耗就顯得微不足道了,另外高并發也是請求合并的一個非常重要的場景。
ok,我們的請求合并就說到這里,有問題歡迎小伙伴們留言討論。希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://blog.csdn.net/u012702547/article/details/78213270