成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

渣渣的 ElasticSearch 源碼解析 —— 啟動流程(下)

ztyzz / 1850人閱讀

摘要:關(guān)注我轉(zhuǎn)載請務必注明原創(chuàng)地址為前提上篇文章寫完了流程啟動的一部分,方法都入口,以及創(chuàng)建運行的必須環(huán)境以及相關(guān)配置,接著就是創(chuàng)建該環(huán)境的節(jié)點了。的創(chuàng)建看下新建節(jié)點的代碼代碼比較多,這里是比較關(guān)鍵的地方,我就把注釋直接寫在代碼上面了,實在不好

關(guān)注我

轉(zhuǎn)載請務必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/08/12/es-code03/

前提

上篇文章寫完了 ES 流程啟動的一部分,main 方法都入口,以及創(chuàng)建 Elasticsearch 運行的必須環(huán)境以及相關(guān)配置,接著就是創(chuàng)建該環(huán)境的節(jié)點了。

Node 的創(chuàng)建

看下新建節(jié)點的代碼:(代碼比較多,這里是比較關(guān)鍵的地方,我就把注釋直接寫在代碼上面了,實在不好拆開這段代碼,300 多行代碼)

public Node(Environment environment) {
        this(environment, Collections.emptyList()); //執(zhí)行下面的代碼
    }

protected Node(final Environment environment, Collection> classpathPlugins) {
    final List resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    {
// use temp logger just to say we are starting. we can"t use it later on because the node name might not be set
        Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(environment.settings()));
        logger.info("initializing ...");
    }
    try {
        originalSettings = environment.settings();
        Settings tmpSettings = Settings.builder().put(environment.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

// create the node environment as soon as possible, to recover the node id and enable logging
        try {
            nodeEnvironment = new NodeEnvironment(tmpSettings, environment); //1、創(chuàng)建節(jié)點環(huán)境,比如節(jié)點名稱,節(jié)點ID,分片信息,存儲元,以及分配內(nèi)存準備給節(jié)點使用
            resourcesToClose.add(nodeEnvironment);
        } catch (IOException ex) {
        throw new IllegalStateException("Failed to create node environment", ex);
        }
        final boolean hadPredefinedNodeName = NODE_NAME_SETTING.exists(tmpSettings);
        final String nodeId = nodeEnvironment.nodeId();
        tmpSettings = addNodeNameIfNeeded(tmpSettings, nodeId);
        final Logger logger = Loggers.getLogger(Node.class, tmpSettings);
// this must be captured after the node name is possibly added to the settings
        final String nodeName = NODE_NAME_SETTING.get(tmpSettings);
        if (hadPredefinedNodeName == false) {
            logger.info("node name derived from node ID [{}]; set [{}] to override", nodeId, NODE_NAME_SETTING.getKey());
        } else {
            logger.info("node name [{}], node ID [{}]", nodeName, nodeId);
        }

        //2、打印出JVM相關(guān)信息
        final JvmInfo jvmInfo = JvmInfo.jvmInfo();
        logger.info(
"version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
            Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
            jvmInfo.pid(), Build.CURRENT.flavor().displayName(),
            Build.CURRENT.type().displayName(), Build.CURRENT.shortHash(),
            Build.CURRENT.date(), Constants.OS_NAME, Constants.OS_VERSION,
            Constants.OS_ARCH,Constants.JVM_VENDOR,Constants.JVM_NAME,
            Constants.JAVA_VERSION,Constants.JVM_VERSION);
        logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
        //檢查當前版本是不是 pre-release 版本(Snapshot),
        warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);
        。。。
        this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(), environment.pluginsFile(), classpathPlugins);   //3、利用PluginsService加載相應的模塊和插件
        this.settings = pluginsService.updatedSettings();
        localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());

// create the environment based on the finalized (processed) view of the settings
// this is just to makes sure that people get the same settings, no matter where they ask them from
        this.environment = new Environment(this.settings, environment.configFile());
        Environment.assertEquivalent(environment, this.environment);

        final List> executorBuilders = pluginsService.getExecutorBuilders(settings);        //線程池

        final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
        resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
        // adds the context to the DeprecationLogger so that it does not need to be injected everywhere
        DeprecationLogger.setThreadContext(threadPool.getThreadContext());
        resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));

        final List> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());       //額外配置
        final List additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
        for (final ExecutorBuilder builder : threadPool.builders()) {
            //4、加載一些額外配置
            additionalSettings.addAll(builder.getRegisteredSettings());
        }
        client = new NodeClient(settings, threadPool);//5、創(chuàng)建一個節(jié)點客戶端                                                                                  

        //6、緩存一系列模塊,如NodeModule,ClusterModule,IndicesModule,ActionModule,GatewayModule,SettingsModule,RepositioriesModule,scriptModule,analysisModule
        final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
        final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
        AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
        // this is as early as we can validate settings at this point. we already pass them to ScriptModule as well as ThreadPool so we might be late here already
        final SettingsModule settingsModule = new SettingsModule(this.settings, additionalSettings, additionalSettingsFilter);
scriptModule.registerClusterSettingsListeners(settingsModule.getClusterSettings());
        resourcesToClose.add(resourceWatcherService);
        final NetworkService networkService = new NetworkService(
  getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
        List clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
        final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool,                                                      ClusterModule.getClusterStateCustomSuppliers(clusterPlugins));
        clusterService.addStateApplier(scriptModule.getScriptService());
        resourcesToClose.add(clusterService);
        final IngestService ingestService = new IngestService(settings, threadPool, this.environment,                                                  scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
        final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, clusterService.getClusterSettings(), client);
        final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
listener::onNewInfo);
        final UsageService usageService = new UsageService(settings);

        ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
        for (Module pluginModule : pluginsService.createGuiceModules()) {
            modules.add(pluginModule);
        }
        final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService);
        ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
        modules.add(clusterModule);
        IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
        modules.add(indicesModule);

        SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
        CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
                                                                                  settingsModule.getClusterSettings());
        resourcesToClose.add(circuitBreakerService);
        modules.add(new GatewayModule());

        PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
        BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
        resourcesToClose.add(bigArrays);
        modules.add(settingsModule);
        List namedWriteables = Stream.of(
            NetworkModule.getNamedWriteables().stream(),
            indicesModule.getNamedWriteables().stream(),
            searchModule.getNamedWriteables().stream(),
            pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.getNamedWriteables().stream()),
            ClusterModule.getNamedWriteables().stream())
            .flatMap(Function.identity()).collect(Collectors.toList());
        final NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(namedWriteables);
        NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
            NetworkModule.getNamedXContents().stream(),
            searchModule.getNamedXContents().stream(),
            pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.getNamedXContent().stream()),
            ClusterModule.getNamedXWriteables().stream())
.flatMap(Function.identity()).collect(toList()));
        modules.add(new RepositoriesModule(this.environment, pluginsService.filterPlugins(RepositoryPlugin.class), xContentRegistry));
        final MetaStateService metaStateService = new MetaStateService(settings, nodeEnvironment, xContentRegistry);
        final IndicesService indicesService = new IndicesService(settings, pluginsService, nodeEnvironment, xContentRegistry,
analysisModule.getAnalysisRegistry(),                                                                clusterModule.getIndexNameExpressionResolver(), indicesModule.getMapperRegistry(), namedWriteableRegistry,threadPool, settingsModule.getIndexScopedSettings(), circuitBreakerService, bigArrays, scriptModule.getScriptService(),client, metaStateService);

        Collection pluginComponents = pluginsService.filterPlugins(Plugin.class).stream()
            .flatMap(p -> p.createComponents(client, clusterService, threadPool, resourceWatcherService,scriptModule.getScriptService(), xContentRegistry, environment, nodeEnvironment,namedWriteableRegistry).stream())
.collect(Collectors.toList());

        ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                                                     settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService);
        modules.add(actionModule);

        //7、獲取RestController,用于處理各種Elasticsearch的rest命令,如_cat,_all,_cat/health,_clusters等rest命令(Elasticsearch稱之為action)
        final RestController restController = actionModule.getRestController();
        final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,networkService, restController);
        Collection>> customMetaDataUpgraders =
            pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getCustomMetaDataUpgrader)
            .collect(Collectors.toList());
        Collection>> indexTemplateMetaDataUpgraders =
            pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getIndexTemplateMetaDataUpgrader)
            .collect(Collectors.toList());
        Collection> indexMetaDataUpgraders = pluginsService.filterPlugins(Plugin.class).stream()
            .map(Plugin::getIndexMetaDataUpgrader).collect(Collectors.toList());
        final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders, indexTemplateMetaDataUpgraders);
        final MetaDataIndexUpgradeService metaDataIndexUpgradeService = new MetaDataIndexUpgradeService(settings, xContentRegistry,                                                                                            indicesModule.getMapperRegistry(), settingsModule.getIndexScopedSettings(), indexMetaDataUpgraders);
        final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, nodeEnvironment, metaStateService,                                                      metaDataIndexUpgradeService, metaDataUpgrader);
        new TemplateUpgradeService(settings, client, clusterService, threadPool, indexTemplateMetaDataUpgraders);
        final Transport transport = networkModule.getTransportSupplier().get();
        Set taskHeaders = Stream.concat(
            pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()),
            Stream.of("X-Opaque-Id")
        ).collect(Collectors.toSet());
        final TransportService transportService = newTransportService(settings, transport, threadPool,
                                                                      networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
        final ResponseCollectorService responseCollectorService = new ResponseCollectorService(this.settings, clusterService);
        final SearchTransportService searchTransportService =  new SearchTransportService(settings, transportService,
                                                                                          SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
        final Consumer httpBind;
        final HttpServerTransport httpServerTransport;
        if (networkModule.isHttpEnabled()) {
            httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
            httpBind = b -> {
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
            };
        } else {
            httpBind = b -> {
                b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
            };
            httpServerTransport = null;
        }

        final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry,networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),clusterModule.getAllocationService());
        
        this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,searchTransportService);

        final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),responseCollectorService);

        final List> tasksExecutors = pluginsService
            .filterPlugins(PersistentTaskPlugin.class).stream()
     .map(p -> p.getPersistentTasksExecutor(clusterService, threadPool, client))
            .flatMap(List::stream)
            .collect(toList());

        final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
        final PersistentTasksClusterService persistentTasksClusterService =
            new PersistentTasksClusterService(settings, registry, clusterService);
        final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);

//8、綁定處理各種服務的實例,這里是最核心的地方,也是Elasticsearch能處理各種服務的核心.
        modules.add(b -> {
            b.bind(Node.class).toInstance(this);
            b.bind(NodeService.class).toInstance(nodeService);
            b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
            b.bind(PluginsService.class).toInstance(pluginsService);
            b.bind(Client.class).toInstance(client);
            b.bind(NodeClient.class).toInstance(client);
            b.bind(Environment.class).toInstance(this.environment);
            b.bind(ThreadPool.class).toInstance(threadPool);
            b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
 b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
            b.bind(BigArrays.class).toInstance(bigArrays);
      b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
 b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
            b.bind(IngestService.class).toInstance(ingestService);
            b.bind(UsageService.class).toInstance(usageService);
 b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
            b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
            b.bind(MetaStateService.class).toInstance(metaStateService);
            b.bind(IndicesService.class).toInstance(indicesService);
            b.bind(SearchService.class).toInstance(searchService);            b.bind(SearchTransportService.class).toInstance(searchTransportService);
b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, searchService::createReduceContext));
            b.bind(Transport.class).toInstance(transport);
            b.bind(TransportService.class).toInstance(transportService);
            b.bind(NetworkService.class).toInstance(networkService);
            b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
            b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
            b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
            b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
            {
                RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
                processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
                b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
indicesService, recoverySettings));
                b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
transportService, recoverySettings, clusterService));
            }
            httpBind.accept(b);
            pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);       b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry); });
        injector = modules.createInjector();

        // TODO hack around circular dependencies problems in AllocationService
clusterModule.getAllocationService().setGatewayAllocator(injector.getInstance(GatewayAllocator.class));

        List pluginLifecycleComponents = pluginComponents.stream()
            .filter(p -> p instanceof LifecycleComponent)
            .map(p -> (LifecycleComponent) p).collect(Collectors.toList());

        //9、利用Guice將各種模塊以及服務(xxxService)注入到Elasticsearch環(huán)境中
pluginLifecycleComponents.addAll(pluginsService.getGuiceServiceClasses().stream()                                     .map(injector::getInstance).collect(Collectors.toList()));
        resourcesToClose.addAll(pluginLifecycleComponents);
        this.pluginLifecycleComponents = Collections.unmodifiableList(pluginLifecycleComponents);
        client.initialize(injector.getInstance(new Key>() {}), () -> clusterService.localNode().getId(), transportService.getRemoteClusterService());

        if (NetworkModule.HTTP_ENABLED.get(settings)) { //如果elasticsearch.yml文件中配置了http.enabled參數(shù)(默認為true),則會初始化RestHandlers
            logger.debug("initializing HTTP handlers ...");
            actionModule.initRestHandlers(() -> clusterService.state().nodes()); //初始化RestHandlers, 解析集群命令,如_cat/,_cat/health
        }
        //10、初始化工作完成
        logger.info("initialized");

        success = true;
    } catch (IOException ex) {
        throw new ElasticsearchException("failed to bind service", ex);
    } finally {
        if (!success) {
            IOUtils.closeWhileHandlingException(resourcesToClose);
        }
    }
}

上面代碼真的很多,這里再說下上面這么多代碼主要干了什么吧:(具體是哪行代碼執(zhí)行的如下流程,上面代碼中也標記了)

1、創(chuàng)建節(jié)點環(huán)境,比如節(jié)點名稱,節(jié)點 ID,分片信息,存儲元,以及分配內(nèi)存準備給節(jié)點使用

2、打印出 JVM 相關(guān)信息

3、利用 PluginsService 加載相應的模塊和插件,具體哪些模塊可以去 modules 目錄下查看

4、加載一些額外的配置參數(shù)

5、創(chuàng)建一個節(jié)點客戶端

6、緩存一系列模塊,如NodeModule,ClusterModule,IndicesModule,ActionModule,GatewayModule,SettingsModule,RepositioriesModule,scriptModule,analysisModule

7、獲取 RestController,用于處理各種 Elasticsearch 的 rest 命令,如 _cat, _all, _cat/health, _clusters 等 rest命令

8、綁定處理各種服務的實例

9、利用 Guice 將各種模塊以及服務(xxxService)注入到 Elasticsearch 環(huán)境中

10、初始化工作完成(打印日志)

JarHell 報錯解釋

前一篇閱讀源碼環(huán)境搭建的文章寫過用 JDK 1.8 編譯 ES 源碼是會遇到如下異常:

org.elasticsearch.bootstrap.StartupException: java.lang.IllegalStateException: jar hell!

這里說下就是 setup 方法中的如下代碼導致的

try {
    // look for jar hell
    final Logger logger = ESLoggerFactory.getLogger(JarHell.class);
    JarHell.checkJarHell(logger::debug);
} catch (IOException | URISyntaxException e) {
    throw new BootstrapException(e);
}

所以你如果是用 JDK 1.8 編譯的,那么就需要把所有的有這塊的代碼給注釋掉就可以編譯成功的。

我自己試過用 JDK 10 編譯是沒有出現(xiàn)這里報錯的。

正式啟動 ES 節(jié)點

回到上面 Bootstrap 中的靜態(tài) init 方法中,接下來就是正式啟動 elasticsearch 節(jié)點了:

INSTANCE.start();  //調(diào)用下面的 start 方法

private void start() throws NodeValidationException {
    node.start();                                       //正式啟動 Elasticsearch 節(jié)點
    keepAliveThread.start();
}

接下來看看這個 start 方法里面的 node.start() 方法源碼:

public Node start() throws NodeValidationException {
    if (!lifecycle.moveToStarted()) {
        return this;
    }

    Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
    logger.info("starting ...");
    pluginLifecycleComponents.forEach(LifecycleComponent::start); 
    
    //1、利用Guice獲取上述注冊的各種模塊以及服務
    //Node 的啟動其實就是 node 里每個組件的啟動,同樣的,分別調(diào)用不同的實例的 start 方法來啟動這個組件, 如下:
    injector.getInstance(MappingUpdatedAction.class).setClient(client);
    injector.getInstance(IndicesService.class).start();
    injector.getInstance(IndicesClusterStateService.class).start();
    injector.getInstance(SnapshotsService.class).start();
    injector.getInstance(SnapshotShardsService.class).start();
    injector.getInstance(RoutingService.class).start();
    injector.getInstance(SearchService.class).start();
    nodeService.getMonitorService().start();

    final ClusterService clusterService = injector.getInstance(ClusterService.class);

    final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
    nodeConnectionsService.start();
    clusterService.setNodeConnectionsService(nodeConnectionsService);

    injector.getInstance(ResourceWatcherService.class).start();
    injector.getInstance(GatewayService.class).start();
    Discovery discovery = injector.getInstance(Discovery.class);
    clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

    // Start the transport service now so the publish address will be added to the local disco node in ClusterService
    TransportService transportService = injector.getInstance(TransportService.class);
    transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
    transportService.start();
    assert localNodeFactory.getNode() != null;
    assert transportService.getLocalNode().equals(localNodeFactory.getNode())
        : "transportService has a different local node than the factory provided";
    final MetaData onDiskMetadata;
    try {
        // we load the global state here (the persistent part of the cluster state stored on disk) to
        // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
        if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {//根據(jù)配置文件看當前節(jié)點是master還是data節(jié)點
            onDiskMetadata = injector.getInstance(GatewayMetaState.class).loadMetaState();
        } else {
            onDiskMetadata = MetaData.EMPTY_META_DATA;
        }
        assert onDiskMetadata != null : "metadata is null but shouldn"t"; // this is never null
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
    validateNodeBeforeAcceptingRequests(new BootstrapContext(settings, onDiskMetadata), transportService.boundAddress(), pluginsService
        .filterPlugins(Plugin
        .class)
        .stream()
        .flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));

    //2、將當前節(jié)點加入到一個集群簇中去,并啟動當前節(jié)點
    clusterService.addStateApplier(transportService.getTaskManager());
    // start after transport service so the local disco is known
    discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
    clusterService.start();
    assert clusterService.localNode().equals(localNodeFactory.getNode())
        : "clusterService has a different local node than the factory provided";
    transportService.acceptIncomingRequests();
    discovery.startInitialJoin();
    // tribe nodes don"t have a master so we shouldn"t register an observer         s
    final TimeValue initialStateTimeout = DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.get(settings);
    if (initialStateTimeout.millis() > 0) {
        final ThreadPool thread = injector.getInstance(ThreadPool.class);
        ClusterState clusterState = clusterService.state();
        ClusterStateObserver observer = new ClusterStateObserver(clusterState, clusterService, null, logger, thread.getThreadContext());
        if (clusterState.nodes().getMasterNodeId() == null) {
            logger.debug("waiting to join the cluster. timeout [{}]", initialStateTimeout);
            final CountDownLatch latch = new CountDownLatch(1);
            observer.waitForNextChange(new ClusterStateObserver.Listener() {
                @Override
                public void onNewClusterState(ClusterState state) { latch.countDown(); }

                @Override
                public void onClusterServiceClose() {
                    latch.countDown();
                }

                @Override
                public void onTimeout(TimeValue timeout) {
                    logger.warn("timed out while waiting for initial discovery state - timeout: {}",
                        initialStateTimeout);
                    latch.countDown();
                }
            }, state -> state.nodes().getMasterNodeId() != null, initialStateTimeout);

            try {
                latch.await();
            } catch (InterruptedException e) {
                throw new ElasticsearchTimeoutException("Interrupted while waiting for initial discovery state");
            }
        }
    }


    if (NetworkModule.HTTP_ENABLED.get(settings)) {
        injector.getInstance(HttpServerTransport.class).start();
    }

    if (WRITE_PORTS_FILE_SETTING.get(settings)) {
        if (NetworkModule.HTTP_ENABLED.get(settings)) {
            HttpServerTransport http = injector.getInstance(HttpServerTransport.class);
            writePortsFile("http", http.boundAddress());
        }
        TransportService transport = injector.getInstance(TransportService.class);
        writePortsFile("transport", transport.boundAddress());
    }

    logger.info("started");

    pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);

    return this;
}

上面代碼主要是:

1、利用 Guice 獲取上述注冊的各種模塊以及服務,然后啟動 node 里每個組件(分別調(diào)用不同的實例的 start 方法來啟動)

2、打印日志(啟動節(jié)點完成)

總結(jié)

這篇文章主要把大概啟動流程串通了,講了下 node 節(jié)點的創(chuàng)建和正式啟動 ES 節(jié)點了。因為篇幅較多所以拆開成兩篇,先不扣細節(jié)了,后面流程啟動文章寫完后我們再單一的扣細節(jié)。

相關(guān)文章

1、渣渣菜雞為什么要看 ElasticSearch 源碼?

2、渣渣菜雞的 ElasticSearch 源碼解析 —— 環(huán)境搭建

3、渣渣菜雞的 ElasticSearch 源碼解析 —— 啟動流程(上)

4、渣渣菜雞的 ElasticSearch 源碼解析 —— 啟動流程(下)

5、Elasticsearch 系列文章(一):Elasticsearch 默認分詞器和中分分詞器之間的比較及使用方法

6、Elasticsearch 系列文章(二):全文搜索引擎 Elasticsearch 集群搭建入門教程

7、Elasticsearch 系列文章(三):ElasticSearch 集群監(jiān)控

8、Elasticsearch 系列文章(四):ElasticSearch 單個節(jié)點監(jiān)控

9、Elasticsearch 系列文章(五):ELK 實時日志分析平臺環(huán)境搭建

10、教你如何在 IDEA 遠程 Debug ElasticSearch

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://systransis.cn/yun/76904.html

相關(guān)文章

  • 渣的 ElasticSearch 源碼解析 —— 啟動流程(上)

    摘要:總結(jié)這篇文章主要先把大概啟動流程串通,因為篇幅較多所以拆開成兩篇,先不扣細節(jié)了,后面流程啟動文章寫完后我們再單一的扣細節(jié)。 關(guān)注我 showImg(https://segmentfault.com/img/remote/1460000012730965?w=258&h=258); 轉(zhuǎn)載請務必注明原創(chuàng)地址為:http://www.54tianzhisheng.cn/2018/08/11/...

    AZmake 評論0 收藏0
  • 渣的 ElasticSearch 源碼解析 —— 環(huán)境搭建

    摘要:注意這個版本需要和下面的源碼版本一致下載源碼從上下載相應版本的源代碼,這里建議用,這樣的話后面你可以隨意切換到的其他版本去。我們看下有哪些版本的找到了目前源碼版本最新的版本的穩(wěn)定版為切換到該版本于是就可以切換到該穩(wěn)定版本了。 關(guān)注我 showImg(https://segmentfault.com/img/remote/1460000012730965?w=258&h=258); 轉(zhuǎn)載...

    wudengzan 評論0 收藏0

發(fā)表評論

0條評論

ztyzz

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<