之前有聊過 golang 的協程,我發覺似乎還很理論,特別是在并發安全上,所以特結合網上的一些例子,來試驗下go routine中 的 channel, select, context 的妙用。
場景-微服務調用
我們用 gin(一個web框架) 作為處理請求的工具,需求是這樣的:
一個請求 X 會去并行調用 A, B, C 三個方法,并把三個方法返回的結果加起來作為 X 請求的 Response。
但是我們這個 Response 是有時間要求的(不能超過3秒的響應時間),可能 A, B, C 中任意一個或兩個,處理邏輯十分復雜,或者數據量超大,導致處理時間超出預期,那么我們就馬上切斷,并返回已經拿到的任意個返回結果之和。
我們先來定義主函數:
1
2
3
4
5
|
func main() { r := gin.New() r.GET("/calculate", calHandler) http.ListenAndServe(":8008", r) } |
非常簡單,普通的請求接受和 handler 定義。其中 calHandler 是我們用來處理請求的函數。
分別定義三個假的微服務,其中第三個將會是我們超時的哪位~
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func microService1() int { time.Sleep(1*time.Second) return 1 } func microService2() int { time.Sleep(2*time.Second) return 2 } func microService3() int { time.Sleep(10*time.Second) return 3 } |
接下來,我們看看 calHandler 里到底是什么
1
2
3
|
func calHandler(c *gin.Context) { ... } |
要點1--并發調用
直接用 go 就好了嘛~
所以一開始我們可能就這么寫:
1
2
3
|
go microService1() go microService2() go microService3() |
很簡單有沒有,但是等等,說好的返回值我怎么接呢?
為了能夠并行地接受處理結果,我們很容易想到用 channel 去接。
所以我們把調用服務改成這樣:
1
2
3
4
5
6
7
8
9
10
11
12
|
var resChan = make(chan int, 3) // 因為有3個結果,所以我們創建一個可以容納3個值的 int channel。 go func() { resChan <- microService1() }() go func() { resChan <- microService2() }() go func() { resChan <- microService3() }() |
有東西接,那也要有方法去算,所以我們加一個一直循環拿 resChan 中結果并計算的方法:
1
2
3
4
5
|
var resContainer, sum int for { resContainer = <-resChan sum += resContainer } |
這樣一來我們就有一個 sum 來計算每次從 resChan 中拿出的結果了。
要點2--超時信號
還沒結束,說好的超時處理呢?
為了實現超時處理,我們需要引入一個東西,就是 context,什么是 context ?
我們這里只使用 context 的一個特性,超時通知(其實這個特性完全可以用 channel 來替代)。
可以看在定義 calHandler 的時候我們已經將 c *gin.Context 作為參數傳了進來,那我們就不用自己在聲明了。
gin.Context 簡單理解為貫穿整個 gin 聲明周期的上下文容器,有點像是分身,亦或是量子糾纏的感覺。
有了這個 gin.Context, 我們就能在一個地方對 context 做出操作,而其他正在使用 context 的函數或方法,也會感受到 context 做出的變化。
1
|
ctx, _ := context.WithTimeout(c, 3*time.Second) //定義一個超時的 context |
只要時間到了,我們就能用 ctx.Done() 獲取到一個超時的 channel(通知),然后其他用到這個 ctx 的地方也會停掉,并釋放 ctx。
一般來說,ctx.Done() 是結合 select 使用的。
所以我們又需要一個循環來監聽 ctx.Done()
1
2
3
4
5
|
for { select { case <- ctx.Done(): // 返回結果 } |
現在我們有兩個 for 了,是不是能夠合并下?
1
2
3
4
5
6
7
8
9
10
|
for { select { case resContainer = <-resChan: sum += resContainer fmt.Println("add", resContainer) case <- ctx.Done(): fmt.Println("result:", sum) return } } |
誒嘿,看上去不錯。
不過我們怎么在正常完成微服務調用的時候輸出結果呢?
看來我們還需要一個 flag
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
var count int for { select { case resContainer = <-resChan: sum += resContainer count ++ fmt.Println("add", resContainer) if count > 2 { fmt.Println("result:", sum) return } case <- ctx.Done(): fmt.Println("timeout result:", sum) return } } |
我們加入一個計數器,因為我們只是調用3次微服務,所以當 count 大于2的時候,我們就應該結束并輸出結果了。
要點3--并發中的等待
上面的計時器是一種偷懶的方法,因為我們知道了調用微服務的次數,如果我們并不知道,或者之后還要添加呢?
手動每次改 count 的判斷閾值會不會太沙雕了?這時候我們就要加入 sync 包了。
我們將會使用的 sync 的一個特性是 WaitGroup。它的作用是等待一組協程運行完畢后,執行接下去的步驟。
我們來改下之前微服務調用的代碼塊:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
var success = make(chan int, 1) // 成功的通道標識 wg := sync.WaitGroup{} // 創建一個 waitGroup 組 wg.Add(3) // 我們往組里加3個標識,因為我們要運行3個任務 go func() { resChan <- microService1() wg.Done() // 完成一個,Done()一個 }() go func() { resChan <- microService2() wg.Done() }() go func() { resChan <- microService3() wg.Done() }() wg.Wait() // 直到我們前面三個標識都被 Done 了,否則程序一直會阻塞在這里 success <- 1 // 我們發送一個成功信號到通道中 |
既然我們有了 success 這個信號,那么再把它加入到監控 for 循環中,并做些修改,刪除原來 count 判斷的部分。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
go func() { for { select { case resContainer = <-resChan: sum += resContainer fmt.Println("add", resContainer) case <- success: fmt.Println("result:", sum) return case <- ctx.Done(): fmt.Println("result:", sum) return } } }() |
三個 case,分工明確,一個用來拿服務輸出的結果并計算,一個用來做最終的完成輸出,一個是超時輸出。
同時我們將這個循環監聽,也作為協程運行。
至此,所有的主要代碼都完成了。下面是完全版
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
package main import ( "context" "fmt" "net/http" "sync" "time" "github.com/gin-gonic/gin" ) // 一個請求會觸發調用三個服務,每個服務輸出一個 int, // 請求要求結果為三個服務輸出 int 之和 // 請求返回時間不超過3秒,大于3秒只輸出已經獲得的 int 之和 func calHandler(c *gin.Context) { var resContainer, sum int var success, resChan = make(chan int), make(chan int, 3) ctx, _ := context.WithTimeout(c, 3*time.Second) go func() { for { select { case resContainer = <-resChan: sum += resContainer fmt.Println("add", resContainer) case <- success: fmt.Println("result:", sum) return case <- ctx.Done(): fmt.Println("result:", sum) return } } }() wg := sync.WaitGroup{} wg.Add(3) go func() { resChan <- microService1() wg.Done() }() go func() { resChan <- microService2() wg.Done() }() go func() { resChan <- microService3() wg.Done() }() wg.Wait() success <- 1 return } func main() { r := gin.New() r.GET("/calculate", calHandler) http.ListenAndServe(":8008", r) } func microService1() int { time.Sleep(1*time.Second) return 1 } func microService2() int { time.Sleep(2*time.Second) return 2 } func microService3() int { time.Sleep(10*time.Second) return 3 } |
上面的程序只是簡單描述了一個調用其他微服務超時的處理場景。
實際過程中還需要加很多很多調料,才能保證接口的對外完整性。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://segmentfault.com/a/1190000017872359