新聞中心
深入解析 Dubbo 3.0 服務(wù)端暴露全流程
作者:熊聘 2021-09-06 09:46:26
開(kāi)發(fā)
后端
云計(jì)算 隨著云原生時(shí)代的到來(lái),Dubbo 3.0 的一個(gè)很重要的目標(biāo)就是全面擁抱云原生。正因如此,Dubbo 3.0 為了能夠更好的適配云原生,將原來(lái)的接口級(jí)服務(wù)發(fā)現(xiàn)機(jī)制演進(jìn)為應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制。

創(chuàng)新互聯(lián)公司專(zhuān)注于企業(yè)全網(wǎng)營(yíng)銷(xiāo)推廣、網(wǎng)站重做改版、桂陽(yáng)網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、H5建站、成都商城網(wǎng)站開(kāi)發(fā)、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)公司、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁(yè)設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性?xún)r(jià)比高,為桂陽(yáng)等各大城市提供網(wǎng)站開(kāi)發(fā)制作服務(wù)。
背景
隨著云原生時(shí)代的到來(lái),Dubbo 3.0 的一個(gè)很重要的目標(biāo)就是全面擁抱云原生。正因如此,Dubbo 3.0 為了能夠更好的適配云原生,將原來(lái)的接口級(jí)服務(wù)發(fā)現(xiàn)機(jī)制演進(jìn)為應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制。
基于應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制,Dubbo 3.0 能大幅降低框架帶來(lái)的額外資源消耗,大幅提升資源利用率,主要體現(xiàn)在:
單機(jī)常駐內(nèi)存下降 75%
能支持的集群實(shí)例規(guī)模以百萬(wàn)計(jì)的集群
注冊(cè)中心總體數(shù)據(jù)量下降超 90%
目前關(guān)于 Dubbo 服務(wù)端暴露流程的技術(shù)文章很多,但是都是基于 Dubbo 接口級(jí)服務(wù)發(fā)現(xiàn)機(jī)制來(lái)解讀的。在 Dubbo 3.0 的應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制下,服務(wù)端暴露流程與之前有很大的變化,本文希望可以通過(guò) 對(duì)Dubbo 3.0 源碼理解來(lái)解析服務(wù)端暴露全流程。
什么是應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)
簡(jiǎn)單來(lái)說(shuō),以前 Dubbo 是將接口的信息全部注冊(cè)到注冊(cè)中心,而一個(gè)應(yīng)用實(shí)例一般會(huì)存在多個(gè)接口,這樣一來(lái)注冊(cè)的數(shù)據(jù)量就要大很多,而且有冗余。應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)的機(jī)制是同一個(gè)應(yīng)用實(shí)例僅在注冊(cè)中心注冊(cè)一條數(shù)據(jù),這種機(jī)制主要解決以下幾個(gè)問(wèn)題:
對(duì)齊主流微服務(wù)模型,如:Spring Cloud
支持 Kubernetes native service,Kubernetes 中維護(hù)調(diào)度的服務(wù)都是基于應(yīng)用實(shí)例級(jí),不支持接口級(jí)
減少注冊(cè)中心數(shù)據(jù)存儲(chǔ)能力,降低了地址變更推送的壓力
假設(shè)應(yīng)用 dubbo-application 部署了 3 個(gè)實(shí)例(instance1, instance2, instance3),并且對(duì)外提供了 3 個(gè)接口(sayHello, echo, getVersion)分別設(shè)置了不同的超時(shí)時(shí)間。在接口級(jí)和應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制下,注冊(cè)到注冊(cè)中心的數(shù)據(jù)是截然不同的。如下圖所示:
接口級(jí)服務(wù)發(fā)現(xiàn)機(jī)制下注冊(cè)中心中的數(shù)據(jù)
- "sayHello": [ {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}}, {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}}, {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}},],"echo": [ {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}}, {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}}, {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}},],"getVersion": [ {"application":"dubbo-application","name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}}, {"application":"dubbo-application","name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}}, {"application":"dubbo-application","name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}}]
應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制下注冊(cè)中心中的數(shù)據(jù)
- "dubbo-application": [ {"name":"instance1", "ip":"127.0.0.1", "metadata":{"timeout":1000}}, {"name":"instance2", "ip":"127.0.0.2", "metadata":{"timeout":2000}}, {"name":"instance3", "ip":"127.0.0.3", "metadata":{"timeout":3000}}]
通過(guò)對(duì)比我們可以發(fā)現(xiàn),采用應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制確實(shí)使注冊(cè)中心中的數(shù)據(jù)量減少了很多,那些原有的接口級(jí)的數(shù)據(jù)存儲(chǔ)在元數(shù)據(jù)中心中。
服務(wù)端暴露全流程
引入應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制以后,Dubbo 3.0 服務(wù)端暴露全流程和之前有很大的區(qū)別。暴露服務(wù)端全流程的核心代碼在 DubboBootstrap#doStart 中,具體如下:
- private void doStart() { // 1. 暴露Dubbo服務(wù) exportServices(); // If register consumer instance or has exported services if (isRegisterConsumerInstance() || hasExportedServices()) { // 2. 暴露元數(shù)據(jù)服務(wù) exportMetadataService(); // 3. 定時(shí)更新和上報(bào)元數(shù)據(jù) registerServiceInstance(); .... } ......}
假設(shè)以 Zookeeper 作為注冊(cè)中,對(duì)外暴露 Triple 協(xié)議的服務(wù)為例,服務(wù)端暴露全流程時(shí)序圖如下:
我們可以看到,整個(gè)的暴露流程還是挺復(fù)雜的,一共可以分為四個(gè)部分:
暴露 injvm 協(xié)議的服務(wù)
注冊(cè) service-discovery-registry 協(xié)議
暴露 Triple 協(xié)議的服務(wù)并注冊(cè) registry 協(xié)議
暴露 MetadataService 服務(wù)
下面會(huì)分別從這四個(gè)部分對(duì)服務(wù)暴露全流程進(jìn)行詳細(xì)講解。
1、暴露 injvm 協(xié)議的服務(wù)
injvm 協(xié)議的服務(wù)是暴露在本地的,主要原因是在一個(gè)應(yīng)用上往往既有 Service(暴露服務(wù))又有 Reference(服務(wù)引用)的情況存在,并且 Reference 引用的服務(wù)就是在該應(yīng)用上暴露的 Service。為了支持這種使用場(chǎng)景,Dubbo 提供了 injvm 協(xié)議,將 Service 暴露在本地,Reference 就可以不需要走網(wǎng)絡(luò)直接在本地調(diào)用 Service。
整體時(shí)序圖
由于這部分內(nèi)容在之前的接口級(jí)服務(wù)發(fā)現(xiàn)機(jī)制中是類(lèi)似的,所以相關(guān)的核心代碼就不在這里展開(kāi)討論了。
2、注冊(cè) service-discovery-registry 協(xié)議
注冊(cè) service-discovery-registry 協(xié)議的核心目的是為了注冊(cè)與服務(wù)相關(guān)的元數(shù)據(jù),默認(rèn)情況下元數(shù)據(jù)通過(guò) InMemoryWritableMetadataService 將數(shù)據(jù)存儲(chǔ)在本地內(nèi)存和本地文件。
整體時(shí)序圖
核心代碼在 ServiceConfig#exportRemote 中,具體如下:
注冊(cè) service-discovery-registry 協(xié)議的入口
- private URL exportRemote(URL url, List
registryURLs) { if (CollectionUtils.isNotEmpty(registryURLs)) { // 如果是多個(gè)注冊(cè)中心,通過(guò)循環(huán)對(duì)每個(gè)注冊(cè)中心進(jìn)行注冊(cè) for (URL registryURL : registryURLs) { // 判斷是否是service-discovery-registry協(xié)議 // 將service-name-mapping參數(shù)的值設(shè)置為true if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) { url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true"); } ...... // 注冊(cè)service-discovery-registry協(xié)議復(fù)用服務(wù)暴露流程 doExportUrl(registryURL.putAttribute(EXPORT_KEY, url), true); } ...... return url;}
invoker 中包裝 Metadata
核心代碼在 ServiceConfig#doExportUrl 中,具體如下:
- private void doExportUrl(URL url, boolean withMetaData) { Invoker> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url); // 此時(shí)的withMetaData的值為true // 將invoker包裝成DelegateProviderMetaDataInvoker if (withMetaData) { invoker = new DelegateProviderMetaDataInvoker(invoker, this); } Exporter> exporter = PROTOCOL.export(invoker); exporters.add(exporter);}
通過(guò) RegistryProtocol 將 Invoker 轉(zhuǎn)化成 Exporter
核心代碼在 ProtocolListenerWrapper#export 中,具體如下:
- public
Exporter export(Invoker invoker) throws RpcException { // 此時(shí)的protocol為RegistryProtocol類(lèi)型 if (UrlUtils.isRegistry(invoker.getUrl())) { return protocol.export(invoker); } ......}
RegistryProtocol 將 Invoker 轉(zhuǎn)化成 Exporter 的核心流程
核心代碼在 RegistryProtocol#export 中,具體如下:
- public
Exporter export(final Invoker originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); URL providerUrl = getProviderUrl(originInvoker); ...... // 再次暴露Triple協(xié)議的服務(wù) final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl); // registryUrl中包含service-discovery-registry協(xié)議 // 通過(guò)該協(xié)議創(chuàng)建ServiceDiscoveryRegistry對(duì)象 // 然后組合RegistryServiceListener監(jiān)聽(tīng)器, // 最后包裝成ListenerRegistryWrapper對(duì)象 final Registry registry = getRegistry(registryUrl); final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { // 注冊(cè)service-discovery-registry協(xié)議 // 觸發(fā)RegistryServiceListener的onRegister事件 register(registry, registeredProviderUrl); } ...... // 觸發(fā)RegistryServiceListener的onRegister事件 notifyExport(exporter); return new DestroyableExporter<>(exporter);}
暴露 Triple 協(xié)議的服務(wù)
核心代碼在 RegistryProtocol#doLocalExport 中,具體如下:
- private
ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); // 此時(shí)的protocol為T(mén)riple協(xié)議的代理類(lèi) // 和暴露injvm協(xié)議的PROTOCOL相同 return (ExporterChangeableWrapper ) bounds.computeIfAbsent(key, s -> { Invoker> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter ) protocol.export(invokerDelegate), originInvoker); });}
注冊(cè)service-discovery-registry協(xié)議
核心代碼在 ServiceDiscoveryRegistry#register和ServiceDiscoveryRegistry#doRegister 中,具體如下:
1、ServiceDiscoveryRegistry#register
- public final void register(URL url) { // 只有服務(wù)端(Provider)才需要注冊(cè) if (!shouldRegister(url)) { return; } // 注冊(cè)service-discovery-registry協(xié)議 doRegister(url);}
2、ServiceDiscoveryRegistry#doRegister
- public void doRegister(URL url) { url = addRegistryClusterKey(url); // 注冊(cè)元數(shù)據(jù) if (writableMetadataService.exportURL(url)) { if (logger.isInfoEnabled()) { logger.info(format("The URL[%s] registered successfully.", url.toString())); } } else { if (logger.isWarnEnabled()) { logger.warn(format("The URL[%s] has been registered.", url.toString())); } }}
注冊(cè)元數(shù)據(jù)
核心代碼在 InMemoryWritableMetadataService#exportURL 中,具體如下:
- public boolean exportURL(URL url) { // 如果是MetadataService,則不注冊(cè)元數(shù)據(jù) if (MetadataService.class.getName().equals(url.getServiceInterface())) { this.metadataServiceURL = url; return true; } updateLock.readLock().lock(); try { String[] clusters = getRegistryCluster(url).split(","); for (String cluster : clusters) { MetadataInfo metadataInfo = metadataInfos.computeIfAbsent(cluster, k -> new MetadataInfo(ApplicationModel.getName())); // 將Triple協(xié)議的服務(wù)中接口相關(guān)的數(shù)據(jù)生成ServiceInfo // 將ServiceInfo注冊(cè)到MetadataInfo中 metadataInfo.addService(new ServiceInfo(url)); } metadataSemaphore.release(); return addURL(exportedServiceURLs, url); } finally { updateLock.readLock().unlock(); }}
發(fā)布 onRegister 事件
核心代碼在 ListenerRegistryWrapper#register 中,具體如下:
- public void register(URL url) { try { // registry為ServiceDiscoveryRegistry對(duì)象 // 此時(shí)已經(jīng)調(diào)用完ServiceDiscoveryRegistry#registry方法 registry.register(url); } finally { if (CollectionUtils.isNotEmpty(listeners) && !UrlUtils.isConsumer(url)) { RuntimeException exception = null; for (RegistryServiceListener listener : listeners) { if (listener != null) { try { // 注冊(cè)完service-discovery-registry協(xié)議后發(fā)布o(jì)nRegister事件 listener.onRegister(url, registry); } catch (RuntimeException t) { logger.error(t.getMessage(), t); exception = t; } } } if (exception != null) { throw exception; } } }}
發(fā)布服務(wù)注冊(cè)事件
核心代碼在 RegistryProtocol#notifyExport 中,具體如下:
- private
void notifyExport(ExporterChangeableWrapper exporter) { List listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class) .getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener"); if (CollectionUtils.isNotEmpty(listeners)) { for (RegistryProtocolListener listener : listeners) { // 發(fā)布RegistryProtocolListener的onExport事件 listener.onExport(this, exporter); } }}
我們可以看出注冊(cè) service-discovery-registry 協(xié)議的核心目的是為了將服務(wù)的接口相關(guān)的信息存儲(chǔ)在內(nèi)存中。從兼容性和平滑遷移兩方面來(lái)考慮,社區(qū)在實(shí)現(xiàn)的時(shí)候采取復(fù)用 ServiceConfig 的暴露流程的方式。
3、暴露Triple協(xié)議服務(wù)并注冊(cè)registry協(xié)議
暴露 Triple 協(xié)議的服務(wù)并注冊(cè) registry 協(xié)議是 Dubbo 3.0 服務(wù)暴露的核心流程,一共分為兩部分:
暴露 Triple 協(xié)議的服務(wù)
注冊(cè) registry 協(xié)議
由于暴露 Triple 協(xié)議服務(wù)的流程和暴露 Injvm 協(xié)議服務(wù)的流程是一致的,所以不再贅述。注冊(cè) registry 協(xié)議的過(guò)程僅僅注冊(cè)了應(yīng)用實(shí)例相關(guān)的信息,也就是之前提到的應(yīng)用級(jí)服務(wù)發(fā)現(xiàn)機(jī)制。
整體時(shí)序圖
通過(guò) InterfaceCompatibleRegistryProtocol 將 Invoker 轉(zhuǎn)化成 Exporter
核心代碼在 ProtocolListenerWrapper#export 中,具體如下:
- public
Exporter export(Invoker invoker) throws RpcException { // 此時(shí)的protocol為InterfaceCompatibleRegistryProtocol類(lèi)型(繼承了RegistryProtocol) // 注意:在注冊(cè)service-discovery-registry協(xié)議的時(shí)候protocol為RegistryProtocol類(lèi)型 if (UrlUtils.isRegistry(invoker.getUrl())) { return protocol.export(invoker); } ......}
RegistryProtocol 將 Invoker 轉(zhuǎn)化成 Exporter 的核心流程
核心代碼在 RegistryProtocol#export 中,具體如下:
- public
Exporter export(final Invoker originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); URL providerUrl = getProviderUrl(originInvoker); ...... // 再次暴露Triple協(xié)議的服務(wù) final ExporterChangeableWrapper exporter = doLocalExport(originInvoker, providerUrl); // registryUrl中包含registry協(xié)議 // 通過(guò)該協(xié)議創(chuàng)建ZookeeperRegistry對(duì)象 // 然后組合RegistryServiceListener監(jiān)聽(tīng)器, // 最后包裝成ListenerRegistryWrapper對(duì)象 // 注意: // 1. service-discovery-registry協(xié)議對(duì)應(yīng)的是ServiceDiscoveryRegistry // 2. registry協(xié)議對(duì)應(yīng)的是ZookeeperRegistry final Registry registry = getRegistry(registryUrl); final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl); boolean register = providerUrl.getParameter(REGISTER_KEY, true); if (register) { // 注冊(cè)registry協(xié)議 // 觸發(fā)RegistryServiceListener的onRegister事件 register(registry, registeredProviderUrl); } ...... // 發(fā)布RegistryProtocolListener的onExport事件 notifyExport(exporter); return new DestroyableExporter<>(exporter);}
注冊(cè) registry 協(xié)議
核心代碼在 FailbackRegistry#register 和 ServiceDiscoveryRegistry#doRegister 中(ZookeeperRegistry 繼承 FailbackRegistry)中,具體如下:
1、FailbackRegistry#register
- public void register(URL url) { if (!acceptable(url)) { ...... try { // 注冊(cè)registry協(xié)議 doRegister(url); } catch (Exception e) { ...... } }}
2、ServiceDiscoveryRegistry#doRegister
- public void doRegister(URL url) { try { // 在zookeeper上注冊(cè)Provider // 目錄:/dubbo/xxxService/providers/*** // 數(shù)據(jù):dubbo://192.168.31.167:20800/xxxService?anyhost=true& // application=application-name&async=false&deprecated=false&dubbo=2.0.2& // dynamic=true&file.cache=false&generic=false&interface=xxxService& // metadata-type=remote&methods=hello&pid=82470&release=& // service-name-mapping=true&side=provider×tamp=1629588251493 zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); }}
訂閱地址變更
核心代碼在 FailbackRegistry#subscribe 和 ZookeeperRegistry#doSubscribe 中,具體如下:
1、FailbackRegistry#subscribe
- public void subscribe(URL url, NotifyListener listener) { ...... try { // 調(diào)用ZookeeperRegistry#doSubscribe doSubscribe(url, listener); } catch (Exception e) { ......}
2、ZookeeperRegistry#doSubscribe
- public void doSubscribe(final URL url, final NotifyListener listener) { try { if (ANY_VALUE.equals(url.getServiceInterface())) { ...... } else { ...... for (String path : toCategoriesPath(url)) { ConcurrentMap
listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>()); ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch)); if (zkListener instanceof RegistryChildListenerImpl) { ((RegistryChildListenerImpl) zkListener).setLatch(latch); } // 創(chuàng)建臨時(shí)節(jié)點(diǎn)用來(lái)存儲(chǔ)configurators數(shù)據(jù) // 目錄:/dubbo/xxxService/configurators // 數(shù)據(jù):應(yīng)用的配置信息,可以在dubbo-admin中進(jìn)行修改,默認(rèn)為空 zkClient.create(path, false); // 添加監(jiān)聽(tīng)器,用來(lái)監(jiān)聽(tīng)configurators中的變化 List children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } ...... } } catch (Throwable e) { ...... }
建立暴露的 Triple 協(xié)議服務(wù)與 Metadata 之間的聯(lián)系
核心代碼在 ServiceConfig#exportUrl、MetadataUtils#publishServiceDefinition、InMemoryWritableMetadataService#publishServiceDefinition、RemoteMetadataServiceImpl#publishServiceDefinition 和 MetadataReport#storeProviderMetadata 中,具體如下:
1、ServiceConfig#exportUrl
- private void exportUrl(URL url, List
registryURLs) { ...... if (!SCOPE_NONE.equalsIgnoreCase(scope)) { ...... if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { url = exportRemote(url, registryURLs); // 發(fā)布事件,更新服務(wù)接口相關(guān)的數(shù)據(jù) MetadataUtils.publishServiceDefinition(url); } } ......}
2、MetadataUtils#publishServiceDefinition
- public static void publishServiceDefinition(URL url) { // 將服務(wù)接口相關(guān)的數(shù)據(jù)存在到InMemoryWritableMetadataService中 WritableMetadataService.getDefaultExtension().publishServiceDefinition(url); // 將服務(wù)接口相關(guān)的數(shù)據(jù)存在到遠(yuǎn)端的元數(shù)據(jù)中心 if (REMOTE_METADATA_STORAGE_TYPE.equalsIgnoreCase(url.getParameter(METADATA_KEY))) { getRemoteMetadataService().publishServiceDefinition(url); }}
3、InMemoryWritableMetadataService#publishServiceDefinition
- public void publishServiceDefinition(URL url) { try { String interfaceName = url.getServiceInterface(); if (StringUtils.isNotEmpty(interfaceName) && !ProtocolUtils.isGeneric(url.getParameter(GENERIC_KEY))) { Class interfaceClass = Class.forName(interfaceName); ServiceDefinition serviceDefinition = ServiceDefinitionBuilder.build(interfaceClass); Gson gson = new Gson(); String data = gson.toJson(serviceDefinition); // 存儲(chǔ)服務(wù)接口相關(guān)數(shù)據(jù) // 數(shù)據(jù)格式: // { // "canonicalName": "xxxService", // "codeSource": "file:/Users/xxxx", // "methods": [{ // "name": "hello", // "parameterTypes": ["java.lang.String"], // "returnType": "java.lang.String", // "annotations": [] // }], // "types": [{ // "type": "java.lang.String" // }], // "annotations": [] // } serviceDefinitions.put(url.getServiceKey(), data); return; } else if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { ...... } ...... } catch (Throwable e) { ...... }}
4、RemoteMetadataServiceImpl#publishServiceDefinition
- public void publishServiceDefinition(URL url) { checkRemoteConfigured(); String side = url.getSide(); if (PROVIDER_SIDE.equalsIgnoreCase(side)) { // 發(fā)布服務(wù)端(Provider)的服務(wù)接口信息到元數(shù)據(jù)中心 publishProvider(url); } else { ...... }}RemoteMetadataServiceImpl#publishProviderprivate void publishProvider(URL providerUrl) throws RpcException { ...... try { String interfaceName = providerUrl.getServiceInterface(); if (StringUtils.isNotEmpty(interfaceName)) { ...... for (Map.Entry
entry : getMetadataReports().entrySet()) { // 獲取MetadataReport服務(wù),該服務(wù)用來(lái)訪問(wèn)元數(shù)據(jù)中心 MetadataReport metadataReport = entry.getValue(); // 將服務(wù)接口信息存儲(chǔ)到元數(shù)據(jù)中心 metadataReport.storeProviderMetadata(new MetadataIdentifier(providerUrl.getServiceInterface(), providerUrl.getVersion(), providerUrl.getGroup(), PROVIDER_SIDE, providerUrl.getApplication()), fullServiceDefinition); } return; } ...... } catch (ClassNotFoundException e) { ...... }}
5、AbstractMetadataReport#storeProviderMetadata
- public void storeProviderMetadata(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition){ if (syncReport) { storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition); } else { // 異步存儲(chǔ)到元數(shù)據(jù)中心 reportCacheExecutor.execute(() -> storeProviderMetadataTask(providerMetadataIdentifier, serviceDefinition)); }}private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) { try { ...... allMetadataReports.put(providerMetadataIdentifier, serviceDefinition); failedReports.remove(providerMetadataIdentifier); Gson gson = new Gson(); // data的數(shù)據(jù)格式: // { // "parameters": { // "side": "provider", // "interface": "xxxService", // "metadata-type": "remote", // "service-name-mapping": "true", // }, // "canonicalName": "xxxService", // "codeSource": "file:/Users/xxxx", // "methods": [{ // "name": "hello", // "parameterTypes": ["java.lang.String"], // "returnType": "java.lang.String", // "annotations": [] // }], // "types": [{ // "type": "java.lang.String" // }], // "annotations": [] // } String data = gson.toJson(serviceDefinition); // 存儲(chǔ)到元數(shù)據(jù)中心,實(shí)例中的元數(shù)據(jù)中心是ZookeeperMetadataReport // 目錄:元數(shù)據(jù)中心Metadata-report的/dubbo/metadata/xxxService/provider/${application-name}節(jié)點(diǎn)下 doStoreProviderMetadata(providerMetadataIdentifier, data); // 存儲(chǔ)到本地文件 // 路徑:xxxService:::provider:${application-name} saveProperties(providerMetadataIdentifier, data, true, !syncReport); } catch (Exception e) { ...... }}
建立 Triple 協(xié)議服務(wù)與 MetadataReport 服務(wù)之間的關(guān)系
核心代碼在 ServiceConfig#exported、MetadataServiceNameMapping#map 和 ZookeeperMetadataReport#registerServiceAppMapping 中,具體如下:
1、ServiceConfig#exported
- protected void exported() { exported = true; List
exportedURLs = this.getExportedUrls(); exportedURLs.forEach(url -> { // 判斷URL中是否標(biāo)記有service-name-mapping的字段 // 標(biāo)記有該字段的服務(wù)是需要將暴露的服務(wù)與元數(shù)據(jù)中心關(guān)聯(lián)起來(lái) // Consumer可以通過(guò)元數(shù)據(jù)中心的消息變更感知到Provider端元數(shù)據(jù)的變更 if (url.getParameters().containsKey(SERVICE_NAME_MAPPING_KEY)) { ServiceNameMapping serviceNameMapping = ServiceNameMapping.getDefaultExtension(); // 建立關(guān)系 serviceNameMapping.map(url); } }); onExported();}
2、MetadataServiceNameMapping#map
- public void map(URL url) { execute(() -> { String registryCluster = getRegistryCluster(url); // 獲取MetadataReport,也就是元數(shù)據(jù)中心的訪問(wèn)路徑 MetadataReport metadataReport = MetadataReportInstance.getMetadataReport(registryCluster); ...... int currentRetryTimes = 1; boolean success; String newConfigContent = getName(); do { // 獲取元數(shù)據(jù)中心中存儲(chǔ)的應(yīng)用的版本信息 ConfigItem configItem = metadataReport.getConfigItem(serviceInterface, DEFAULT_MAPPING_GROUP); String oldConfigContent = configItem.getContent(); if (StringUtils.isNotEmpty(oldConfigContent)) { boolean contains = StringUtils.isContains(oldConfigContent, getName()); if (contains) { break; } newConfigContent = oldConfigContent + COMMA_SEPARATOR + getName(); } // 在元數(shù)據(jù)中心創(chuàng)建mapping節(jié)點(diǎn),并將暴露的服務(wù)數(shù)據(jù)存到元數(shù)據(jù)中心,這里的元數(shù)據(jù)中心用zookeeper實(shí)現(xiàn)的 // 目錄:/dubbo/mapping/xxxService // 數(shù)據(jù):configItem.content為${application-name},configItem.ticket為版本好 success = metadataReport.registerServiceAppMapping(serviceInterface, DEFAULT_MAPPING_GROUP, newConfigContent, configItem.getTicket()); } while (!success && currentRetryTimes++ <= CAS_RETRY_TIMES); });}
3、ZookeeperMetadataReport#registerServiceAppMapping
- public boolean registerServiceAppMapping(String key, String group, String content, Object ticket) { try { if (ticket != null && !(ticket instanceof Stat)) { throw new IllegalArgumentException("zookeeper publishConfigCas requires stat type ticket"); } String pathKey = buildPathKey(group, key); // 1. 創(chuàng)建/dubbo/mapping/xxxService目錄,存儲(chǔ)的數(shù)據(jù)為configItem // 2. 生成版本號(hào) zkClient.createOrUpdate(pathKey, content, false, ticket == null ? 0 : ((Stat) ticket).getVersion()); return true; } catch (Exception e) { logger.warn("zookeeper publishConfigCas failed.", e); return false; }}
到這里,暴露Triple協(xié)議的服務(wù)并注冊(cè) registry 協(xié)議的流程就結(jié)束了。主要是將以前接口級(jí)服務(wù)發(fā)現(xiàn)機(jī)制中注冊(cè)到注冊(cè)中心中的數(shù)據(jù)(應(yīng)用實(shí)例數(shù)據(jù)+服務(wù)接口數(shù)據(jù))拆分出來(lái)了。注冊(cè) registry 協(xié)議部分將應(yīng)用實(shí)例數(shù)據(jù)注冊(cè)到注冊(cè)中心,在 Exporter 暴露完以后通過(guò)調(diào)用 MetadataUtils#publishServiceDefinition 將服務(wù)接口數(shù)據(jù)注冊(cè)到元數(shù)據(jù)中心。
4、暴露MetadataService服務(wù)
MetadataService 主要是對(duì) Consumer 側(cè)提供一個(gè)可以獲取元數(shù)據(jù)的 API,暴露流程是復(fù)用了 Triple 協(xié)議的服務(wù)暴露流程
整體時(shí)序圖
暴露 MetadataService 的入口
核心代碼在 DubboBootstrap#exportMetadataService 中,具體如下:
- private void exportMetadataService() { // 暴露MetadataServer metadataServiceExporter.export();}
暴露 MetadataService
核心代碼在 ConfigurableMetadataServiceExporter#export 中,具體如下:
- public ConfigurableMetadataServiceExporter export() { if (!isExported()) { // 定義MetadataService的ServiceConfig ServiceConfig
serviceConfig = new ServiceConfig<>(); serviceConfig.setApplication(getApplicationConfig()); // 不會(huì)注冊(cè)到注冊(cè)中心 serviceConfig.setRegistry(new RegistryConfig("N/A")); serviceConfig.setProtocol(generateMetadataProtocol()); serviceConfig.setInterface(MetadataService.class); serviceConfig.setDelay(0); serviceConfig.setRef(metadataService); serviceConfig.setGroup(getApplicationConfig().getName()); serviceConfig.setVersion(metadataService.version()); serviceConfig.setMethods(generateMethodConfig()); // 用暴露Triple協(xié)議服務(wù)的流程來(lái)暴露MetadataService // 采用的是Dubbo協(xié)議 serviceConfig.export(); this.serviceConfig = serviceConfig; } return this;}
由于暴露 MetadataService 的流程是復(fù)用前面提到的暴露 Triple 協(xié)議服務(wù)的流程,整個(gè)過(guò)程有少許地方會(huì)不同,這些不同之處在上面的代碼中都已經(jīng)標(biāo)明,所以就不再贅述了。
注冊(cè) ServiceInstance 實(shí)例
注冊(cè) ServiceInstance 的目的是為了定時(shí)更新 Metadata,當(dāng)有更新的時(shí)候就會(huì)通過(guò) MetadataReport 來(lái)更新版本號(hào)讓 Consumer 端感知到。
核心代碼在 DubboBootstrap#registerServiceInstance 和 DubboBootstrap#doRegisterServiceInstance 中,具體如下:
- private void registerServiceInstance() { .... // 創(chuàng)建ServiceInstance // ServiceInstance中包含以下字段 // 1. serviceName:${application-name} // 2. host: 192.168.31.167 // 3. port: 2080 // 4. metadata: 服務(wù)接口級(jí)相關(guān)的數(shù)據(jù),比如:methods等數(shù)據(jù) // 同時(shí),還會(huì)對(duì)ServiceInstance數(shù)據(jù)中的字段進(jìn)行補(bǔ)充,分別調(diào)用下面4個(gè)ServiceInstanceCustomizer實(shí)例 // 1)ServiceInstanceMetadataCustomizer // 2)MetadataServiceURLParamsMetadataCustomizer // 3)ProtocolPortsMetadataCustomizer // 4)ServiceInstanceHostPortCustomizer ServiceInstance serviceInstance = createServiceInstance(serviceName); boolean registered = true; try { // 注冊(cè)ServiceInstance doRegisterServiceInstance(serviceInstance); } catch (Exception e) { registered = false; logger.error("Register instance error", e); } // 如果注冊(cè)成功,定時(shí)更新Metadata,沒(méi)10s更新一次 if(registered){ executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> { ...... try { // 刷新Metadata和ServiceInstance ServiceInstanceMetadataUtils.refreshMetadataAndInstance(serviceInstance); } catch (Exception e) { ...... } finally { ...... } }, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MILLISECONDS); }}
DubboBootstrap#doRegisterServiceInstance
- private void doRegisterServiceInstance(ServiceInstance serviceInstance) { if (serviceInstance.getPort() > 0) { // 發(fā)布Metadata數(shù)據(jù)到遠(yuǎn)端存儲(chǔ)元數(shù)據(jù)中心 // 調(diào)用RemoteMetadataServiceImpl#publishMetadata, // 內(nèi)部會(huì)調(diào)用metadataReport#publishAppMetadata publishMetadataToRemote(serviceInstance); logger.info("Start registering instance address to registry."); getServiceDiscoveries().forEach(serviceDiscovery ->{ ServiceInstance serviceInstanceForRegistry = new DefaultServiceInstance((DefaultServiceInstance) serviceInstance); calInstanceRevision(serviceDiscovery, serviceInstanceForRegistry);
文章名稱(chēng):深入解析Dubbo3.0服務(wù)端暴露全流程
鏈接分享:http://m.fisionsoft.com.cn/article/cciijeg.html


咨詢(xún)
建站咨詢(xún)
