摘要:于是我們繼續(xù)斷點(diǎn)往下走,發(fā)現(xiàn)對(duì)象里面只有一個(gè)類(lèi)名路徑為。進(jìn)入看看,這個(gè)配置類(lèi)有哪些重要的方法。。。分析一果不其然,方法被調(diào)用了,緊接著也進(jìn)入斷點(diǎn),然后在往下走,又進(jìn)入的方法中的回調(diào)處。
SpringCloud(第 050 篇)Netflix Eureka 源碼深入剖析(下)
-
一、大致介紹1、鑒于一些朋友的提問(wèn)并提議講解下eureka的源碼分析,由此應(yīng)運(yùn)而產(chǎn)生的本章節(jié)的內(nèi)容; 2、所以我站在自我的理解角度試著整理了這篇Eureka源碼的分析,希望對(duì)大家有所幫助; 3、由于篇幅太長(zhǎng)不能在一篇里面發(fā)布出來(lái),所以拆分了上下篇;二、基本原理
1、Eureka Server 提供服務(wù)注冊(cè)服務(wù),各個(gè)節(jié)點(diǎn)啟動(dòng)后,會(huì)在Eureka Server中進(jìn)行注冊(cè),這樣Eureka Server中的服務(wù)注冊(cè)表中將會(huì)存儲(chǔ)所有可用服務(wù)節(jié)點(diǎn)的信息,服務(wù)節(jié)點(diǎn)的信息可以在界面中直觀的看到。 2、Eureka Client 是一個(gè)Java 客戶(hù)端,用于簡(jiǎn)化與Eureka Server的交互,客戶(hù)端同時(shí)也具備一個(gè)內(nèi)置的、使用輪詢(xún)負(fù)載算法的負(fù)載均衡器。 3、在應(yīng)用啟動(dòng)后,將會(huì)向Eureka Server發(fā)送心跳(默認(rèn)周期為30秒),如果Eureka Server在多個(gè)心跳周期沒(méi)有收到某個(gè)節(jié)點(diǎn)的心跳,Eureka Server 將會(huì)從服務(wù)注冊(cè)表中把這個(gè)服務(wù)節(jié)點(diǎn)移除(默認(rèn)90秒)。 4、Eureka Server之間將會(huì)通過(guò)復(fù)制的方式完成數(shù)據(jù)的同步; 5、Eureka Client具有緩存的機(jī)制,即使所有的Eureka Server 都掛掉的話(huà),客戶(hù)端依然可以利用緩存中的信息消費(fèi)其它服務(wù)的API;三、EurekaServer 啟動(dòng)流程分析
詳見(jiàn) SpringCloud(第 049 篇)Netflix Eureka 源碼深入剖析(上)
四、EurekaServer 處理服務(wù)注冊(cè)、集群數(shù)據(jù)復(fù)制詳見(jiàn) SpringCloud(第 049 篇)Netflix Eureka 源碼深入剖析(上)
五、EurekaClient 啟動(dòng)流程分析 5.1 調(diào)換運(yùn)行模式,Run運(yùn)行 springms-discovery-eureka 服務(wù),Debug 運(yùn)行 springms-provider-user 服務(wù),先觀察日志先;2017-10-23 19:43:07.688 INFO 1488 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0 2017-10-23 19:43:07.694 INFO 1488 --- [ main] o.s.c.n.eureka.InstanceInfoFactory : Setting initial instance status as: STARTING 2017-10-23 19:43:07.874 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using JSON encoding codec LegacyJacksonJson 2017-10-23 19:43:07.874 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using JSON decoding codec LegacyJacksonJson 2017-10-23 19:43:07.971 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using XML encoding codec XStreamXml 2017-10-23 19:43:07.971 INFO 1488 --- [ main] c.n.d.provider.DiscoveryJerseyProvider : Using XML decoding codec XStreamXml 2017-10-23 19:43:08.134 INFO 1488 --- [ main] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Disable delta property : false 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Single vip registry refresh property : null 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Force full registry fetch : false 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Application is null : false 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Registered Applications size is zero : true 2017-10-23 19:43:08.344 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Application version is -1: true 2017-10-23 19:43:08.345 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Getting all instance registry info from the eureka server 2017-10-23 19:43:08.630 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : The response status is 200 2017-10-23 19:43:08.631 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Starting heartbeat executor: renew interval is: 30 2017-10-23 19:43:08.634 INFO 1488 --- [ main] c.n.discovery.InstanceInfoReplicator : InstanceInfoReplicator onDemand update allowed rate per min is 4 2017-10-23 19:43:08.637 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1508758988637 with initial instances count: 0 2017-10-23 19:43:08.657 INFO 1488 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Registering application springms-provider-user with eureka with status UP 2017-10-23 19:43:08.658 INFO 1488 --- [ main] com.netflix.discovery.DiscoveryClient : Saw local status change event StatusChangeEvent [timestamp=1508758988658, current=UP, previous=STARTING] 2017-10-23 19:43:08.659 INFO 1488 --- [nfoReplicator-0] com.netflix.discovery.DiscoveryClient : DiscoveryClient_SPRINGMS-PROVIDER-USER/springms-provider-user:192.168.3.101:7900: registering service... 2017-10-23 19:43:08.768 INFO 1488 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 7900 (http) 2017-10-23 19:43:08.768 INFO 1488 --- [ main] c.n.e.EurekaDiscoveryClientConfiguration : Updating port to 7900 2017-10-23 19:43:08.773 INFO 1488 --- [ main] c.s.cloud.MsProviderUserApplication : Started MsProviderUserApplication in 9.694 seconds (JVM running for 10.398) 【【【【【【 用戶(hù)微服務(wù) 】】】】】】已啟動(dòng). 【分析一】:根據(jù)日志粗粒度看,大多數(shù)日志都是在 DiscoveryClient 打印出來(lái)的,由此我們先不妨將這些打印日志的地方都打上斷點(diǎn),為了后序 斷點(diǎn)查看調(diào)用堆棧信息。 【分析二】:仔細(xì)查看下日志,先是 DefaultLifecycleProcessor 類(lèi)處理了一些 bean,然后接下來(lái)肯定會(huì)調(diào)用一些實(shí)現(xiàn) SmartLifecycle 類(lèi)的 start 方法; 【分析三】: 接著初始化設(shè)置了EurekaClient的狀態(tài)為 STARTING,初始化編碼使用的格式,哪些用JSON,哪些用XML; 【分析四】: 緊接著打印了強(qiáng)制獲取注冊(cè)信息狀態(tài)為false,已注冊(cè)的應(yīng)用大小為0,客戶(hù)端發(fā)送心跳續(xù)約,心跳續(xù)約間隔為30秒,最后打印Client 初始化完成; 【分析五】:帶著這些通過(guò)日志查看出來(lái)的端倪,然后我們還得吸取分析EurekaServer的教訓(xùn),我們得先去 @EnableEurekaClient 注解瞧瞧。5.2 有目的性的先去 MsProviderUserApplication 看看,鏈接點(diǎn)進(jìn) EnableEurekaClient 瞧瞧。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @EnableDiscoveryClient public @interface EnableEurekaClient { } 【分析一】:我們會(huì)發(fā)現(xiàn),@EnableEurekaClient 注解類(lèi)竟然也使用了注解 @EnableDiscoveryClient,那么我們有必要去這個(gè)注解類(lèi)看看。 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { } 【分析二】:我們看到的是 @EnableDiscoveryClient 注解類(lèi)有個(gè)比較特殊的注解 @Import,由此我們猜想,這里的大多數(shù)邏輯是不是都寫(xiě)在這個(gè) EnableDiscoveryClientImportSelector 類(lèi)呢?5.3 進(jìn)入 EnableDiscoveryClientImportSelector 看看到底做了些啥?
@Order(Ordered.LOWEST_PRECEDENCE - 100) public class EnableDiscoveryClientImportSelector extends SpringFactoryImportSelector5.4 EnableDiscoveryClientImportSelector.selectImports 這個(gè)方法果然進(jìn)斷點(diǎn)了。{ @Override protected boolean isEnabled() { return new RelaxedPropertyResolver(getEnvironment()).getProperty( "spring.cloud.discovery.enabled", Boolean.class, Boolean.TRUE); } @Override protected boolean hasDefaultFactory() { return true; } } 【分析一】:EnableDiscoveryClientImportSelector 類(lèi)集成了 SpringFactoryImportSelector 類(lèi),但是重寫(xiě)了一個(gè) isEnabled() 方 法,默認(rèn)值返回 true,為什么會(huì)返回true,也得有個(gè)說(shuō)法吧,于是我們進(jìn)入父類(lèi) EnableDiscoveryClientImportSelector 看看。 /** * Select and return the names of which class(es) should be imported based on * the {@link AnnotationMetadata} of the importing @{@link Configuration} class. */ @Override public String[] selectImports(AnnotationMetadata metadata) { if (!isEnabled()) { // 打上斷點(diǎn) return new String[0]; } AnnotationAttributes attributes = AnnotationAttributes.fromMap( metadata.getAnnotationAttributes(this.annotationClass.getName(), true)); Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is " + metadata.getClassName() + " annotated with @" + getSimpleName() + "?"); // Find all possible auto configuration classes, filtering duplicates List factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); if (factories.isEmpty() && !hasDefaultFactory()) { throw new IllegalStateException("Annotation @" + getSimpleName() + " found, but there are no implementations. Did you forget to include a starter?"); } if (factories.size() > 1) { // there should only ever be one DiscoveryClient, but there might be more than // one factory log.warn("More than one implementation " + "of @" + getSimpleName() + " (now relying on @Conditionals to pick one): " + factories); } return factories.toArray(new String[factories.size()]); } 【分析二】:發(fā)現(xiàn)父類(lèi)有這么一個(gè) selectImports 方法使用了 isEnabled() 方法,這個(gè)方法干了些啥事情呢?我們細(xì)看下 selectImports 方法上面的英文注釋?zhuān)笾乱馑际牵哼x擇并且返回需要導(dǎo)入經(jīng)過(guò)注解配置的類(lèi),由此我們猜想這個(gè)導(dǎo)入的類(lèi)肯定對(duì)我們此次客戶(hù)端分析有莫大的幫助,于 是我們現(xiàn)在這個(gè)方法打上斷點(diǎn)先。于是我們現(xiàn)在該干的事情也干了,沒(méi)有頭緒的時(shí)候,我們現(xiàn)在才Run運(yùn)行EurekaServer,Debug運(yùn)行springms-provider-user。
【分析一】:既然進(jìn)了斷點(diǎn),我們看看這個(gè)方法,首先通過(guò)注解獲取了一些屬性,然后加載了一些類(lèi)名稱(chēng),于是我們進(jìn)入 loadFactoryNames 方法看看。 public static List5.5 進(jìn)入 EurekaDiscoveryClientConfiguration 看看,這個(gè)配置類(lèi)有哪些重要的方法?loadFactoryNames(Class> factoryClass, ClassLoader classLoader) { String factoryClassName = factoryClass.getName(); try { // 注釋?zhuān)簆ublic static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories"; // 注釋?zhuān)哼@個(gè) jar 包下的一個(gè)配置文件 Enumeration urls = (classLoader != null ? classLoader.getResources(FACTORIES_RESOURCE_LOCATION) : ClassLoader.getSystemResources(FACTORIES_RESOURCE_LOCATION)); List result = new ArrayList (); while (urls.hasMoreElements()) { URL url = urls.nextElement(); Properties properties = PropertiesLoaderUtils.loadProperties(new UrlResource(url)); String factoryClassNames = properties.getProperty(factoryClassName); result.addAll(Arrays.asList(StringUtils.commaDelimitedListToStringArray(factoryClassNames))); } return result; } catch (IOException ex) { throw new IllegalArgumentException("Unable to load [" + factoryClass.getName() + "] factories from location [" + FACTORIES_RESOURCE_LOCATION + "]", ex); } } 【分析二】:加載了一個(gè)配置文件,配置文件里面寫(xiě)了啥呢?打開(kāi)SpringFactoryImportSelector該文件所在的jar包的spring.factories文件一看。 # AutoConfiguration org.springframework.boot.autoconfigure.EnableAutoConfiguration= org.springframework.cloud.client.CommonsClientAutoConfiguration, org.springframework.cloud.client.discovery.noop.NoopDiscoveryClientAutoConfiguration, org.springframework.cloud.client.hypermedia.CloudHypermediaAutoConfiguration, org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration, org.springframework.cloud.commons.util.UtilAutoConfiguration # Environment Post Processors org.springframework.boot.env.EnvironmentPostProcessor= org.springframework.cloud.client.HostInfoEnvironmentPostProcessor 【分析三】:看名稱(chēng),都是一些 Configuration 后綴的類(lèi)名,所以這些都是加載的一堆堆的配置文件類(lèi)。于是我們繼續(xù)斷點(diǎn)往下走,發(fā)現(xiàn) factories 對(duì)象里面只有一個(gè)類(lèi)名路徑為 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration ??催@個(gè) 名字就應(yīng)該知道這是我們分析EurekaClient的一個(gè)重要的配置類(lèi),先不管三七二十一,找到該類(lèi)先。
@Configuration @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) @CommonsLog public class EurekaDiscoveryClientConfiguration implements SmartLifecycle, Ordered { @Override public void start() { // only set the port if the nonSecurePort is 0 and this.port != 0 if (this.port.get() != 0 && this.instanceConfig.getNonSecurePort() == 0) { this.instanceConfig.setNonSecurePort(this.port.get()); } // only initialize if nonSecurePort is greater than 0 and it isn"t already running // because of containerPortInitializer below if (!this.running.get() && this.instanceConfig.getNonSecurePort() > 0) { maybeInitializeClient(); if (log.isInfoEnabled()) { log.info("Registering application " + this.instanceConfig.getAppname() + " with eureka with status " + this.instanceConfig.getInitialStatus()); } this.applicationInfoManager .setInstanceStatus(this.instanceConfig.getInitialStatus()); if (this.healthCheckHandler != null) { this.eurekaClient.registerHealthCheck(this.healthCheckHandler); } this.context.publishEvent( new InstanceRegisteredEvent<>(this, this.instanceConfig)); this.running.set(true); } } 。。。 其它省略了 } 【分析一】:進(jìn)入這個(gè)類(lèi),首先看到該類(lèi)實(shí)現(xiàn)了 SmartLifecycle 接口,那么就肯定會(huì)實(shí)現(xiàn) start 方法,而且這個(gè) start 方法感覺(jué)應(yīng)在 “步驟5.1之分析二” 會(huì)被加載執(zhí)行的。 【分析二】:因?yàn)?start 這段代碼不多,所以我就索性將 start 方法中的每段代碼都點(diǎn)進(jìn)去看了看,發(fā)現(xiàn) this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 這段代碼有一個(gè)觀察者模式的回調(diào)存在。 // ApplicationInfoManager.setInstanceStatus 的方法 public synchronized void setInstanceStatus(InstanceStatus status) {// 打上斷點(diǎn) InstanceStatus prev = instanceInfo.setStatus(status); if (prev != null) { for (StatusChangeListener listener : listeners.values()) { try { listener.notify(new StatusChangeEvent(prev, status)); } catch (Exception e) { logger.warn("failed to notify listener: {}", listener.getId(), e); } } } } 【分析三】:這個(gè)方法會(huì)因?yàn)闋顟B(tài)的改變而回調(diào)所有實(shí)現(xiàn) StatusChangeListener 這個(gè)類(lèi)的地方,前提得先注冊(cè)到 listeners 中去才行。 【分析四】:于是乎,我們斷定,若想要回調(diào),那么就必須有地方先注冊(cè)這個(gè)事件,而且這個(gè)注冊(cè)還必須提前執(zhí)行在 start 方法前執(zhí)行,于是我們得先 在 ApplicationInfoManager 這個(gè)類(lèi)中找到注冊(cè)到 listeners 的這個(gè)方法。 public void registerStatusChangeListener(StatusChangeListener listener) {// 打上斷點(diǎn) listeners.put(listener.getId(), listener); } 【分析五】:沒(méi)錯(cuò),就是這個(gè)方法,肯定有地方調(diào)用這個(gè)方法,不然的話(huà),那調(diào)用 setInstanceStatus 這個(gè)方法的意義就什么用了。于是我們逆向找 下 registerStatusChangeListener 被調(diào)用的地方。 【分析六】:很不巧的是,盡然只有1個(gè)地方被調(diào)用,這個(gè)地方就是 DiscoveryClient.initScheduledTasks 方法,而且 initScheduledTasks 方法又是在 DiscoveryClient 的構(gòu)造函數(shù)里面調(diào)用的,同時(shí)我們也對(duì) initScheduledTasks 以及 initScheduledTasks 被調(diào)用的構(gòu)造方法地方 打上斷點(diǎn)。5.6 由于翻閱代碼時(shí)間有點(diǎn)久了,因此我們關(guān)閉 springms-provider-user 微服務(wù),重新 Debug 運(yùn)行一下。
【分析一】:果不其然,EurekaDiscoveryClientConfiguration.start 方法被調(diào)用了,緊接著 this.applicationInfoManager.setInstanceStatus(this.instanceConfig.getInitialStatus()) 也進(jìn)入斷點(diǎn),然后在往下走,又進(jìn)入的 DiscoveryClient.initScheduledTasks 方法中的 notify 回調(diào)處。 【分析二】:看著斷點(diǎn)依次經(jīng)過(guò)我們上述分析的地方,然后也符合日志打印的順序,所以我們現(xiàn)在應(yīng)該是有必要好好看看 DiscoveryClient.initScheduledTasks 這個(gè)方法究竟干了什么偉大的事情。然而又想了想,還不如看看 initScheduledTasks 被調(diào)用的構(gòu)造方法。5.7 進(jìn)入 DiscoveryClient 經(jīng)過(guò) @Inject 注解過(guò)的構(gòu)造方法。
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args, Provider六、下載地址backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference (clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI"d DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // 注釋?zhuān)憾〞r(shí)任務(wù)調(diào)度準(zhǔn)備 scheduler = Executors.newScheduledThreadPool(3, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); // 注釋?zhuān)簩?shí)例化心跳定時(shí)任務(wù)線(xiàn)程池 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue (), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff // 注釋?zhuān)簩?shí)例化緩存刷新定時(shí)任務(wù)線(xiàn)程池 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue (), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // 注釋?zhuān)撼跏蓟{(diào)度任務(wù) initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI"d DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); } 【分析一】:從往下看,initScheduledTasks 這個(gè)方法顧名思義就是初始化調(diào)度任務(wù),所以這里面的內(nèi)容應(yīng)該就是重頭戲,進(jìn)入看看。 private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer // 注釋?zhuān)洪g隔多久去拉取服務(wù)注冊(cè)信息,默認(rèn)時(shí)間 30秒 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); // 注釋?zhuān)憾〞r(shí)任務(wù),每間隔 30秒 去拉取一次服務(wù)注冊(cè)信息 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { // 注釋?zhuān)洪g隔多久發(fā)送一次心跳續(xù)約,默認(rèn)間隔時(shí)間 30 秒 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); // Heartbeat timer // 注釋?zhuān)憾〞r(shí)任務(wù),每間隔 30秒 去想 EurekaServer 發(fā)送一次心跳續(xù)約 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator // 注釋?zhuān)簩?shí)例信息復(fù)制器,定時(shí)刷新dataCenterInfo數(shù)據(jù)中心信息,默認(rèn)30秒 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize // 注釋?zhuān)簩?shí)例化狀態(tài)變化監(jiān)聽(tīng)器 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } // 注釋?zhuān)籂顟B(tài)有變化的話(huà),會(huì)回調(diào)這個(gè)方法 instanceInfoReplicator.onDemandUpdate(); } }; // 注釋?zhuān)鹤?cè)狀態(tài)變化監(jiān)聽(tīng)器 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } } 【分析二】:在這個(gè)方法從上往下一路注釋分析下來(lái),干了EurekaClient我們最想知道的一些事情,定時(shí)任務(wù)獲取注冊(cè)信息,定時(shí)任務(wù)刷新緩存,定時(shí) 任務(wù)心跳續(xù)約,定時(shí)任務(wù)同步數(shù)據(jù)中心數(shù)據(jù),狀態(tài)變化監(jiān)聽(tīng)回調(diào)等。但是唯獨(dú)沒(méi)看到注冊(cè),這是怎么回事呢? 【分析三】:我們忘記了一個(gè)重要的方法,instanceInfoReplicator.onDemandUpdate() 就是在狀態(tài)改變的時(shí)候,我們是如何處理的?由此,我們覺(jué)得這里面肯定有貓膩,不然沒(méi)辦法注冊(cè)呀。 public boolean onDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } // 注釋?zhuān)哼@里進(jìn)行了實(shí)例信息刷新和注冊(cè) InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } } 【分析四】:onDemandUpdate 這個(gè)方法,看來(lái)看去,唯獨(dú) InstanceInfoReplicator.this.run() 這個(gè)方法還有點(diǎn)用,而且還是 run 方法呢,感情 InstanceInfoReplicator 這個(gè)類(lèi)還是實(shí)現(xiàn)了 Runnable 接口?經(jīng)過(guò)查看這個(gè)類(lèi),還真是實(shí)現(xiàn)了 Runnable 接口。 【分析五】:于是乎,我們有理由相信,這個(gè)方法應(yīng)該我們要找的注冊(cè)所在的地方,翻開(kāi)代碼看看究竟。 public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } 【分析六】:映入眼簾的就是 discoveryClient.register() 這個(gè)刺眼的 register 方法,終于有點(diǎn)苗頭了,原來(lái)注冊(cè)方法找的這么千辛萬(wàn)苦。雖然找到了這里,但是我還是想看看這個(gè)讓我們找的千辛萬(wàn)苦的注冊(cè)方法到底是怎么注冊(cè)的呢? boolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; } 【分析七】:原來(lái)調(diào)用了 EurekaHttpClient 封裝的客戶(hù)端請(qǐng)求對(duì)象來(lái)進(jìn)行注冊(cè)的,再繼續(xù)深探 registrationClient.register 方法,于是我們來(lái)到了 AbstractJerseyEurekaHttpClient.register 方法。 @Override public EurekaHttpResponse register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) // 注釋?zhuān)捍虬鼛袭?dāng)前應(yīng)用的所有信息 info .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } } 【分析八】:原來(lái)調(diào)用的是 Jersey RESTful 框架來(lái)進(jìn)行請(qǐng)求的,然后在 EurekaServer 那邊就會(huì)在 ApplicationResource.addInstance 方法接收客戶(hù)端的注冊(cè)請(qǐng)求,因此我們的 EurekaClient 是如何注冊(cè)的就到此為止了。 【分析九】:至于那些續(xù)約、心跳的流程分析和這個(gè)注冊(cè)的流程大體差不多,相信大家按照我剛剛這么分析斷點(diǎn)下去,一定能分析的很到位的。
https://gitee.com/ylimhhmily/SpringCloudTutorial.git
SpringCloudTutorial交流QQ群: 235322432
SpringCloudTutorial交流微信群: 微信溝通群二維碼圖片鏈接
歡迎關(guān)注,您的肯定是對(duì)我最大的支持!!!
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/67867.html
摘要:在應(yīng)用啟動(dòng)后,將會(huì)向發(fā)送心跳默認(rèn)周期為秒,如果在多個(gè)心跳周期沒(méi)有收到某個(gè)節(jié)點(diǎn)的心跳,將會(huì)從服務(wù)注冊(cè)表中把這個(gè)服務(wù)節(jié)點(diǎn)移除默認(rèn)秒。進(jìn)入類(lèi)看看,看這個(gè)類(lèi)的名字,見(jiàn)名知意,應(yīng)該就是的啟動(dòng)類(lèi)了。。。分析一由于是我們剛剛打斷點(diǎn) SpringCloud(第 049 篇)Netflix Eureka 源碼深入剖析(上) - 一、大致介紹 1、鑒于一些朋友的提問(wèn)并提議講解下eureka的源碼分析,由此...
摘要:第篇的過(guò)濾器的使用一大致介紹我們?cè)趯W(xué)的時(shí)候,就有過(guò)濾器和攔截器的使用,而同樣也有過(guò)濾器的使用,本章節(jié)我們指在如何簡(jiǎn)單使用。是否執(zhí)行該過(guò)濾器。說(shuō)明需要過(guò)濾說(shuō)明不要過(guò)濾過(guò)濾器的具體邏輯。請(qǐng)求的添加服務(wù)網(wǎng)關(guān)微服務(wù)啟動(dòng)類(lèi)的過(guò)濾器的使用。 SpringCloud(第 021 篇)Zuul 的過(guò)濾器 ZuulFilter 的使用 - 一、大致介紹 1、我們?cè)趯W(xué) Spring 的時(shí)候,就有過(guò)濾器和攔...
摘要:提供給文件上傳微服務(wù)用的。注意注解能注冊(cè)到服務(wù)上,是因?yàn)樵撟⒔獍丝蛻?hù)端的注解,該是一個(gè)復(fù)合注解。地址可以查看該微服務(wù)網(wǎng)關(guān)代理了多少微服務(wù)的。 SpringCloud(第 024 篇)簡(jiǎn)單文件上傳微服務(wù),并加入 zuul 微服務(wù)后用 zuul 微服務(wù)地址采取curl或者頁(yè)面點(diǎn)擊實(shí)現(xiàn)文件上傳 - 一、大致介紹 1、本章節(jié)主要將文件上傳微服務(wù)加入到 zuul 服務(wù)中去,然后利用 zuul...
閱讀 2149·2023-04-25 18:49
閱讀 1854·2019-08-30 14:02
閱讀 2657·2019-08-29 17:24
閱讀 3334·2019-08-28 18:10
閱讀 2938·2019-08-28 18:03
閱讀 503·2019-08-26 12:01
閱讀 3322·2019-08-26 11:31
閱讀 1442·2019-08-26 10:29