es 支持NIO 和netty 两种默认的方式 ,这里看es 的nio 主要是学习es是如何使用和封装的
直接看org.elasticsearch.action.ActionModule 的构造函数 这里主要有两个 setupActions 和 new RestController 这个两个 ,这里的 Actions 目前还不知道是干啥的,猜测是TransportActtion 主要是socket 请求,RestController 这个主要是http请求
1 | public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpressionResolver, |
这里主要学习一下,es 是如何一步一步,通过 Nio 把socket 转换成HTTP在通过对应 的org.elasticsearch.rest.RestHandler (这个是请求处理的顶层接口)来处理的
这里主要涉及的包有如下
1.es对java Nio的封装
- nio到http的转换
3.http协议的封装
这里画了一个简单层级关系,一个请求过来后,通过nioSocket ,转换成http请求最后在restHandle里处理,完成后在反向写回。这是逻辑上的一个理解。
我们在看看代码上又是如何实现落地的,很多时候我们大脑是可以理解这种模型,但是到了实际编写代码的时候,发现却无从下手,或者是写出来的代码,层次不是那么清晰。看看下面的类图结构。
看这个类图,最好选理解一下java nio编程模型。
首选是SokectChannel 这个Java Nio 封装的一个基础接口。ServerSokectChannel是处理服务端的一个Channel 继承 SokectChannel 。
NioChannel是es 自己抽象出来的一个 Channel,NioSokectChannel 和NioServerSokectChannel 都实现了NioChannel 并且是 SokectChannel和ServerCokectChannel 的包装
HttpChannel和HttpServerChannel 这两个接口是es对 http的抽象,这一层主要实现是NioHttpChannel和NioHttpServerChannel ,这两个类是通过继承 NioSokectChannel 和NioServerSokectChannel 的方式来进行包装的,
Http的请求是如何分发到不同的RestHandle的,这里就需要查看 RestController ,我们看看 RestController的初始化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
CircuitBreakerService circuitBreakerService, UsageService usageService) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexScopedSettings = indexScopedSettings;
this.clusterSettings = clusterSettings;
this.settingsFilter = settingsFilter;
this.actionPlugins = actionPlugins;
actions = setupActions(actionPlugins);
actionFilters = setupActionFilters(actionPlugins);
autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
destructiveOperations = new DestructiveOperations(settings, clusterSettings);
Set<String> headers = Stream.concat(
actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
Stream.of(Task.X_OPAQUE_ID)
).collect(Collectors.toSet());
UnaryOperator<RestHandler> restWrapper = null;
for (ActionPlugin plugin : actionPlugins) {
UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
if (newRestWrapper != null) {
logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
if (restWrapper != null) {
throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
}
restWrapper = newRestWrapper;
}
}
...
//这里的 headers和restWrapper 分别是对应 setupActions(actionPlugins) 方法初始化后的数据
restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
}5.在RestController 中有一个 dispatchRequest 看名字就 可以猜测这个是请求分发的,但是如何把http请路由到不同handle ? 这里有一个技巧,选中方法用idea 中 Hierarchy (快捷键 Ctrl+Shift+H) 如图
最终我们看到 一个 org.elasticsearch.http.nio.HttpReadWriteHandler handleRequest 方法
1 |
|
最后 transport.incomingRequest(httpRequest, nioHttpChannel); 这个代码会调用下面的代码 这样 整个流程就窜起来了
1 |
|
代码是窜起来, es 又是如何分配线程,暂时还不清楚