激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|JavaScript|易語(yǔ)言|

服務(wù)器之家 - 編程語(yǔ)言 - JAVA教程 - 從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

2021-03-16 01:27今日J(rèn)ava麥洛 JAVA教程

今天我們一起從Nacos客戶端視角來(lái)看看配置中心實(shí)現(xiàn)原理;整理這篇文章時(shí)候,也參照學(xué)習(xí)了部分大佬的博客,這里致謝!

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

一 動(dòng)態(tài)配置

  • 1. 環(huán)境準(zhǔn)備
  • 2.新建配置
  • 3.導(dǎo)入配置
  • 4.配置客戶端
  • 5. 修改配置信息
  • 6.小結(jié)

二 配置中心原理(推還是拉)

  • 1.實(shí)例化 ConfigService
  • 2.添加 Listener
  • 3.CacheData
  • 4.觸發(fā)回調(diào)
  • 5.Md5何時(shí)變更
  • 6.拉的優(yōu)勢(shì)

三 總結(jié)

Hello,大家好,我是麥洛,今天我們一起從Nacos客戶端視角來(lái)看看配置中心實(shí)現(xiàn)原理;整理這篇文章時(shí)候,也參照學(xué)習(xí)了部分大佬的博客,這里致謝;

在開始閱讀文章之前,有些思路我按我的理解先闡述一些,方便大家更快理清思路,不對(duì)的地方還請(qǐng)大家批評(píng)指正;

  1. Nacos客戶端會(huì)在在本地緩存服務(wù)端配置文件,防止服務(wù)器奔潰情況下,導(dǎo)致服務(wù)不可用;
  2. 本地緩存類在代碼中的體現(xiàn)就是我們下面提到的CacheData,我們知道對(duì)應(yīng)服務(wù)端一個(gè)配置,肯定可以同時(shí)被多個(gè)客戶端所使用,當(dāng)這個(gè)配置發(fā)生變更,如何去通知到每一個(gè)客戶端?
  3. 客戶端啟動(dòng)之后,回去注冊(cè)監(jiān)視器,監(jiān)視器最終會(huì)被保存到CacheData類中CopyOnWriteArrayList listeners字段,那么,反過(guò)來(lái),當(dāng)執(zhí)行監(jiān)視器回調(diào)方法時(shí),就可以找到所有客戶端
  4. 長(zhǎng)輪詢左右主要就是刷新配置,保持服務(wù)端配置和本地緩存配置保持一致;

首先,我們來(lái)看看Nacos官網(wǎng)給出的Nacos地圖,我們可以清楚的看到,動(dòng)態(tài)配置服務(wù)是 Nacos 的三大功能之一;

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

這里借用官網(wǎng)的描述,一起來(lái)看看Nacos 為我們帶來(lái)什么黑科技?

動(dòng)態(tài)配置服務(wù)可以讓您以中心化、外部化和動(dòng)態(tài)化的方式管理所有環(huán)境的應(yīng)用配置和服務(wù)配置。動(dòng)態(tài)配置消除了配置變更時(shí)重新部署應(yīng)用和服務(wù)的需要,讓配置管理變得更加高效和敏捷。配置中心化管理讓實(shí)現(xiàn)無(wú)狀態(tài)服務(wù)變得更簡(jiǎn)單,讓服務(wù)按需彈性擴(kuò)展變得更容易。

所以,有了Nacos ,可能我們以前上線打包弄錯(cuò)配置文件,改配置需要重啟服務(wù)等一系列問(wèn)題,都會(huì)顯著改觀

一 動(dòng)態(tài)配置

 

下面我將來(lái)和大家一起來(lái)了解下 Nacos 的動(dòng)態(tài)配置的能力,看看 Nacos 是如何以簡(jiǎn)單、優(yōu)雅、高效的方式管理配置,實(shí)現(xiàn)配置的動(dòng)態(tài)變更的。

我們用一個(gè)簡(jiǎn)單的例子來(lái)了解下 Nacos 的動(dòng)態(tài)配置的功能。

1. 環(huán)境準(zhǔn)備

首先,我們需要搭建一個(gè)Nacos 服務(wù)端,由于官網(wǎng)的quick-start已經(jīng)對(duì)此做了詳細(xì)的解讀,我們這里就不在贅述

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理
  1. https://nacos.io/zh-cn/docs/quick-start.html 

安裝完成之后啟動(dòng),我們就可以訪問(wèn) Nacos 的控制臺(tái)了,如下圖所示:

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

Nacos控制臺(tái)做了簡(jiǎn)單的權(quán)限控制,默認(rèn)的賬號(hào)和密碼都是 nacos。

登錄進(jìn)去之后,是這樣的:

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

2.新建配置

接下來(lái)我們?cè)诳刂婆_(tái)上創(chuàng)建一個(gè)簡(jiǎn)單的配置項(xiàng),如下圖所示:

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

3.導(dǎo)入配置

Nacos支持導(dǎo)入配置,可以直接將配置文件壓縮包導(dǎo)入,這里我們以人人開源的微服務(wù)項(xiàng)目為例

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理
從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

4.配置客戶端

下面我以自己搭建的子服務(wù)為例,一起來(lái)看看Nacos配置中心的使用

首先我們需要配置一下,大家只需關(guān)注config節(jié)點(diǎn)配置就可以,discovery節(jié)點(diǎn)可以忽略

  1. cloud: 
  2.   nacos: 
  3.     discovery: 
  4.       metadata: 
  5.         management: 
  6.           context-path: ${server.servlet.context-path}/actuator 
  7.       server-addr: ${nacos-host:nacos-host}:${nacos-port:8848} 
  8.       #nacos的命名空間ID,默認(rèn)是public 
  9.       namespace: ${nacos-namespace:} 
  10.       service: ets-web 
  11.     config: 
  12.       server-addr: ${spring.cloud.nacos.discovery.server-addr} 
  13.       namespace: ${spring.cloud.nacos.discovery.namespace} 
  14.       group: RENREN_CLOUD_GROUP 
  15.       file-extension: yaml 
  16.       #指定共享配置,且支持動(dòng)態(tài)刷新 
  17.       extension-configs: 
  18.         - data-id: datasource.yaml 
  19.           group: ${spring.cloud.nacos.config.group
  20.           refresh: true 
  21.         - data-id: common.yaml 
  22.           group: ${spring.cloud.nacos.config.group
  23.           refresh: true 

其實(shí)extension-configs節(jié)點(diǎn)的配置信息對(duì)應(yīng)的是下面的類

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

接下來(lái)我們啟動(dòng)服務(wù),來(lái)看看控制臺(tái)日志

 從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

5. 修改配置信息

接下來(lái)我們?cè)?Nacos 的控制臺(tái)上將我們的配置信息改為如下圖所示:

 從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

修改完配置,點(diǎn)擊 “發(fā)布” 按鈕后,客戶端將會(huì)收到最新的數(shù)據(jù),如下圖所示:

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

至此一個(gè)簡(jiǎn)單的動(dòng)態(tài)配置管理功能已經(jīng)講完了,刪除配置和更新配置操作類似,這里不再贅述。

6.小結(jié)

通過(guò)上面的小案例,我們大概了解了Nacos動(dòng)態(tài)配置的服務(wù)的使用方法,Nacos服務(wù)端將配置信息保存到其配置文件所配置的數(shù)據(jù)庫(kù)中,客戶端連接到服務(wù)端之后,根據(jù) dataID,Group可以獲取到具體的配置信息,當(dāng)服務(wù)端的配置發(fā)生變更時(shí),客戶端會(huì)收到通知。當(dāng)客戶端拿到變更后的最新配置信息后,就可以做自己的處理了,這非常有用,所有需要使用配置的場(chǎng)景都可以通過(guò) Nacos 來(lái)進(jìn)行管理。

二 配置中心原理(推還是拉)

 

現(xiàn)在我們了解了 Nacos 的動(dòng)態(tài)配置服務(wù)的功能了,但是有一個(gè)問(wèn)題我們需要弄明白,那就是 Nacos 客戶端是怎么實(shí)時(shí)獲取到 Nacos 服務(wù)端的最新數(shù)據(jù)的。

其實(shí)客戶端和服務(wù)端之間的數(shù)據(jù)交互,無(wú)外乎兩種情況:

  • 服務(wù)端推數(shù)據(jù)給客戶端
  • 客戶端從服務(wù)端拉數(shù)據(jù)

那到底是推還是拉呢,從 Nacos 客戶端通過(guò) Listener 來(lái)接收最新數(shù)據(jù)的這個(gè)做法來(lái)看,感覺(jué)像是服務(wù)端推的數(shù)據(jù),但是不能想當(dāng)然,要想知道答案,最快最準(zhǔn)確的方法就是從源碼中去尋找。

官方示例代碼

  1. try { 
  2.     // 傳遞配置 
  3.     String serverAddr = "{serverAddr}"
  4.     String dataId = "{dataId}"
  5.     String group = "{group}"
  6.     Properties properties = new Properties(); 
  7.     properties.put("serverAddr", serverAddr); 
  8.  
  9.     // 新建 configService 
  10.     ConfigService configService = NacosFactory.createConfigService(properties); 
  11.     String content = configService.getConfig(dataId, group, 5000); 
  12.     System.out.println(content); 
  13.  
  14.     // 注冊(cè)監(jiān)聽器 
  15.     configService.addListener(dataId, group, new Listener() { 
  16.     @Override 
  17.     public void receiveConfigInfo(String configInfo) { 
  18.         System.out.println("recieve1:" + configInfo); 
  19.     } 
  20.     @Override 
  21.     public Executor getExecutor() { 
  22.         return null
  23.     } 
  24. }); 
  25. } catch (NacosException e) { 
  26.     // TODO  
  27.     -generated catch block 
  28.     e.printStackTrace(); 

1.實(shí)例化 ConfigService

當(dāng)我們引包結(jié)束以后,會(huì)發(fā)現(xiàn)下面三個(gè)關(guān)于Nacos的包

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

從我的理解來(lái)說(shuō),api包會(huì)調(diào)用client包的能力來(lái)和Nacos服務(wù)端進(jìn)行交互.那再交互時(shí)候,主要就會(huì)用到我們接下來(lái)分析的實(shí)現(xiàn)了ConfigService接口的NacosConfigService 類

現(xiàn)在我們來(lái)看下 NacosConfigService 的構(gòu)造方法,看看 ConfigService 是怎么實(shí)例化的,如下圖所示:

  1. public class NacosConfigService implements ConfigService { 
  2.      
  3.     private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class); 
  4.      
  5.     private static final long POST_TIMEOUT = 3000L; 
  6.      
  7.     /** 
  8.      * http agent. 
  9.      */ 
  10.     private final HttpAgent agent; 
  11.      
  12.     /** 
  13.      * long polling.  這里是長(zhǎng)輪詢 
  14.      */ 
  15.     private final ClientWorker worker; 
  16.      
  17.     private String namespace; 
  18.      
  19.     private final String encode; 
  20.     //省略其他代碼 

  1. //構(gòu)造方法 
  2. ic NacosConfigService(Properties properties) throws NacosException { 
  3.     ValidatorUtils.checkInitParam(properties); 
  4.     String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); 
  5.     if (StringUtils.isBlank(encodeTmp)) { 
  6.         this.encode = Constants.ENCODE; 
  7.     } else { 
  8.         this.encode = encodeTmp.trim(); 
  9.     } 
  10.     initNamespace(properties); 
  11.     //對(duì)象1 
  12.     this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); 
  13.     this.agent.start(); 
  14.     //對(duì)象2 
  15.     this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); 

實(shí)例化時(shí)主要是初始化了兩個(gè)對(duì)象,他們分別是:

  • HttpAgent
  • ClientWorker

HttpAgent

其中 agent 是通過(guò)裝飾器模式實(shí)現(xiàn)的,ServerHttpAgent 是實(shí)際工作的類,MetricsHttpAgent 在內(nèi)部也是調(diào)用了 ServerHttpAgent 的方法,另外加上了一些統(tǒng)計(jì)操作,所以我們只需要關(guān)心 ServerHttpAgent 的功能就可以了。

不熟悉的同學(xué),可以看菜鳥教程對(duì)裝飾器模式的解讀

agent 實(shí)際是在 ClientWorker 中發(fā)揮能力的,而 ClientWorker 也是真正的打工人,下面我們來(lái)看下 ClientWorker 類。

ClientWorker

以下是 ClientWorker 的構(gòu)造方法,如下圖所示:

  1. public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, 
  2.         final Properties properties) { 
  3.     this.agent = agent; 
  4.     this.configFilterChainManager = configFilterChainManager; 
  5.      
  6.     // Initialize the timeout parameter 
  7.      
  8.     init(properties); 
  9.     //創(chuàng)建了一個(gè)定時(shí)任務(wù)的線程池 
  10.     this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { 
  11.         @Override 
  12.         public Thread newThread(Runnable r) { 
  13.             Thread t = new Thread(r); 
  14.             t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); 
  15.             t.setDaemon(true); 
  16.             return t; 
  17.         } 
  18.     }); 
  19.     //創(chuàng)建了一個(gè)保持長(zhǎng)輪詢的線程池 
  20.     this.executorService = Executors 
  21.             .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { 
  22.                 @Override 
  23.                 public Thread newThread(Runnable r) { 
  24.                     Thread t = new Thread(r); 
  25.                     t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); 
  26.                     t.setDaemon(true); 
  27.                     return t; 
  28.                 } 
  29.             }); 
  30.      
  31.     //創(chuàng)建了一個(gè)延遲任務(wù)線程池來(lái)每隔10ms來(lái)檢查配置信息的線程池 
  32.     this.executor.scheduleWithFixedDelay(new Runnable() { 
  33.         @Override 
  34.         public void run() { 
  35.             try { 
  36.                 checkConfigInfo(); 
  37.             } catch (Throwable e) { 
  38.                 LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); 
  39.             } 
  40.         } 
  41.     }, 1L, 10L, TimeUnit.MILLISECONDS); 

可以看到 ClientWorker 除了將 HttpAgent 維持在自己內(nèi)部,還創(chuàng)建了兩個(gè)線程池:

  1. final ScheduledExecutorService executor; 
  2.      
  3. final ScheduledExecutorService executorService; 
  • 第一個(gè)線程池負(fù)責(zé)與配置中心進(jìn)行數(shù)據(jù)的交互,并且啟動(dòng)后延遲1ms,之后每隔10ms對(duì)配置信息進(jìn)行定時(shí)檢查
  • 第二個(gè)線程池則是負(fù)責(zé)保持一個(gè)長(zhǎng)輪詢鏈接

接下來(lái)讓我們來(lái)看下 executor 每 10ms 執(zhí)行的方法到底做了什么工作,如下圖所示:

  1. /** 
  2.  * groupKey -> cacheData. 
  3.  */ 
  4. private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>( 
  5.         new HashMap<String, CacheData>()); 

  1. /** 
  2.   * Check config info. 檢查配置信息 
  3.   */ 
  4.  public void checkConfigInfo() { 
  5.      // 分任務(wù)(解決大數(shù)據(jù)量的傳輸問(wèn)題) 
  6.      int listenerSize = cacheMap.get().size(); 
  7.      // 向上取整為批數(shù),分批次進(jìn)行檢查 
  8.      //ParamUtil.getPerTaskConfigSize() =3000 
  9.      int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); 
  10.      if (longingTaskCount > currentLongingTaskCount) { 
  11.          for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { 
  12.              // 要判斷任務(wù)是否在執(zhí)行 這塊需要好好想想。 任務(wù)列表現(xiàn)在是無(wú)序的。變化過(guò)程可能有問(wèn)題 
  13.              executorService.execute(new LongPollingRunnable(i)); 
  14.          } 
  15.          currentLongingTaskCount = longingTaskCount; 
  16.      } 
  17.  } 

這里主要是先去拿緩存中 Map

現(xiàn)在我們來(lái)看看 LongPollingRunnable 做了什么,主要分為兩部分,

  • 第一部分是檢查本地的配置信息,
  • 第二部分是獲取服務(wù)端的配置信息然后更新到本地。

1.本地檢查

首先取出與該 taskId 相關(guān)的 CacheData,然后對(duì) CacheData 進(jìn)行檢查,包括本地配置檢查和緩存數(shù)據(jù)的 md5 檢查,本地檢查主要是做一個(gè)故障容錯(cuò),當(dāng)服務(wù)端掛掉后,Nacos 客戶端可以從本地的文件系統(tǒng)中獲取相關(guān)的配置信息,如下圖所示:

  1. public void run() { 
  2.              
  3.             List<CacheData> cacheDatas = new ArrayList<CacheData>(); 
  4.             List<String> inInitializingCacheList = new ArrayList<String>(); 
  5.             try { 
  6.                 // 
  7.                 for (CacheData cacheData : cacheMap.get().values()) { 
  8.                     if (cacheData.getTaskId() == taskId) { 
  9.                         cacheDatas.add(cacheData); 
  10.                         try { 
  11.                             //執(zhí)行檢查本地配置 
  12.                             checkLocalConfig(cacheData); 
  13.                             if (cacheData.isUseLocalConfigInfo()) { 
  14.                                 //緩存數(shù)據(jù)的md5的檢查 
  15.                                 cacheData.checkListenerMd5(); 
  16.                             } 
  17.                         } catch (Exception e) { 
  18.                             LOGGER.error("get local config info error", e); 
  19.                         } 
  20.                     } 
  21.                 } 
  22.                
  23.         } 

  1. //檢查本地配置 
  2.      
  3. private void checkLocalConfig(CacheData cacheData) { 
  4.         final String dataId = cacheData.dataId; 
  5.         final String group = cacheData.group
  6.         final String tenant = cacheData.tenant; 
  7.     //本地緩存文件 
  8.         File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); 
  9.         //不使用本地配置,但是持久化文件存在,需要讀取文件加載至內(nèi)存 
  10.         if (!cacheData.isUseLocalConfigInfo() && path.exists()) { 
  11.             String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); 
  12.             final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); 
  13.             cacheData.setUseLocalConfigInfo(true); 
  14.             cacheData.setLocalConfigInfoVersion(path.lastModified()); 
  15.             cacheData.setContent(content); 
  16.              
  17.             LOGGER.warn( 
  18.                     "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}"
  19.                     agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); 
  20.             return
  21.         } 
  22.          
  23.        // 有 -> 沒(méi)有。不通知業(yè)務(wù)監(jiān)聽器,從server拿到配置后通知。 
  24.         //使用本地配置,但是持久化文件不存在  
  25.         if (cacheData.isUseLocalConfigInfo() && !path.exists()) { 
  26.             cacheData.setUseLocalConfigInfo(false); 
  27.             LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), 
  28.                     dataId, group, tenant); 
  29.             return
  30.         } 
  31.          
  32.         // 有變更 
  33.         //使用本地配置,持久化文件存在,緩存跟文件最后修改時(shí)間不一致 
  34.         if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path 
  35.                 .lastModified()) { 
  36.             String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); 
  37.             final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); 
  38.             cacheData.setUseLocalConfigInfo(true); 
  39.             cacheData.setLocalConfigInfoVersion(path.lastModified()); 
  40.             cacheData.setContent(content); 
  41.             LOGGER.warn( 
  42.                     "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}"
  43.                     agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); 
  44.         } 
  45.     } 

本地檢查主要是通過(guò)是否使用本地配置,繼而尋找持久化緩存文件,再通過(guò)判斷文件的最后修改事件與本地緩存的版本是否一致來(lái)判斷是否由變更

通過(guò)跟蹤 checkLocalConfig 方法,可以看到 Nacos 將緩存配置信息保存在了

  1. ~/nacos/config/fixed-{address}_8848_nacos/snapshot/DEFAULT_GROUP/{dataId} 

這個(gè)文件中,我們看下這個(gè)文件中保存的內(nèi)容,如下圖所示:

從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

2.服務(wù)端檢查

然后通過(guò) checkUpdateDataIds() 方法從服務(wù)端獲取值變化的 dataId 列表,

通過(guò) getServerConfig 方法,根據(jù) dataId 到服務(wù)端獲取最新的配置信息,接著將最新的配置信息保存到 CacheData 中。

最后調(diào)用 CacheData 的 checkListenerMd5 方法,可以看到該方法在第一部分也被調(diào)用過(guò),我們需要重點(diǎn)關(guān)注一下。

  1. // 檢查服務(wù)器配置 
  2.   List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); 
  3.   if (!CollectionUtils.isEmpty(changedGroupKeys)) { 
  4.       LOGGER.info("get changedGroupKeys:" + changedGroupKeys); 
  5.   } 
  6.    
  7.   for (String groupKey : changedGroupKeys) { 
  8.       String[] key = GroupKey.parseKey(groupKey); 
  9.       String dataId = key[0]; 
  10.       String group = key[1]; 
  11.       String tenant = null
  12.       if (key.length == 3) { 
  13.           tenant = key[2]; 
  14.       } 
  15.       try { 
  16.           //從服務(wù)器端獲取相關(guān)id的最新配置 
  17.           String[] ct = getServerConfig(dataId, group, tenant, 3000L); 
  18.           CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); 
  19.           cache.setContent(ct[0]); 
  20.           if (null != ct[1]) { 
  21.               cache.setType(ct[1]); 
  22.           } 
  23.           LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}"
  24.                   agent.getName(), dataId, group, tenant, cache.getMd5(), 
  25.                   ContentUtils.truncateContent(ct[0]), ct[1]); 
  26.       } catch (NacosException ioe) { 
  27.           String message = String 
  28.                   .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s"
  29.                           agent.getName(), dataId, group, tenant); 
  30.           LOGGER.error(message, ioe); 
  31.       } 
  32.   } 
  33.   for (CacheData cacheData : cacheDatas) { 
  34.       if (!cacheData.isInitializing() || inInitializingCacheList 
  35.               .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { 
  36.           //校驗(yàn)MD5值 
  37.           cacheData.checkListenerMd5(); 
  38.           cacheData.setInitializing(false); 
  39.       } 
  40.   } 
  41.   inInitializingCacheList.clear(); 
  42.    
  43.   executorService.execute(this); 
  44.    
  45. catch (Throwable e) { 
  46.    
  47.   // If the rotation training task is abnormal, the next execution time of the task will be punished 
  48.   LOGGER.error("longPolling error : ", e); 
  49.   executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS); 

這里大家也發(fā)現(xiàn),當(dāng)客戶端從服務(wù)器拉去配置文件之后,會(huì)將配置文件在本地進(jìn)行緩存,所以,一般會(huì)優(yōu)先使用本地配置,如果本地文件不存在或者內(nèi)容為空,則再通過(guò) HTTP GET 方法從遠(yuǎn)端拉取配置,并保存到本地緩存中

  1. private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException { 
  2.       group = null2defaultGroup(group); 
  3.       ParamUtils.checkKeyParam(dataId, group); 
  4.       ConfigResponse cr = new ConfigResponse(); 
  5.        
  6.       cr.setDataId(dataId); 
  7.       cr.setTenant(tenant); 
  8.       cr.setGroup(group); 
  9.        
  10.       // 優(yōu)先使用本地配置 
  11.       String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); 
  12.       if (content != null) { 
  13.           LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), 
  14.                   dataId, group, tenant, ContentUtils.truncateContent(content)); 
  15.           cr.setContent(content); 
  16.           configFilterChainManager.doFilter(null, cr); 
  17.           content = cr.getContent(); 
  18.           return content; 
  19.       } 
  20.        
  21.       try { 
  22.           String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs); 
  23.           cr.setContent(ct[0]); 
  24.            
  25.           configFilterChainManager.doFilter(null, cr); 
  26.           content = cr.getContent(); 
  27.            
  28.           return content; 
  29.       } catch (NacosException ioe) { 
  30.           if (NacosException.NO_RIGHT == ioe.getErrCode()) { 
  31.               throw ioe; 
  32.           } 
  33.           LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}"
  34.                   agent.getName(), dataId, group, tenant, ioe.toString()); 
  35.       } 
  36.        
  37.       LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), 
  38.               dataId, group, tenant, ContentUtils.truncateContent(content)); 
  39.       content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant); 
  40.       cr.setContent(content); 
  41.       configFilterChainManager.doFilter(null, cr); 
  42.       content = cr.getContent(); 
  43.       return content; 
  44.   } 

2.添加 Listener

好了現(xiàn)在我們可以為 ConfigService 來(lái)添加一個(gè) Listener 了,最終是調(diào)用了 ClientWorker 的 addTenantListeners 方法,如下圖所示:

  1. /** 
  2.  * Add listeners for tenant. 
  3.  * 
  4.  * @param dataId    dataId of data 
  5.  * @param group     group of data 
  6.  * @param listeners listeners 
  7.  * @throws NacosException nacos exception 
  8.  */ 
  9. public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) 
  10.         throws NacosException { 
  11.     //設(shè)置默認(rèn)組 
  12.     group = null2defaultGroup(group); 
  13.     String tenant = agent.getTenant(); 
  14.     CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); 
  15.     for (Listener listener : listeners) { 
  16.         cache.addListener(listener); 
  17.     } 

該方法分為兩個(gè)部分,首先根據(jù) dataId,group 和tenant獲取一個(gè) CacheData 對(duì)象,然后將當(dāng)前要添加的 listener 對(duì)象添加到 CacheData 中去。

接下來(lái),我們要重點(diǎn)關(guān)注下 CacheData 類了。

3.本地緩存CacheData

首先讓我們來(lái)看一下 CacheData 中的成員變量,如下圖所示:

  1. private final String name
  2.  
  3. private final ConfigFilterChainManager configFilterChainManager; 
  4.  
  5. public final String dataId; 
  6.  
  7. public final String group
  8.  
  9. public final String tenant; 
  10. //監(jiān)聽器 
  11.  
  12. private final CopyOnWriteArrayList<ManagerListenerWrap> listeners; 
  13.  
  14. private volatile String md5; 
  15.  
  16. /** 
  17.  * whether use local config. 
  18.  */ 
  19. private volatile boolean isUseLocalConfig = false
  20.  
  21. /** 
  22.  * last modify time
  23.  */ 
  24. private volatile long localConfigLastModified; 
  25.  
  26. private volatile String content; 
  27.  
  28. private int taskId; 
  29.  
  30. private volatile boolean isInitializing = true
  31.  
  32. private String type; 

我們可以看到,成員變量包括tenant ,dataId,group,content,taskId等,還有兩個(gè)值得我們關(guān)注的:

  • listeners
  • md5

listeners 是該 CacheData 所關(guān)聯(lián)的所有 listener,不過(guò)不是保存的原始的 Listener對(duì)象,而是包裝后的 ManagerListenerWrap 對(duì)象,該對(duì)象除了持有 Listener 對(duì)象,還持有了一個(gè) lastCallMd5 和lastContent屬性。

  1. private static class ManagerListenerWrap { 
  2.       
  3.      final Listener listener; 
  4.       
  5.      //關(guān)注 
  6.      String lastCallMd5 = CacheData.getMd5String(null); 
  7.       
  8.      String lastContent = null
  9.       
  10.      ManagerListenerWrap(Listener listener) { 
  11.          this.listener = listener; 
  12.      } 
  13.       
  14.      ManagerListenerWrap(Listener listener, String md5) { 
  15.          this.listener = listener; 
  16.          this.lastCallMd5 = md5; 
  17.      } 
  18.       
  19.      ManagerListenerWrap(Listener listener, String md5, String lastContent) { 
  20.          this.listener = listener; 
  21.          this.lastCallMd5 = md5; 
  22.          this.lastContent = lastContent; 
  23.      } 
  24.       
  25.  } 

另外一個(gè)屬性 md5 就是根據(jù)當(dāng)前對(duì)象的 content 計(jì)算出來(lái)的 md5 值。

4.觸發(fā)監(jiān)聽器回調(diào)

現(xiàn)在我們對(duì) ConfigService 有了大致的了解了,現(xiàn)在剩下最后一個(gè)重要的問(wèn)題還沒(méi)有答案,那就是 ConfigService 的 Listener 是在什么時(shí)候觸發(fā)回調(diào)方法 receiveConfigInfo 的。

現(xiàn)在讓我們回過(guò)頭來(lái)想一下,在 ClientWorker 中的定時(shí)任務(wù)中,啟動(dòng)了一個(gè)長(zhǎng)輪詢的任務(wù):LongPollingRunnable,該任務(wù)多次執(zhí)行了 cacheData.checkListenerMd5() 方法,那現(xiàn)在就讓我們來(lái)看下這個(gè)方法到底做了些什么,如下圖所示:

  1. void checkListenerMd5() { 
  2.     for (ManagerListenerWrap wrap : listeners) { 
  3.         if (!md5.equals(wrap.lastCallMd5)) { 
  4.             safeNotifyListener(dataId, group, content, type, md5, wrap); 
  5.         } 
  6.     } 

到這里應(yīng)該就比較清晰了,該方法會(huì)檢查 CacheData 當(dāng)前的 md5 與 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就執(zhí)行一個(gè)安全的監(jiān)聽器的通知方法:safeNotifyListener,通知什么呢?我們可以大膽的猜一下,應(yīng)該是通知 Listener 的使用者,該 Listener 所關(guān)注的配置信息已經(jīng)發(fā)生改變了。現(xiàn)在讓我們來(lái)看一下 safeNotifyListener 方法,如下圖所示:

  1. private void safeNotifyListener(final String dataId, final String group, final String content, final String type, 
  2.           final String md5, final ManagerListenerWrap listenerWrap) { 
  3.       final Listener listener = listenerWrap.listener; 
  4.        
  5.       Runnable job = new Runnable() { 
  6.           @Override 
  7.           public void run() { 
  8.               ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); 
  9.               ClassLoader appClassLoader = listener.getClass().getClassLoader(); 
  10.               try { 
  11.                   if (listener instanceof AbstractSharedListener) { 
  12.                       AbstractSharedListener adapter = (AbstractSharedListener) listener; 
  13.                       adapter.fillContext(dataId, group); 
  14.                       LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}"name, dataId, group, md5); 
  15.                   } 
  16.                   // 執(zhí)行回調(diào)之前先將線程classloader設(shè)置為具體webapp的classloader,以免回調(diào)方法中調(diào)用spi接口是出現(xiàn)異常或錯(cuò)用(多應(yīng)用部署才會(huì)有該問(wèn)題)。 
  17.                   Thread.currentThread().setContextClassLoader(appClassLoader); 
  18.                    
  19.                   ConfigResponse cr = new ConfigResponse(); 
  20.                   cr.setDataId(dataId); 
  21.                   cr.setGroup(group); 
  22.                   cr.setContent(content); 
  23.                    
  24.                   //重點(diǎn)關(guān)注,在這里調(diào)用 
  25.                   //重點(diǎn)關(guān)注,在這里調(diào)用 
  26.                   //重點(diǎn)關(guān)注,在這里調(diào)用 
  27.                    
  28.                   configFilterChainManager.doFilter(null, cr); 
  29.                   String contentTmp = cr.getContent(); 
  30.                   listener.receiveConfigInfo(contentTmp); 
  31.                    
  32.                    
  33.                    
  34.                    
  35.                   // compare lastContent and content 
  36.                   if (listener instanceof AbstractConfigChangeListener) { 
  37.                       Map data = ConfigChangeHandler.getInstance() 
  38.                               .parseChangeData(listenerWrap.lastContent, content, type); 
  39.                       ConfigChangeEvent event = new ConfigChangeEvent(data); 
  40.                       ((AbstractConfigChangeListener) listener).receiveConfigChange(event); 
  41.                       listenerWrap.lastContent = content; 
  42.                   } 
  43.                    
  44.                   listenerWrap.lastCallMd5 = md5; 
  45.                   LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} "name, dataId, group, md5, 
  46.                           listener); 
  47.               } catch (NacosException ex) { 
  48.                   LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}"
  49.                           name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg()); 
  50.               } catch (Throwable t) { 
  51.                   LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}"name, dataId, 
  52.                           group, md5, listener, t.getCause()); 
  53.               } finally { 
  54.                   Thread.currentThread().setContextClassLoader(myClassLoader); 
  55.               } 
  56.           } 
  57.       }; 
  58.        
  59.       final long startNotify = System.currentTimeMillis(); 
  60.       try { 
  61.           if (null != listener.getExecutor()) { 
  62.               listener.getExecutor().execute(job); 
  63.           } else { 
  64.               job.run(); 
  65.           } 
  66.       } catch (Throwable t) { 
  67.           LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}"name, dataId, 
  68.                   group, md5, listener, t.getCause()); 
  69.       } 
  70.       final long finishNotify = System.currentTimeMillis(); 
  71.       LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} "
  72.               name, (finishNotify - startNotify), dataId, group, md5, listener); 
  73.   } 

可以看到在 safeNotifyListener 方法中,重點(diǎn)關(guān)注下紅框中的三行代碼:獲取最新的配置信息,調(diào)用 Listener 的回調(diào)方法,將最新的配置信息作為參數(shù)傳入,這樣 Listener 的使用者就能接收到變更后的配置信息了,最后更新 ListenerWrap 的 md5 值。和我們猜測(cè)的一樣, Listener 的回調(diào)方法就是在該方法中觸發(fā)的。

5.Md5何時(shí)變更

那 CacheData 的 md5 值是何時(shí)發(fā)生改變的呢?我們可以回想一下,在上面的 LongPollingRunnable 所執(zhí)行的任務(wù)中,在獲取服務(wù)端發(fā)生變更的配置信息時(shí),將最新的 content 數(shù)據(jù)寫入了 CacheData 中,我們可以看下該方法如下:

  1. public void setContent(String content) { 
  2.      this.content = content; 
  3.      this.md5 = getMd5String(this.content); 
  4.  } 

可以看到是在長(zhǎng)輪詢的任務(wù)中,當(dāng)服務(wù)端配置信息發(fā)生變更時(shí),客戶端將最新的數(shù)據(jù)獲取下來(lái)之后,保存在了 CacheData 中,同時(shí)更新了該 CacheData 的 md5 值,所以當(dāng)下次執(zhí)行 checkListenerMd5 方法時(shí),就會(huì)發(fā)現(xiàn)當(dāng)前 listener 所持有的 md5 值已經(jīng)和 CacheData 的 md5 值不一樣了,也就意味著服務(wù)端的配置信息發(fā)生改變了,這時(shí)就需要將最新的數(shù)據(jù)通知給 Listener 的持有者。

至此配置中心的完整流程已經(jīng)分析完畢了,可以發(fā)現(xiàn),Nacos 并不是通過(guò)推的方式將服務(wù)端最新的配置信息發(fā)送給客戶端的,而是客戶端維護(hù)了一個(gè)長(zhǎng)輪詢的任務(wù),定時(shí)去拉取發(fā)生變更的配置信息,然后將最新的數(shù)據(jù)推送給 Listener 的持有者。

6.為什么要拉?

客戶端拉取服務(wù)端的數(shù)據(jù)與服務(wù)端推送數(shù)據(jù)給客戶端相比,優(yōu)勢(shì)在哪呢,為什么 Nacos 不設(shè)計(jì)成主動(dòng)推送數(shù)據(jù),而是要客戶端去拉取呢?如果用推的方式,服務(wù)端需要維持與客戶端的長(zhǎng)連接,這樣的話需要耗費(fèi)大量的資源,并且還需要考慮連接的有效性,例如需要通過(guò)心跳來(lái)維持兩者之間的連接。而用拉取的方式,客戶端只需要通過(guò)一個(gè)無(wú)狀態(tài)的 http 請(qǐng)求即可獲取到服務(wù)端的數(shù)據(jù)。

三 總結(jié)

 

 從Nacos客戶端視角來(lái)分析一下配置中心實(shí)現(xiàn)原理

現(xiàn)在,我們來(lái)簡(jiǎn)單復(fù)盤一下Nacos客戶端視角下的配置中心實(shí)現(xiàn)原理

首先我們假設(shè)Nacos服務(wù)端一切正常,Nacos客戶端啟動(dòng)以后

第一步是根據(jù)我們配置的服務(wù)端信息,新建 ConfigService 實(shí)例,它的實(shí)現(xiàn)就是我們文中提到的NacosConfigService;

第二步可以通過(guò)相應(yīng)的接口獲取配置和注冊(cè)配置監(jiān)聽器,

考慮到服務(wù)端故障的問(wèn)題,客戶端將最新數(shù)據(jù)獲取后會(huì)保存在本地的 緩存文件中,以后會(huì)優(yōu)先從文件中獲取配置信息的值,如果獲取不到,會(huì)直接從服務(wù)器拉去,并保存到緩存中;

其實(shí)真正干活的就是ClientWorker類;客戶端是通過(guò)一個(gè)定時(shí)的長(zhǎng)輪詢來(lái)檢查自己監(jiān)聽的配置項(xiàng)的數(shù)據(jù)的,一旦服務(wù)端的數(shù)據(jù)發(fā)生變化時(shí),會(huì)從服務(wù)端獲取到dataID的列表,

客戶端根據(jù)dataID列表從服務(wù)端獲取到最新的數(shù)據(jù),并將最新的數(shù)據(jù)保存在一個(gè) CacheData 對(duì)象中,在輪詢過(guò)程中,如果決定使用本地配置,就會(huì)比較當(dāng)前CacheData 的MD5值是否和所有監(jiān)聽者所持有的MD5值相等,如果不相等,,此時(shí)就會(huì)對(duì)該 CacheData 所綁定的 Listener 觸發(fā) receiveConfigInfo 回調(diào),來(lái)通知使用者此配置信息已經(jīng)變更;

原文地址:https://mp.weixin.qq.com/s/qmT-SsYr6yPmqEtN-4XAoQ

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 久久精品99久久久久久2456 | 天天草天天干天天 | 亚洲一区二区三区日本久久九 | 一区二区三区日韩精品 | 国产在线一级视频 | 粉嫩蜜桃麻豆免费大片 | 久久久久久久久久久av | 中国大陆高清aⅴ毛片 | 精品一区二区三区在线播放 | 国产精品99久久久久久大便 | 国产成人精品无人区一区 | 91精彩视频 | 国产免费一级淫片a级中文 99国产精品自拍 | 黄色一级片毛片 | 蜜桃一本色道久久综合亚洲精品冫 | 99r国产精品 | 销魂美女一区二区 | 中国fx性欧美xxxx | 精品国产一区二区在线观看 | 亚州精品天堂中文字幕 | fc2国产成人免费视频 | 成人免费一区 | japan护士性xxxⅹhd | 视频一区二区久久 | 欧美黄一区 | 中文字幕在线观看精品 | 中文字幕在线网 | 福利在线国产 | 国产精品成人久久 | 十级毛片| 国产一级免费片 | 欧美成人久久 | 999久久久免费视频 久久精品国产精品亚洲 | 中文在线免费观看 | 毛片在线播放视频 | 黄色特级毛片 | 亚洲一二三久久 | 香蕉视频网站在线观看 | 黄色三级三级三级 | 极品销魂一区二区三区 | 91在线视频网址 |