elasticsearch8源码学习之rest请求
Healthy Mind Lv3

​ es 支持NIO 和netty 两种默认的方式 ,这里看es 的nio 主要是学习es是如何使用和封装的

直接看org.elasticsearch.action.ActionModule 的构造函数 这里主要有两个 setupActions 和 new RestController 这个两个 ,这里的 Actions 目前还不知道是干啥的,猜测是TransportActtion 主要是socket 请求,RestController 这个主要是http请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
CircuitBreakerService circuitBreakerService, UsageService usageService) {
this.settings = settings;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexScopedSettings = indexScopedSettings;
this.clusterSettings = clusterSettings;
this.settingsFilter = settingsFilter;
this.actionPlugins = actionPlugins;
actions = setupActions(actionPlugins);
...

restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
}

这里主要学习一下,es 是如何一步一步,通过 Nio 把socket 转换成HTTP在通过对应 的org.elasticsearch.rest.RestHandler (这个是请求处理的顶层接口)来处理的

这里主要涉及的包有如下

1.es对java Nio的封装

  1. nio到http的转换

3.http协议的封装

这里画了一个简单层级关系,一个请求过来后,通过nioSocket ,转换成http请求最后在restHandle里处理,完成后在反向写回。这是逻辑上的一个理解。

我们在看看代码上又是如何实现落地的,很多时候我们大脑是可以理解这种模型,但是到了实际编写代码的时候,发现却无从下手,或者是写出来的代码,层次不是那么清晰。看看下面的类图结构。

​ 看这个类图,最好选理解一下java nio编程模型。

  1. 首选是SokectChannel 这个Java Nio 封装的一个基础接口。ServerSokectChannel是处理服务端的一个Channel 继承 SokectChannel 。

  2. NioChannel是es 自己抽象出来的一个 Channel,NioSokectChannelNioServerSokectChannel 都实现了NioChannel 并且是 SokectChannel和ServerCokectChannel 的包装

  3. HttpChannel和HttpServerChannel 这两个接口是es对 http的抽象,这一层主要实现是NioHttpChannel和NioHttpServerChannel ,这两个类是通过继承 NioSokectChannelNioServerSokectChannel 的方式来进行包装的,

  4. Http的请求是如何分发到不同的RestHandle的,这里就需要查看 RestController ,我们看看 RestController的初始化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public ActionModule(Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,
    IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,
    ThreadPool threadPool, List<ActionPlugin> actionPlugins, NodeClient nodeClient,
    CircuitBreakerService circuitBreakerService, UsageService usageService) {
    this.settings = settings;
    this.indexNameExpressionResolver = indexNameExpressionResolver;
    this.indexScopedSettings = indexScopedSettings;
    this.clusterSettings = clusterSettings;
    this.settingsFilter = settingsFilter;
    this.actionPlugins = actionPlugins;
    actions = setupActions(actionPlugins);
    actionFilters = setupActionFilters(actionPlugins);
    autoCreateIndex = new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);
    destructiveOperations = new DestructiveOperations(settings, clusterSettings);
    Set<String> headers = Stream.concat(
    actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()),
    Stream.of(Task.X_OPAQUE_ID)
    ).collect(Collectors.toSet());
    UnaryOperator<RestHandler> restWrapper = null;
    for (ActionPlugin plugin : actionPlugins) {
    UnaryOperator<RestHandler> newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext());
    if (newRestWrapper != null) {
    logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName());
    if (restWrapper != null) {
    throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper");
    }
    restWrapper = newRestWrapper;
    }
    }
    ...
    //这里的 headers和restWrapper 分别是对应 setupActions(actionPlugins) 方法初始化后的数据
    restController = new RestController(headers, restWrapper, nodeClient, circuitBreakerService, usageService);
    }

    5.在RestController 中有一个 dispatchRequest 看名字就 可以猜测这个是请求分发的,但是如何把http请路由到不同handle ? 这里有一个技巧,选中方法用idea 中 Hierarchy (快捷键 Ctrl+Shift+H) 如图

​ 最终我们看到 一个 org.elasticsearch.http.nio.HttpReadWriteHandler handleRequest 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SuppressWarnings("unchecked")
private void handleRequest(Object msg) {
final HttpPipelinedRequest<FullHttpRequest> pipelinedRequest = (HttpPipelinedRequest<FullHttpRequest>) msg;
FullHttpRequest request = pipelinedRequest.getRequest();

final FullHttpRequest copiedRequest;
try {
copiedRequest = new DefaultFullHttpRequest(
request.protocolVersion(),
request.method(),
request.uri(),
Unpooled.copiedBuffer(request.content()),
request.headers(),
request.trailingHeaders());
} finally {
// As we have copied the buffer, we can release the request
request.release();
}
NioHttpRequest httpRequest = new NioHttpRequest(copiedRequest, pipelinedRequest.getSequence());

if (request.decoderResult().isFailure()) {
Throwable cause = request.decoderResult().cause();
if (cause instanceof Error) {
ExceptionsHelper.maybeDieOnAnotherThread(cause);
transport.incomingRequestError(httpRequest, nioHttpChannel, new Exception(cause));
} else {
transport.incomingRequestError(httpRequest, nioHttpChannel, (Exception) cause);
}
} else {
transport.incomingRequest(httpRequest, nioHttpChannel);
}
}

最后 transport.incomingRequest(httpRequest, nioHttpChannel); 这个代码会调用下面的代码 这样 整个流程就窜起来了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
if (request.rawPath().equals("/favicon.ico")) {
handleFavicon(request.method(), request.uri(), channel);
return;
}
try {
tryAllHandlers(request, channel, threadContext);
} catch (Exception e) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error(() ->
new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
}
}
}

代码是窜起来, es 又是如何分配线程,暂时还不清楚