elasticsearch8源码学习之启动
Healthy Mind Lv3

​ es 在平时工作中用到的越来越多,很多公司和产品都会用到es 而且es的性能也是非常好了,最近有时间就打算学习一下es源码

在github 下载新版的es 源码,我这里下载的是es8 ,es8需要使用jdk11,gradle 6 以上的版本

这里主要看这几个包下面的代码,es的代码量太大了,无法看全,

这个是es modules包 对于netty 也是放在这个包里面的

plugins 是插件包,平时用的比较多的就是分词器了,其实es 是可以存储在 hdfs,s3,gcs上的,而这些代码就在这里

这一块就是es 主体代码了,

这里主要是学习一下es 的这几点

  1. 做到这种高扩展的,
  2. 代码的一个整体架构,
  3. 对于一些开源组件的使用,如netty,Guice,lucene等的使用
  4. 在对es的使用上和性能优化上会有更好的感知。

下面开始看 org.elasticsearch.bootstrap.Elasticsearch 这个类,至于怎么找到这个类,

1.可以查看 es的安装目录下的 elasticsearch 文件

2.可以用命令查看 ps -ef|grep elasticsearch

找到 main 函数就顺着 一直往下点就可以了

最后点到这个方法,这里我们要选择EnvironmentAwareCommand这个 类,原因有两点

1.类的名字,因为这里是启动肯定要做环境检查,

  1. org.elasticsearch.bootstrap.Elasticsearch 这个类继承 EnvironmentAwareCommand 类 ,

    而EnvironmentAwareCommand 又继承 org.elasticsearch.cli.Command所以 这下就根明确了

在 EnvironmentAwareCommand 的execute里还有一个 execute 这个就比较简单了 肯定是

org.elasticsearch.bootstrap.Elasticsearch的execute 了

接着还是一路往下点最后到 org.elasticsearch.bootstrap.Bootstrap > setup 方法 ,前面都是 各种检查,这里 主要是 Node 类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
Settings settings = environment.settings();

....

node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
BootstrapChecks.check(context, boundTransportAddress, checks);
}
};
}

这里就到了 es 中的一个核心类 org.elasticsearch.node.Node 这个类的构造函数大概就有400多行😂

这里是各个模块的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),
environment.pluginsFile(), classpathPlugins);

final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));

client = new NodeClient(settings, threadPool);

final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));

AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));

final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);

final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));

final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService);
....

这只是初始化了这些服务,真正的启动是在 org.elasticsearch.bootstrap.Bootstrap > start 方法

这里直接到 org.elasticsearch.node.Node > start 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
    public Node start() throws NodeValidationException {
if (!lifecycle.moveToStarted()) {
return this;
} logger.info("starting ...");
pluginLifecycleComponents.forEach(LifecycleComponent::start);

injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();
injector.getInstance(SnapshotsService.class).start();
injector.getInstance(SnapshotShardsService.class).start();
injector.getInstance(SearchService.class).start();
nodeService.getMonitorService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);

injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);

// Start the transport service now so the publish address will be added to the local disco node in ClusterService
TransportService transportService = injector.getInstance(TransportService.class);
transportService.getTaskManager().setTaskResultsService(injector.getInstance(TaskResultsService.class));
transportService.start();
assert localNodeFactory.getNode() != null;
assert transportService.getLocalNode().equals(localNodeFactory.getNode())
: "transportService has a different local node than the factory provided";
injector.getInstance(PeerRecoverySourceService.class).start();
final MetaData onDiskMetadata;
// we load the global state here (the persistent part of the cluster state stored on disk) to
// pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state.
if (DiscoveryNode.isMasterNode(settings()) || DiscoveryNode.isDataNode(settings())) {
onDiskMetadata = injector.getInstance(GatewayMetaState.class).getMetaData();
} else {
onDiskMetadata = MetaData.EMPTY_META_DATA;
}
assert onDiskMetadata != null : "metadata is null but shouldn't"; // this is never null
validateNodeBeforeAcceptingRequests(new BootstrapContext(environment, onDiskMetadata), transportService.boundAddress(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getBootstrapChecks().stream()).collect(Collectors.toList()));
... pluginsService.filterPlugins(ClusterPlugin.class).forEach(ClusterPlugin::onNodeStarted);
return this;
}

这里可以看出 es使用的是 google 的 Guice 作为ioc容器,如果我们是使用的 j2ee应用 也可以使用这个 据说是高性能 可以和 dagger 编译时注入 差不多,

这里的 org.elasticsearch.common.component.LifecycleComponent 有没有觉得和tomcat 的 org.apache.catalina.util.LifecycleBase 很像,都是抽象出来一个生命周期接口。

org.elasticsearch.common.component.AbstractLifecycleComponent 实现了org.elasticsearch.common.component.LifecycleComponent 接口 并做了一个抽象 doStart方法

使用 idea的 diagrams功能可以看到 这个的实现图

es的大致启动流程是这个样了,整个es 的代码在设计和抽象方面是很庞大,应该是一步一步不断进行重构的来的,而我们平时做的项目经过几轮的修改基本已经非常乱了,所以重构代码是一个持久性的工作;在工作中我们经常为了赶功能而忽略这些东西,review这个步骤还是比较重要的。