Scenario
Deploying a microservice usually means building the new package, killing the running instance, swapping in the new package, and starting it again.
Our Spring Cloud setup uses Eureka as the registry, and Eureka tolerates staleness in the service list: even after an instance has dropped out of the registry, clients keep sending requests to that address for a while. So a request can land on an instance that is in the middle of being redeployed, and fail.
We can tune how often the service list is refreshed to make it more up to date, but no amount of tuning removes the window in which the data is inconsistent.
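For reference, that kind of tuning usually touches properties like the ones below; this is only an illustrative sketch, and the values are not recommendations from this article:

```yaml
eureka:
  client:
    # how often the client pulls the registry from the Eureka server (default 30s)
    registry-fetch-interval-seconds: 5

ribbon:
  # how often Ribbon refreshes its own server list, in milliseconds (default 30000)
  ServerListRefreshInterval: 5000
```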
So we turn to a retry mechanism: while instance A is restarting, instance B in the same cluster can still serve traffic, so with retries the request in the scenario above can be retried against B and still get a correct response.
Setup
The following configuration is required:
```yaml
ribbon:
  ReadTimeout: 10000
  ConnectTimeout: 10000
  # retries against the same server, not counting the initial attempt
  MaxAutoRetries: 0
  # number of other servers to try after the first server fails
  MaxAutoRetriesNextServer: 1
  # false = only retry operations considered safe to repeat (e.g. GET)
  OkToRetryOnAllOperations: false
```
Add the spring-retry dependency:
```xml
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
</dependency>
```
Taking Zuul as the example, retries also need to be enabled explicitly:
```properties
zuul.retryable=true
```
Hitting a Problem
Of course nothing ever goes quite that smoothly. Testing showed that the retry mechanism did kick in, but it did not go to the other, healthy instance as I expected, so I had no choice but to dig into the source code. It turned out to be a bug in the framework itself; it has since been fixed, and upgrading the version is enough.
Code Analysis
The versions in use are:
spring-cloud-netflix-core:1.3.6.RELEASE
spring-retry:1.2.1.RELEASE
Spring Cloud dependency management:
```xml
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
```
因?yàn)閱⒂昧酥卦嚕哉?qǐng)求應(yīng)用時(shí)會(huì)執(zhí)行RetryableRibbonLoadBalancingHttpClient.execute方法:
```java
public RibbonApacheHttpResponse execute(final RibbonApacheHttpRequest request, final IClientConfig configOverride) throws Exception {
    final RequestConfig.Builder builder = RequestConfig.custom();
    IClientConfig config = configOverride != null ? configOverride : this.config;
    builder.setConnectTimeout(config.get(
            CommonClientConfigKey.ConnectTimeout, this.connectTimeout));
    builder.setSocketTimeout(config.get(
            CommonClientConfigKey.ReadTimeout, this.readTimeout));
    builder.setRedirectsEnabled(config.get(
            CommonClientConfigKey.FollowRedirects, this.followRedirects));

    final RequestConfig requestConfig = builder.build();
    final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryPolicyFactory.create(this.getClientName(), this);
    RetryCallback retryCallback = new RetryCallback() {
        @Override
        public RibbonApacheHttpResponse doWithRetry(RetryContext context) throws Exception {
            //on retries the policy will choose the server and set it in the context
            //extract the server and update the request being made
            RibbonApacheHttpRequest newRequest = request;
            if (context instanceof LoadBalancedRetryContext) {
                ServiceInstance service = ((LoadBalancedRetryContext) context).getServiceInstance();
                if (service != null) {
                    //Reconstruct the request URI using the host and port set in the retry context
                    newRequest = newRequest.withNewUri(new URI(service.getUri().getScheme(),
                            newRequest.getURI().getUserInfo(), service.getHost(), service.getPort(),
                            newRequest.getURI().getPath(), newRequest.getURI().getQuery(),
                            newRequest.getURI().getFragment()));
                }
            }
            newRequest = getSecureRequest(request, configOverride);
            HttpUriRequest httpUriRequest = newRequest.toRequest(requestConfig);
            final HttpResponse httpResponse = RetryableRibbonLoadBalancingHttpClient.this.delegate.execute(httpUriRequest);
            if (retryPolicy.retryableStatusCode(httpResponse.getStatusLine().getStatusCode())) {
                if (CloseableHttpResponse.class.isInstance(httpResponse)) {
                    ((CloseableHttpResponse) httpResponse).close();
                }
                throw new RetryableStatusCodeException(RetryableRibbonLoadBalancingHttpClient.this.clientName,
                        httpResponse.getStatusLine().getStatusCode());
            }
            return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
        }
    };
    return this.executeWithRetry(request, retryPolicy, retryCallback);
}
```
A RetryCallback is created first, and then this.executeWithRetry(request, retryPolicy, retryCallback) is executed. RetryCallback.doWithRetry clearly contains the code that actually issues the request, which means this.executeWithRetry will eventually call back into RetryCallback.doWithRetry:
```java
protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
        RecoveryCallback<T> recoveryCallback, RetryState state)
        throws E, ExhaustedRetryException {

    RetryPolicy retryPolicy = this.retryPolicy;
    BackOffPolicy backOffPolicy = this.backOffPolicy;

    // Allow the retry policy to initialise itself...
    RetryContext context = open(retryPolicy, state);
    if (this.logger.isTraceEnabled()) {
        this.logger.trace("RetryContext retrieved: " + context);
    }

    // Make sure the context is available globally for clients who need
    // it...
    RetrySynchronizationManager.register(context);

    Throwable lastException = null;

    boolean exhausted = false;
    try {

        // Give clients a chance to enhance the context...
        boolean running = doOpenInterceptors(retryCallback, context);

        if (!running) {
            throw new TerminatedRetryException(
                    "Retry terminated abnormally by interceptor before first attempt");
        }

        // Get or Start the backoff context...
        BackOffContext backOffContext = null;
        Object resource = context.getAttribute("backOffContext");

        if (resource instanceof BackOffContext) {
            backOffContext = (BackOffContext) resource;
        }

        if (backOffContext == null) {
            backOffContext = backOffPolicy.start(context);
            if (backOffContext != null) {
                context.setAttribute("backOffContext", backOffContext);
            }
        }

        /*
         * We allow the whole loop to be skipped if the policy or context already
         * forbid the first try. This is used in the case of external retry to allow a
         * recovery in handleRetryExhausted without the callback processing (which
         * would throw an exception).
         */
        while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

            try {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Retry: count=" + context.getRetryCount());
                }
                // Reset the last exception, so if we are successful
                // the close interceptors will not think we failed...
                lastException = null;
                return retryCallback.doWithRetry(context);
            }
            catch (Throwable e) {

                lastException = e;

                try {
                    registerThrowable(retryPolicy, state, context, e);
                }
                catch (Exception ex) {
                    throw new TerminatedRetryException("Could not register throwable", ex);
                }
                finally {
                    doOnErrorInterceptors(retryCallback, context, e);
                }

                if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                    try {
                        backOffPolicy.backOff(backOffContext);
                    }
                    catch (BackOffInterruptedException ex) {
                        lastException = e;
                        // back off was prevented by another thread - fail the retry
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Abort retry because interrupted: count="
                                    + context.getRetryCount());
                        }
                        throw ex;
                    }
                }

                if (this.logger.isDebugEnabled()) {
                    this.logger.debug(
                            "Checking for rethrow: count=" + context.getRetryCount());
                }

                if (shouldRethrow(retryPolicy, context, state)) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Rethrow in retry for policy: count="
                                + context.getRetryCount());
                    }
                    throw RetryTemplate.<E>wrapIfNecessary(e);
                }

            }

            /*
             * A stateful attempt that can retry may rethrow the exception before now,
             * but if we get this far in a stateful retry there's a reason for it,
             * like a circuit breaker or a rollback classifier.
             */
            if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                break;
            }
        }

        if (state == null && this.logger.isDebugEnabled()) {
            this.logger.debug(
                    "Retry failed last attempt: count=" + context.getRetryCount());
        }

        exhausted = true;
        return handleRetryExhausted(recoveryCallback, context, state);

    }
    catch (Throwable e) {
        throw RetryTemplate.<E>wrapIfNecessary(e);
    }
    finally {
        close(retryPolicy, context, state, lastException == null || exhausted);
        doCloseInterceptors(retryCallback, context, lastException);
        RetrySynchronizationManager.clear();
    }
}
```
The retry mechanism lives in a while loop: when retryCallback.doWithRetry(context) throws, the exception is caught and retryPolicy decides whether to retry. Note registerThrowable(retryPolicy, state, context, e) in particular: it not only decides whether a retry should happen, but when it should, it also picks a new server and puts it into the context, which is then carried into the next retryCallback.doWithRetry(context) call. That is how switching to another server on retry is supposed to work.
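For comparison, the same contract can be seen with spring-retry used on its own: the RetryTemplate owns the loop and the policy and invokes doWithRetry once per attempt. This is only a minimal standalone sketch to illustrate the control flow, not the Ribbon integration itself:

```java
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

public class RetryFlowSketch {

    public static void main(String[] args) throws Exception {
        RetryTemplate template = new RetryTemplate();
        // two attempts in total, similar in spirit to MaxAutoRetriesNextServer: 1
        template.setRetryPolicy(new SimpleRetryPolicy(2));

        String result = template.execute(new RetryCallback<String, Exception>() {
            @Override
            public String doWithRetry(RetryContext context) throws Exception {
                // called once per attempt; the retry count is 0 on the first attempt
                // and is incremented by registerThrowable after each failure
                if (context.getRetryCount() == 0) {
                    throw new IllegalStateException("simulated failure on the first attempt");
                }
                return "succeeded on attempt " + (context.getRetryCount() + 1);
            }
        });

        System.out.println(result); // succeeded on attempt 2
    }
}
```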
But why didn't my setup switch servers? Debugging showed that the server chosen by registerThrowable(retryPolicy, state, context, e) was fine, it really was the new, healthy instance, yet when retryCallback.doWithRetry(context) ran, the request still went to the dead one.
So let's take a closer look at the retryCallback.doWithRetry(context) code. There we find this line:
```java
// BUG: 'request' (the original attempt) is passed in here, so the newRequest that was
// just rebuilt from the retry context is thrown away
newRequest = getSecureRequest(request, configOverride);

protected RibbonApacheHttpRequest getSecureRequest(RibbonApacheHttpRequest request, IClientConfig configOverride) {
    if (isSecure(configOverride)) {
        final URI secureUri = UriComponentsBuilder.fromUri(request.getUri())
                .scheme("https").build(true).toUri();
        return request.withNewUri(secureUri);
    }
    return request;
}
```
At this point newRequest has already been rebuilt from the retry context, while request still holds the previous attempt's data, so whenever this line runs, newRequest is overwritten from request again. Only here did we realize this was a bug in the framework source.
Issue: https://github.com/spring-cloud/spring-cloud-netflix/issues/2667
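Judging from that issue, the fix boils down to passing the rebuilt request into getSecureRequest instead of the original one, roughly:

```java
// buggy: discards the URI that was just rebuilt from the retry context
newRequest = getSecureRequest(request, configOverride);

// fixed: keeps the server chosen for this retry attempt
newRequest = getSecureRequest(newRequest, configOverride);
```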
Summary
This was a fairly ordinary troubleshooting exercise. When the configuration did not behave as I expected, I first re-read what each setting meant; after several fruitless attempts I went in with a debugger, and because the scenario needs one instance healthy and one instance down, I had to simulate it hundreds of times before finally pinning it down to this one line. Even excellent open source projects inevitably contain bugs, so don't treat them as gospel and don't follow them blindly. On the other hand, the ability to read source code is an important problem-solving skill; I spent a lot of time just finding the entry point and locating the relevant code.
Original article: http://www.cnblogs.com/killbug/p/9150067.html