一 動(dòng)態(tài)配置
- 1. 環(huán)境準(zhǔn)備
- 2.新建配置
- 3.導(dǎo)入配置
- 4.配置客戶端
- 5. 修改配置信息
- 6.小結(jié)
二 配置中心原理(推還是拉)
- 1.實(shí)例化 ConfigService
- 2.添加 Listener
- 3.CacheData
- 4.觸發(fā)回調(diào)
- 5.Md5何時(shí)變更
- 6.拉的優(yōu)勢(shì)
三 總結(jié)
Hello,大家好,我是麥洛,今天我們一起從Nacos客戶端視角來(lái)看看配置中心實(shí)現(xiàn)原理;整理這篇文章時(shí)候,也參照學(xué)習(xí)了部分大佬的博客,這里致謝;
在開始閱讀文章之前,有些思路我按我的理解先闡述一些,方便大家更快理清思路,不對(duì)的地方還請(qǐng)大家批評(píng)指正;
- Nacos客戶端會(huì)在在本地緩存服務(wù)端配置文件,防止服務(wù)器奔潰情況下,導(dǎo)致服務(wù)不可用;
- 本地緩存類在代碼中的體現(xiàn)就是我們下面提到的CacheData,我們知道對(duì)應(yīng)服務(wù)端一個(gè)配置,肯定可以同時(shí)被多個(gè)客戶端所使用,當(dāng)這個(gè)配置發(fā)生變更,如何去通知到每一個(gè)客戶端?
-
客戶端啟動(dòng)之后,回去注冊(cè)監(jiān)視器,監(jiān)視器最終會(huì)被保存到CacheData類中CopyOnWriteArrayList
listeners字段,那么,反過(guò)來(lái),當(dāng)執(zhí)行監(jiān)視器回調(diào)方法時(shí),就可以找到所有客戶端 - 長(zhǎng)輪詢左右主要就是刷新配置,保持服務(wù)端配置和本地緩存配置保持一致;
首先,我們來(lái)看看Nacos官網(wǎng)給出的Nacos地圖,我們可以清楚的看到,動(dòng)態(tài)配置服務(wù)是 Nacos 的三大功能之一;

這里借用官網(wǎng)的描述,一起來(lái)看看Nacos 為我們帶來(lái)什么黑科技?
動(dòng)態(tài)配置服務(wù)可以讓您以中心化、外部化和動(dòng)態(tài)化的方式管理所有環(huán)境的應(yīng)用配置和服務(wù)配置。動(dòng)態(tài)配置消除了配置變更時(shí)重新部署應(yīng)用和服務(wù)的需要,讓配置管理變得更加高效和敏捷。配置中心化管理讓實(shí)現(xiàn)無(wú)狀態(tài)服務(wù)變得更簡(jiǎn)單,讓服務(wù)按需彈性擴(kuò)展變得更容易。
所以,有了Nacos ,可能我們以前上線打包弄錯(cuò)配置文件,改配置需要重啟服務(wù)等一系列問(wèn)題,都會(huì)顯著改觀
一 動(dòng)態(tài)配置
下面我將來(lái)和大家一起來(lái)了解下 Nacos 的動(dòng)態(tài)配置的能力,看看 Nacos 是如何以簡(jiǎn)單、優(yōu)雅、高效的方式管理配置,實(shí)現(xiàn)配置的動(dòng)態(tài)變更的。
我們用一個(gè)簡(jiǎn)單的例子來(lái)了解下 Nacos 的動(dòng)態(tài)配置的功能。
1. 環(huán)境準(zhǔn)備
首先,我們需要搭建一個(gè)Nacos 服務(wù)端,由于官網(wǎng)的quick-start已經(jīng)對(duì)此做了詳細(xì)的解讀,我們這里就不在贅述

- https://nacos.io/zh-cn/docs/quick-start.html
安裝完成之后啟動(dòng),我們就可以訪問(wèn) Nacos 的控制臺(tái)了,如下圖所示:

Nacos控制臺(tái)做了簡(jiǎn)單的權(quán)限控制,默認(rèn)的賬號(hào)和密碼都是 nacos。
登錄進(jìn)去之后,是這樣的:

2.新建配置
接下來(lái)我們?cè)诳刂婆_(tái)上創(chuàng)建一個(gè)簡(jiǎn)單的配置項(xiàng),如下圖所示:

3.導(dǎo)入配置
Nacos支持導(dǎo)入配置,可以直接將配置文件壓縮包導(dǎo)入,這里我們以人人開源的微服務(wù)項(xiàng)目為例


4.配置客戶端
下面我以自己搭建的子服務(wù)為例,一起來(lái)看看Nacos配置中心的使用
首先我們需要配置一下,大家只需關(guān)注config節(jié)點(diǎn)配置就可以,discovery節(jié)點(diǎn)可以忽略
- cloud:
- nacos:
- discovery:
- metadata:
- management:
- context-path: ${server.servlet.context-path}/actuator
- server-addr: ${nacos-host:nacos-host}:${nacos-port:8848}
- #nacos的命名空間ID,默認(rèn)是public
- namespace: ${nacos-namespace:}
- service: ets-web
- config:
- server-addr: ${spring.cloud.nacos.discovery.server-addr}
- namespace: ${spring.cloud.nacos.discovery.namespace}
- group: RENREN_CLOUD_GROUP
- file-extension: yaml
- #指定共享配置,且支持動(dòng)態(tài)刷新
- extension-configs:
- - data-id: datasource.yaml
- group: ${spring.cloud.nacos.config.group}
- refresh: true
- - data-id: common.yaml
- group: ${spring.cloud.nacos.config.group}
- refresh: true
其實(shí)extension-configs節(jié)點(diǎn)的配置信息對(duì)應(yīng)的是下面的類

接下來(lái)我們啟動(dòng)服務(wù),來(lái)看看控制臺(tái)日志
5. 修改配置信息
接下來(lái)我們?cè)?Nacos 的控制臺(tái)上將我們的配置信息改為如下圖所示:
修改完配置,點(diǎn)擊 “發(fā)布” 按鈕后,客戶端將會(huì)收到最新的數(shù)據(jù),如下圖所示:

至此一個(gè)簡(jiǎn)單的動(dòng)態(tài)配置管理功能已經(jīng)講完了,刪除配置和更新配置操作類似,這里不再贅述。
6.小結(jié)
通過(guò)上面的小案例,我們大概了解了Nacos動(dòng)態(tài)配置的服務(wù)的使用方法,Nacos服務(wù)端將配置信息保存到其配置文件所配置的數(shù)據(jù)庫(kù)中,客戶端連接到服務(wù)端之后,根據(jù) dataID,Group可以獲取到具體的配置信息,當(dāng)服務(wù)端的配置發(fā)生變更時(shí),客戶端會(huì)收到通知。當(dāng)客戶端拿到變更后的最新配置信息后,就可以做自己的處理了,這非常有用,所有需要使用配置的場(chǎng)景都可以通過(guò) Nacos 來(lái)進(jìn)行管理。
二 配置中心原理(推還是拉)
現(xiàn)在我們了解了 Nacos 的動(dòng)態(tài)配置服務(wù)的功能了,但是有一個(gè)問(wèn)題我們需要弄明白,那就是 Nacos 客戶端是怎么實(shí)時(shí)獲取到 Nacos 服務(wù)端的最新數(shù)據(jù)的。
其實(shí)客戶端和服務(wù)端之間的數(shù)據(jù)交互,無(wú)外乎兩種情況:
- 服務(wù)端推數(shù)據(jù)給客戶端
- 客戶端從服務(wù)端拉數(shù)據(jù)
那到底是推還是拉呢,從 Nacos 客戶端通過(guò) Listener 來(lái)接收最新數(shù)據(jù)的這個(gè)做法來(lái)看,感覺(jué)像是服務(wù)端推的數(shù)據(jù),但是不能想當(dāng)然,要想知道答案,最快最準(zhǔn)確的方法就是從源碼中去尋找。
官方示例代碼
- try {
- // 傳遞配置
- String serverAddr = "{serverAddr}";
- String dataId = "{dataId}";
- String group = "{group}";
- Properties properties = new Properties();
- properties.put("serverAddr", serverAddr);
- // 新建 configService
- ConfigService configService = NacosFactory.createConfigService(properties);
- String content = configService.getConfig(dataId, group, 5000);
- System.out.println(content);
- // 注冊(cè)監(jiān)聽器
- configService.addListener(dataId, group, new Listener() {
- @Override
- public void receiveConfigInfo(String configInfo) {
- System.out.println("recieve1:" + configInfo);
- }
- @Override
- public Executor getExecutor() {
- return null;
- }
- });
- } catch (NacosException e) {
- // TODO
- -generated catch block
- e.printStackTrace();
- }
1.實(shí)例化 ConfigService
當(dāng)我們引包結(jié)束以后,會(huì)發(fā)現(xiàn)下面三個(gè)關(guān)于Nacos的包

從我的理解來(lái)說(shuō),api包會(huì)調(diào)用client包的能力來(lái)和Nacos服務(wù)端進(jìn)行交互.那再交互時(shí)候,主要就會(huì)用到我們接下來(lái)分析的實(shí)現(xiàn)了ConfigService接口的NacosConfigService 類
現(xiàn)在我們來(lái)看下 NacosConfigService 的構(gòu)造方法,看看 ConfigService 是怎么實(shí)例化的,如下圖所示:
- public class NacosConfigService implements ConfigService {
- private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class);
- private static final long POST_TIMEOUT = 3000L;
- /**
- * http agent.
- */
- private final HttpAgent agent;
- /**
- * long polling. 這里是長(zhǎng)輪詢
- */
- private final ClientWorker worker;
- private String namespace;
- private final String encode;
- //省略其他代碼
- //構(gòu)造方法
- ic NacosConfigService(Properties properties) throws NacosException {
- ValidatorUtils.checkInitParam(properties);
- String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
- if (StringUtils.isBlank(encodeTmp)) {
- this.encode = Constants.ENCODE;
- } else {
- this.encode = encodeTmp.trim();
- }
- initNamespace(properties);
- //對(duì)象1
- this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
- this.agent.start();
- //對(duì)象2
- this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
- }
實(shí)例化時(shí)主要是初始化了兩個(gè)對(duì)象,他們分別是:
- HttpAgent
- ClientWorker
HttpAgent
其中 agent 是通過(guò)裝飾器模式實(shí)現(xiàn)的,ServerHttpAgent 是實(shí)際工作的類,MetricsHttpAgent 在內(nèi)部也是調(diào)用了 ServerHttpAgent 的方法,另外加上了一些統(tǒng)計(jì)操作,所以我們只需要關(guān)心 ServerHttpAgent 的功能就可以了。
不熟悉的同學(xué),可以看菜鳥教程對(duì)裝飾器模式的解讀
agent 實(shí)際是在 ClientWorker 中發(fā)揮能力的,而 ClientWorker 也是真正的打工人,下面我們來(lái)看下 ClientWorker 類。
ClientWorker
以下是 ClientWorker 的構(gòu)造方法,如下圖所示:
- public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
- final Properties properties) {
- this.agent = agent;
- this.configFilterChainManager = configFilterChainManager;
- // Initialize the timeout parameter
- init(properties);
- //創(chuàng)建了一個(gè)定時(shí)任務(wù)的線程池
- this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
- t.setDaemon(true);
- return t;
- }
- });
- //創(chuàng)建了一個(gè)保持長(zhǎng)輪詢的線程池
- this.executorService = Executors
- .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r);
- t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
- t.setDaemon(true);
- return t;
- }
- });
- //創(chuàng)建了一個(gè)延遲任務(wù)線程池來(lái)每隔10ms來(lái)檢查配置信息的線程池
- this.executor.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- try {
- checkConfigInfo();
- } catch (Throwable e) {
- LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
- }
- }
- }, 1L, 10L, TimeUnit.MILLISECONDS);
- }
可以看到 ClientWorker 除了將 HttpAgent 維持在自己內(nèi)部,還創(chuàng)建了兩個(gè)線程池:
- final ScheduledExecutorService executor;
- final ScheduledExecutorService executorService;
- 第一個(gè)線程池負(fù)責(zé)與配置中心進(jìn)行數(shù)據(jù)的交互,并且啟動(dòng)后延遲1ms,之后每隔10ms對(duì)配置信息進(jìn)行定時(shí)檢查
- 第二個(gè)線程池則是負(fù)責(zé)保持一個(gè)長(zhǎng)輪詢鏈接
接下來(lái)讓我們來(lái)看下 executor 每 10ms 執(zhí)行的方法到底做了什么工作,如下圖所示:
- /**
- * groupKey -> cacheData.
- */
- private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
- new HashMap<String, CacheData>());
- /**
- * Check config info. 檢查配置信息
- */
- public void checkConfigInfo() {
- // 分任務(wù)(解決大數(shù)據(jù)量的傳輸問(wèn)題)
- int listenerSize = cacheMap.get().size();
- // 向上取整為批數(shù),分批次進(jìn)行檢查
- //ParamUtil.getPerTaskConfigSize() =3000
- int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
- if (longingTaskCount > currentLongingTaskCount) {
- for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
- // 要判斷任務(wù)是否在執(zhí)行 這塊需要好好想想。 任務(wù)列表現(xiàn)在是無(wú)序的。變化過(guò)程可能有問(wèn)題
- executorService.execute(new LongPollingRunnable(i));
- }
- currentLongingTaskCount = longingTaskCount;
- }
- }
這里主要是先去拿緩存中 Map
現(xiàn)在我們來(lái)看看 LongPollingRunnable 做了什么,主要分為兩部分,
- 第一部分是檢查本地的配置信息,
- 第二部分是獲取服務(wù)端的配置信息然后更新到本地。
1.本地檢查
首先取出與該 taskId 相關(guān)的 CacheData,然后對(duì) CacheData 進(jìn)行檢查,包括本地配置檢查和緩存數(shù)據(jù)的 md5 檢查,本地檢查主要是做一個(gè)故障容錯(cuò),當(dāng)服務(wù)端掛掉后,Nacos 客戶端可以從本地的文件系統(tǒng)中獲取相關(guān)的配置信息,如下圖所示:
- public void run() {
- List<CacheData> cacheDatas = new ArrayList<CacheData>();
- List<String> inInitializingCacheList = new ArrayList<String>();
- try {
- //
- for (CacheData cacheData : cacheMap.get().values()) {
- if (cacheData.getTaskId() == taskId) {
- cacheDatas.add(cacheData);
- try {
- //執(zhí)行檢查本地配置
- checkLocalConfig(cacheData);
- if (cacheData.isUseLocalConfigInfo()) {
- //緩存數(shù)據(jù)的md5的檢查
- cacheData.checkListenerMd5();
- }
- } catch (Exception e) {
- LOGGER.error("get local config info error", e);
- }
- }
- }
- }
- //檢查本地配置
- private void checkLocalConfig(CacheData cacheData) {
- final String dataId = cacheData.dataId;
- final String group = cacheData.group;
- final String tenant = cacheData.tenant;
- //本地緩存文件
- File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
- //不使用本地配置,但是持久化文件存在,需要讀取文件加載至內(nèi)存
- if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
- String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
- final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
- cacheData.setUseLocalConfigInfo(true);
- cacheData.setLocalConfigInfoVersion(path.lastModified());
- cacheData.setContent(content);
- LOGGER.warn(
- "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}",
- agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
- return;
- }
- // 有 -> 沒(méi)有。不通知業(yè)務(wù)監(jiān)聽器,從server拿到配置后通知。
- //使用本地配置,但是持久化文件不存在
- if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
- cacheData.setUseLocalConfigInfo(false);
- LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(),
- dataId, group, tenant);
- return;
- }
- // 有變更
- //使用本地配置,持久化文件存在,緩存跟文件最后修改時(shí)間不一致
- if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path
- .lastModified()) {
- String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
- final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE);
- cacheData.setUseLocalConfigInfo(true);
- cacheData.setLocalConfigInfoVersion(path.lastModified());
- cacheData.setContent(content);
- LOGGER.warn(
- "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}",
- agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content));
- }
- }
本地檢查主要是通過(guò)是否使用本地配置,繼而尋找持久化緩存文件,再通過(guò)判斷文件的最后修改事件與本地緩存的版本是否一致來(lái)判斷是否由變更
通過(guò)跟蹤 checkLocalConfig 方法,可以看到 Nacos 將緩存配置信息保存在了
- ~/nacos/config/fixed-{address}_8848_nacos/snapshot/DEFAULT_GROUP/{dataId}
這個(gè)文件中,我們看下這個(gè)文件中保存的內(nèi)容,如下圖所示:

2.服務(wù)端檢查
然后通過(guò) checkUpdateDataIds() 方法從服務(wù)端獲取值變化的 dataId 列表,
通過(guò) getServerConfig 方法,根據(jù) dataId 到服務(wù)端獲取最新的配置信息,接著將最新的配置信息保存到 CacheData 中。
最后調(diào)用 CacheData 的 checkListenerMd5 方法,可以看到該方法在第一部分也被調(diào)用過(guò),我們需要重點(diǎn)關(guān)注一下。
- // 檢查服務(wù)器配置
- List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
- if (!CollectionUtils.isEmpty(changedGroupKeys)) {
- LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
- }
- for (String groupKey : changedGroupKeys) {
- String[] key = GroupKey.parseKey(groupKey);
- String dataId = key[0];
- String group = key[1];
- String tenant = null;
- if (key.length == 3) {
- tenant = key[2];
- }
- try {
- //從服務(wù)器端獲取相關(guān)id的最新配置
- String[] ct = getServerConfig(dataId, group, tenant, 3000L);
- CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
- cache.setContent(ct[0]);
- if (null != ct[1]) {
- cache.setType(ct[1]);
- }
- LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
- agent.getName(), dataId, group, tenant, cache.getMd5(),
- ContentUtils.truncateContent(ct[0]), ct[1]);
- } catch (NacosException ioe) {
- String message = String
- .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
- agent.getName(), dataId, group, tenant);
- LOGGER.error(message, ioe);
- }
- }
- for (CacheData cacheData : cacheDatas) {
- if (!cacheData.isInitializing() || inInitializingCacheList
- .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
- //校驗(yàn)MD5值
- cacheData.checkListenerMd5();
- cacheData.setInitializing(false);
- }
- }
- inInitializingCacheList.clear();
- executorService.execute(this);
- catch (Throwable e) {
- // If the rotation training task is abnormal, the next execution time of the task will be punished
- LOGGER.error("longPolling error : ", e);
- executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
這里大家也發(fā)現(xiàn),當(dāng)客戶端從服務(wù)器拉去配置文件之后,會(huì)將配置文件在本地進(jìn)行緩存,所以,一般會(huì)優(yōu)先使用本地配置,如果本地文件不存在或者內(nèi)容為空,則再通過(guò) HTTP GET 方法從遠(yuǎn)端拉取配置,并保存到本地緩存中
- private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
- group = null2defaultGroup(group);
- ParamUtils.checkKeyParam(dataId, group);
- ConfigResponse cr = new ConfigResponse();
- cr.setDataId(dataId);
- cr.setTenant(tenant);
- cr.setGroup(group);
- // 優(yōu)先使用本地配置
- String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
- if (content != null) {
- LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
- dataId, group, tenant, ContentUtils.truncateContent(content));
- cr.setContent(content);
- configFilterChainManager.doFilter(null, cr);
- content = cr.getContent();
- return content;
- }
- try {
- String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
- cr.setContent(ct[0]);
- configFilterChainManager.doFilter(null, cr);
- content = cr.getContent();
- return content;
- } catch (NacosException ioe) {
- if (NacosException.NO_RIGHT == ioe.getErrCode()) {
- throw ioe;
- }
- LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
- agent.getName(), dataId, group, tenant, ioe.toString());
- }
- LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(),
- dataId, group, tenant, ContentUtils.truncateContent(content));
- content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant);
- cr.setContent(content);
- configFilterChainManager.doFilter(null, cr);
- content = cr.getContent();
- return content;
- }
2.添加 Listener
好了現(xiàn)在我們可以為 ConfigService 來(lái)添加一個(gè) Listener 了,最終是調(diào)用了 ClientWorker 的 addTenantListeners 方法,如下圖所示:
- /**
- * Add listeners for tenant.
- *
- * @param dataId dataId of data
- * @param group group of data
- * @param listeners listeners
- * @throws NacosException nacos exception
- */
- public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
- throws NacosException {
- //設(shè)置默認(rèn)組
- group = null2defaultGroup(group);
- String tenant = agent.getTenant();
- CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
- for (Listener listener : listeners) {
- cache.addListener(listener);
- }
- }
該方法分為兩個(gè)部分,首先根據(jù) dataId,group 和tenant獲取一個(gè) CacheData 對(duì)象,然后將當(dāng)前要添加的 listener 對(duì)象添加到 CacheData 中去。
接下來(lái),我們要重點(diǎn)關(guān)注下 CacheData 類了。
3.本地緩存CacheData
首先讓我們來(lái)看一下 CacheData 中的成員變量,如下圖所示:
- private final String name;
- private final ConfigFilterChainManager configFilterChainManager;
- public final String dataId;
- public final String group;
- public final String tenant;
- //監(jiān)聽器
- private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
- private volatile String md5;
- /**
- * whether use local config.
- */
- private volatile boolean isUseLocalConfig = false;
- /**
- * last modify time.
- */
- private volatile long localConfigLastModified;
- private volatile String content;
- private int taskId;
- private volatile boolean isInitializing = true;
- private String type;
我們可以看到,成員變量包括tenant ,dataId,group,content,taskId等,還有兩個(gè)值得我們關(guān)注的:
- listeners
- md5
listeners 是該 CacheData 所關(guān)聯(lián)的所有 listener,不過(guò)不是保存的原始的 Listener對(duì)象,而是包裝后的 ManagerListenerWrap 對(duì)象,該對(duì)象除了持有 Listener 對(duì)象,還持有了一個(gè) lastCallMd5 和lastContent屬性。
- private static class ManagerListenerWrap {
- final Listener listener;
- //關(guān)注
- String lastCallMd5 = CacheData.getMd5String(null);
- String lastContent = null;
- ManagerListenerWrap(Listener listener) {
- this.listener = listener;
- }
- ManagerListenerWrap(Listener listener, String md5) {
- this.listener = listener;
- this.lastCallMd5 = md5;
- }
- ManagerListenerWrap(Listener listener, String md5, String lastContent) {
- this.listener = listener;
- this.lastCallMd5 = md5;
- this.lastContent = lastContent;
- }
- }
另外一個(gè)屬性 md5 就是根據(jù)當(dāng)前對(duì)象的 content 計(jì)算出來(lái)的 md5 值。
4.觸發(fā)監(jiān)聽器回調(diào)
現(xiàn)在我們對(duì) ConfigService 有了大致的了解了,現(xiàn)在剩下最后一個(gè)重要的問(wèn)題還沒(méi)有答案,那就是 ConfigService 的 Listener 是在什么時(shí)候觸發(fā)回調(diào)方法 receiveConfigInfo 的。
現(xiàn)在讓我們回過(guò)頭來(lái)想一下,在 ClientWorker 中的定時(shí)任務(wù)中,啟動(dòng)了一個(gè)長(zhǎng)輪詢的任務(wù):LongPollingRunnable,該任務(wù)多次執(zhí)行了 cacheData.checkListenerMd5() 方法,那現(xiàn)在就讓我們來(lái)看下這個(gè)方法到底做了些什么,如下圖所示:
- void checkListenerMd5() {
- for (ManagerListenerWrap wrap : listeners) {
- if (!md5.equals(wrap.lastCallMd5)) {
- safeNotifyListener(dataId, group, content, type, md5, wrap);
- }
- }
- }
到這里應(yīng)該就比較清晰了,該方法會(huì)檢查 CacheData 當(dāng)前的 md5 與 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就執(zhí)行一個(gè)安全的監(jiān)聽器的通知方法:safeNotifyListener,通知什么呢?我們可以大膽的猜一下,應(yīng)該是通知 Listener 的使用者,該 Listener 所關(guān)注的配置信息已經(jīng)發(fā)生改變了。現(xiàn)在讓我們來(lái)看一下 safeNotifyListener 方法,如下圖所示:
- private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
- final String md5, final ManagerListenerWrap listenerWrap) {
- final Listener listener = listenerWrap.listener;
- Runnable job = new Runnable() {
- @Override
- public void run() {
- ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
- ClassLoader appClassLoader = listener.getClass().getClassLoader();
- try {
- if (listener instanceof AbstractSharedListener) {
- AbstractSharedListener adapter = (AbstractSharedListener) listener;
- adapter.fillContext(dataId, group);
- LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
- }
- // 執(zhí)行回調(diào)之前先將線程classloader設(shè)置為具體webapp的classloader,以免回調(diào)方法中調(diào)用spi接口是出現(xiàn)異常或錯(cuò)用(多應(yīng)用部署才會(huì)有該問(wèn)題)。
- Thread.currentThread().setContextClassLoader(appClassLoader);
- ConfigResponse cr = new ConfigResponse();
- cr.setDataId(dataId);
- cr.setGroup(group);
- cr.setContent(content);
- //重點(diǎn)關(guān)注,在這里調(diào)用
- //重點(diǎn)關(guān)注,在這里調(diào)用
- //重點(diǎn)關(guān)注,在這里調(diào)用
- configFilterChainManager.doFilter(null, cr);
- String contentTmp = cr.getContent();
- listener.receiveConfigInfo(contentTmp);
- // compare lastContent and content
- if (listener instanceof AbstractConfigChangeListener) {
- Map data = ConfigChangeHandler.getInstance()
- .parseChangeData(listenerWrap.lastContent, content, type);
- ConfigChangeEvent event = new ConfigChangeEvent(data);
- ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
- listenerWrap.lastContent = content;
- }
- listenerWrap.lastCallMd5 = md5;
- LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
- listener);
- } catch (NacosException ex) {
- LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
- name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
- } catch (Throwable t) {
- LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
- group, md5, listener, t.getCause());
- } finally {
- Thread.currentThread().setContextClassLoader(myClassLoader);
- }
- }
- };
- final long startNotify = System.currentTimeMillis();
- try {
- if (null != listener.getExecutor()) {
- listener.getExecutor().execute(job);
- } else {
- job.run();
- }
- } catch (Throwable t) {
- LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
- group, md5, listener, t.getCause());
- }
- final long finishNotify = System.currentTimeMillis();
- LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
- name, (finishNotify - startNotify), dataId, group, md5, listener);
- }
可以看到在 safeNotifyListener 方法中,重點(diǎn)關(guān)注下紅框中的三行代碼:獲取最新的配置信息,調(diào)用 Listener 的回調(diào)方法,將最新的配置信息作為參數(shù)傳入,這樣 Listener 的使用者就能接收到變更后的配置信息了,最后更新 ListenerWrap 的 md5 值。和我們猜測(cè)的一樣, Listener 的回調(diào)方法就是在該方法中觸發(fā)的。
5.Md5何時(shí)變更
那 CacheData 的 md5 值是何時(shí)發(fā)生改變的呢?我們可以回想一下,在上面的 LongPollingRunnable 所執(zhí)行的任務(wù)中,在獲取服務(wù)端發(fā)生變更的配置信息時(shí),將最新的 content 數(shù)據(jù)寫入了 CacheData 中,我們可以看下該方法如下:
- public void setContent(String content) {
- this.content = content;
- this.md5 = getMd5String(this.content);
- }
可以看到是在長(zhǎng)輪詢的任務(wù)中,當(dāng)服務(wù)端配置信息發(fā)生變更時(shí),客戶端將最新的數(shù)據(jù)獲取下來(lái)之后,保存在了 CacheData 中,同時(shí)更新了該 CacheData 的 md5 值,所以當(dāng)下次執(zhí)行 checkListenerMd5 方法時(shí),就會(huì)發(fā)現(xiàn)當(dāng)前 listener 所持有的 md5 值已經(jīng)和 CacheData 的 md5 值不一樣了,也就意味著服務(wù)端的配置信息發(fā)生改變了,這時(shí)就需要將最新的數(shù)據(jù)通知給 Listener 的持有者。
至此配置中心的完整流程已經(jīng)分析完畢了,可以發(fā)現(xiàn),Nacos 并不是通過(guò)推的方式將服務(wù)端最新的配置信息發(fā)送給客戶端的,而是客戶端維護(hù)了一個(gè)長(zhǎng)輪詢的任務(wù),定時(shí)去拉取發(fā)生變更的配置信息,然后將最新的數(shù)據(jù)推送給 Listener 的持有者。
6.為什么要拉?
客戶端拉取服務(wù)端的數(shù)據(jù)與服務(wù)端推送數(shù)據(jù)給客戶端相比,優(yōu)勢(shì)在哪呢,為什么 Nacos 不設(shè)計(jì)成主動(dòng)推送數(shù)據(jù),而是要客戶端去拉取呢?如果用推的方式,服務(wù)端需要維持與客戶端的長(zhǎng)連接,這樣的話需要耗費(fèi)大量的資源,并且還需要考慮連接的有效性,例如需要通過(guò)心跳來(lái)維持兩者之間的連接。而用拉取的方式,客戶端只需要通過(guò)一個(gè)無(wú)狀態(tài)的 http 請(qǐng)求即可獲取到服務(wù)端的數(shù)據(jù)。
三 總結(jié)
現(xiàn)在,我們來(lái)簡(jiǎn)單復(fù)盤一下Nacos客戶端視角下的配置中心實(shí)現(xiàn)原理
首先我們假設(shè)Nacos服務(wù)端一切正常,Nacos客戶端啟動(dòng)以后
第一步是根據(jù)我們配置的服務(wù)端信息,新建 ConfigService 實(shí)例,它的實(shí)現(xiàn)就是我們文中提到的NacosConfigService;
第二步可以通過(guò)相應(yīng)的接口獲取配置和注冊(cè)配置監(jiān)聽器,
考慮到服務(wù)端故障的問(wèn)題,客戶端將最新數(shù)據(jù)獲取后會(huì)保存在本地的 緩存文件中,以后會(huì)優(yōu)先從文件中獲取配置信息的值,如果獲取不到,會(huì)直接從服務(wù)器拉去,并保存到緩存中;
其實(shí)真正干活的就是ClientWorker類;客戶端是通過(guò)一個(gè)定時(shí)的長(zhǎng)輪詢來(lái)檢查自己監(jiān)聽的配置項(xiàng)的數(shù)據(jù)的,一旦服務(wù)端的數(shù)據(jù)發(fā)生變化時(shí),會(huì)從服務(wù)端獲取到dataID的列表,
客戶端根據(jù)dataID列表從服務(wù)端獲取到最新的數(shù)據(jù),并將最新的數(shù)據(jù)保存在一個(gè) CacheData 對(duì)象中,在輪詢過(guò)程中,如果決定使用本地配置,就會(huì)比較當(dāng)前CacheData 的MD5值是否和所有監(jiān)聽者所持有的MD5值相等,如果不相等,,此時(shí)就會(huì)對(duì)該 CacheData 所綁定的 Listener 觸發(fā) receiveConfigInfo 回調(diào),來(lái)通知使用者此配置信息已經(jīng)變更;
原文地址:https://mp.weixin.qq.com/s/qmT-SsYr6yPmqEtN-4XAoQ