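Elasticsearch 8 search flow
Healthy Mind Lv3

Search is the most frequent operation in ES, so let's walk through the source code to see how a search request flows. In the ES REST request handling we came across the method **org.elasticsearch.rest.RestController#dispatchRequest(org.elasticsearch.rest.RestRequest, org.elasticsearch.rest.RestChannel, org.elasticsearch.rest.RestHandler)**, which is the top-level controller for ES HTTP requests: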

private void dispatchRequest(RestRequest request, RestChannel channel, RestHandler handler) throws Exception {
...
RestChannel responseChannel = channel;
try {
if (handler.canTripCircuitBreaker()) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
// main entry point for request handling
handler.handleRequest(request, responseChannel, client);
} catch (Exception e) {
responseChannel.sendResponse(new BytesRestResponse(responseChannel, e));
}
}
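
The two breaker calls above differ only in whether the reservation may be rejected. A minimal sketch of that idea follows; `InFlightBreakerSketch` and its method names are hypothetical stand-ins for illustration, not the Elasticsearch breaker classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for an in-flight request breaker; not the Elasticsearch class.
class InFlightBreakerSketch {
    private final long limitBytes;
    private final AtomicLong usedBytes = new AtomicLong();

    InFlightBreakerSketch(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    // Reserve bytes, or roll the reservation back and throw when the limit would be exceeded.
    void addOrBreak(long bytes) {
        long newUsed = usedBytes.addAndGet(bytes);
        if (newUsed > limitBytes) {
            usedBytes.addAndGet(-bytes);
            throw new IllegalStateException(
                "in-flight requests would use " + newUsed + " bytes, limit is " + limitBytes);
        }
    }

    // Reserve bytes unconditionally, for handlers that must never be rejected.
    void addWithoutBreaking(long bytes) {
        usedBytes.addAndGet(bytes);
    }

    // Give the bytes back once the response has been sent.
    void release(long bytes) {
        usedBytes.addAndGet(-bytes);
    }

    long used() {
        return usedBytes.get();
    }
}
```

In the real code the release step is the job of the wrapping response channel, which is why the response has to go out over `responseChannel` once bytes have been reserved.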

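Press Ctrl+Alt+B to list the implementations of handleRequest. There are many, but most of them are clearly test classes, which are not what we need. Among the rest, a class whose name starts with Base is usually the base implementation, so a reasonable guess is org.elasticsearch.rest.BaseRestHandler#handleRequest, although a first guess will not always be right.

The method is well commented. The key call is prepareRequest(request, client), so let's see which implementations it has.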

@Override
public final void handleRequest(RestRequest request, RestChannel channel, NodeClient client) throws Exception {
// prepare the request for execution; has the side effect of touching the request parameters
final RestChannelConsumer action = prepareRequest(request, client);

// validate unconsumed params, but we must exclude params used to format the response
// use a sorted set so the unconsumed parameters appear in a reliable sorted order
final SortedSet<String> unconsumedParams =
request.unconsumedParams().stream().filter(p -> !responseParams().contains(p)).collect(Collectors.toCollection(TreeSet::new));

// validate the non-response params
if (!unconsumedParams.isEmpty()) {
final Set<String> candidateParams = new HashSet<>();
candidateParams.addAll(request.consumedParams());
candidateParams.addAll(responseParams());
throw new IllegalArgumentException(unrecognized(request, unconsumedParams, candidateParams, "parameter"));
}

if (request.hasContent() && request.isContentConsumed() == false) {
throw new IllegalArgumentException("request [" + request.method() + " " + request.path() + "] does not support having a body");
}

usageCount.increment();
// execute the action
action.accept(channel);
}
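
The parameter check above relies on the request remembering which parameters were read. A minimal sketch of that bookkeeping, assuming a hypothetical `ParamTrackingSketch` class rather than the real RestRequest:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical request wrapper that records which parameters a handler has read.
class ParamTrackingSketch {
    private final Map<String, String> params;
    private final Set<String> consumed = new HashSet<>();

    ParamTrackingSketch(Map<String, String> params) {
        this.params = params;
    }

    // Reading a parameter marks it as consumed.
    String param(String key) {
        consumed.add(key);
        return params.get(key);
    }

    // Parameters nobody read, minus those only used to format the response, in sorted order.
    SortedSet<String> unconsumed(Set<String> responseParams) {
        SortedSet<String> result = new TreeSet<>(params.keySet());
        result.removeAll(consumed);
        result.removeAll(responseParams);
        return result;
    }
}
```

A misspelled parameter such as `sizee` is never read by the handler, so it ends up in the unconsumed set and the request can be rejected with a helpful message.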

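There are 239 implementations, which is overwhelming at first. The one we need is RestSearchAction; the class names make it easy to map each implementation to an ES operation.

Look at the method org.elasticsearch.rest.action.search.RestSearchAction#prepareRequest.

It returns a RestChannelConsumer, which plays the same role as java.util.function.Consumer, except that it extends CheckedConsumer<RestChannel, Exception> and may therefore throw a checked exception.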

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
SearchRequest searchRequest = new SearchRequest();
/*
* We have to pull out the call to `source().size(size)` because
* _update_by_query and _delete_by_query uses this same parsing
* path but sets a different variable when it sees the `size`
* url parameter.
*
* Note that we can't use `searchRequest.source()::size` because
* `searchRequest.source()` is null right now. We don't have to
* guard against it being null in the IntConsumer because it can't
* be null later. If that is confusing to you then you are in good
* company.
*/
IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser, setSize));

return channel -> {
// RestStatusToXContentListener is a wrapper class that adds a listener
// ES has many wrappers like this, for example around org.elasticsearch.action.ActionListener
RestStatusToXContentListener<SearchResponse> listener = new RestStatusToXContentListener<>(channel);
HttpChannelTaskHandler.INSTANCE.execute(client, request.getHttpChannel(), searchRequest, SearchAction.INSTANCE, listener);
};
}
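
The split between preparing a request and executing it can be sketched as follows. This is a simplified illustration: `DeferredActionSketch`, `ChannelConsumer`, and the use of a `List<String>` as the "channel" are assumptions made for the example, not Elasticsearch types.

```java
import java.util.List;

// Hypothetical sketch of "parse now, run later" with a consumer that may throw.
class DeferredActionSketch {
    @FunctionalInterface
    interface ChannelConsumer {
        void accept(List<String> channel) throws Exception;
    }

    // Phase 1: validate and parse eagerly, so bad requests fail before anything runs.
    static ChannelConsumer prepare(String query) {
        if (query == null || query.isEmpty()) {
            throw new IllegalArgumentException("query must not be empty");
        }
        String parsed = query.trim().toLowerCase();
        // Phase 2: the returned lambda does the actual work once a channel is available.
        return channel -> channel.add("results for " + parsed);
    }
}
```

Nothing is written to the channel until `accept` is called, which is what lets the base handler validate parameters between the two phases.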

Listeners such as org.elasticsearch.action.ActionListener decouple the callback from the code that produces the result. Here is an example:

static <Response> ActionListener<Response> wrap(CheckedConsumer<Response, ? extends Exception> onResponse,
Consumer<Exception> onFailure) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
onResponse.accept(response);
} catch (Exception e) {
onFailure(e);
}
}

@Override
public void onFailure(Exception e) {
onFailure.accept(e);
}
};
}
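
To see what wrap buys the caller, here is a self-contained sketch that mirrors its shape. `ListenerSketch` and `WrapDemo` are hypothetical names for illustration; the real interface lives in org.elasticsearch.action.

```java
import java.util.function.Consumer;

// Hypothetical miniature of a response/failure listener with a wrap factory.
interface ListenerSketch<T> {
    void onResponse(T response);

    void onFailure(Exception e);

    @FunctionalInterface
    interface CheckedConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Build a listener from two lambdas; an exception thrown while handling
    // the response is routed to the failure handler instead of escaping.
    static <T> ListenerSketch<T> wrap(CheckedConsumer<T> onResponse, Consumer<Exception> onFailure) {
        return new ListenerSketch<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    onResponse.accept(response);
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                onFailure.accept(e);
            }
        };
    }
}

class WrapDemo {
    // Feeds one response through a wrapped listener and reports what happened.
    static String run(String input) {
        StringBuilder log = new StringBuilder();
        ListenerSketch<String> listener = ListenerSketch.wrap(
            r -> {
                if (r.isEmpty()) {
                    throw new IllegalStateException("empty");
                }
                log.append("ok:").append(r);
            },
            e -> log.append("failed:").append(e.getMessage()));
        listener.onResponse(input);
        return log.toString();
    }
}
```

The producer only ever calls `onResponse` or `onFailure`; what happens next is decided entirely by whoever built the listener.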

Next, let's focus on the HttpChannelTaskHandler.INSTANCE.execute call, which is where the task gets executed:

<Response extends ActionResponse> void execute(NodeClient client, HttpChannel httpChannel, ActionRequest request,
ActionType<Response> actionType, ActionListener<Response> listener) {

CloseListener closeListener = httpChannels.computeIfAbsent(httpChannel, channel -> new CloseListener(client));
// holder for the task, so that it can be unregistered later
TaskHolder taskHolder = new TaskHolder();
// execute the task
Task task = client.executeLocally(actionType, request,
new ActionListener<>() {
@Override
public void onResponse(Response searchResponse) {
try {
closeListener.unregisterTask(taskHolder);
} finally {
listener.onResponse(searchResponse);
}
}

@Override
public void onFailure(Exception e) {
try {
closeListener.unregisterTask(taskHolder);
} finally {
listener.onFailure(e);
}
}
});
// register the task with the channel's close listener
closeListener.registerTask(taskHolder, new TaskId(client.getLocalNodeId(), task.getId()));
closeListener.maybeRegisterChannel(httpChannel);
}
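
The point of the close listener is that tasks still running when the HTTP channel closes can be cancelled. A rough sketch of that bookkeeping, using a hypothetical `ChannelTaskTrackerSketch` with plain task ids instead of the real CloseListener and TaskId types:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical per-channel tracker: remembers running tasks and cancels them on close.
class ChannelTaskTrackerSketch {
    private final Set<Long> runningTasks = new HashSet<>();
    private final List<Long> cancelled = new ArrayList<>();

    synchronized void registerTask(long taskId) {
        runningTasks.add(taskId);
    }

    // Called from both onResponse and onFailure, so finished tasks are never cancelled.
    synchronized void unregisterTask(long taskId) {
        runningTasks.remove(taskId);
    }

    // Called when the client connection goes away: everything still running is cancelled.
    synchronized void onChannelClosed() {
        cancelled.addAll(runningTasks);
        runningTasks.clear();
    }

    synchronized List<Long> cancelledTasks() {
        return new ArrayList<>(cancelled);
    }
}
```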

Next, look at the implementation of executeLocally in NodeClient:

public <Request extends ActionRequest, Response extends ActionResponse>
Task executeLocally(ActionType<Response> action, Request request, ActionListener<Response> listener) {
return taskManager.registerAndExecute("transport", transportAction(action), request,
(t, r) -> listener.onResponse(r), (t, e) -> listener.onFailure(e));
}

NodeClient holds a taskManager (the task manager), which registers the task and then executes it:

public <Request extends ActionRequest, Response extends ActionResponse>
Task registerAndExecute(String type, TransportAction<Request, Response> action, Request request,
BiConsumer<Task, Response> onResponse, BiConsumer<Task, Exception> onFailure) {
// register the task under the action's name (action.actionName)
Task task = register(type, action.actionName, request);
// NOTE: ActionListener cannot infer Response, see https://bugs.openjdk.java.net/browse/JDK-8203195
// execute the action
action.execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
try {
// unregister the task
unregister(task);
} finally {
onResponse.accept(task, response);
}
}

@Override
public void onFailure(Exception e) {
try {
unregister(task);
} finally {
onFailure.accept(task, e);
}
}
});
return task;
}
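
The register, execute, unregister sequence can be sketched in isolation. `TaskRegistrySketch` below is a hypothetical simplification: the "action" is just a function that receives a completion callback, and only the success path is modelled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical task registry: a task is visible only while its action is in flight.
class TaskRegistrySketch {
    private final AtomicLong ids = new AtomicLong();
    private final Map<Long, String> tasks = new ConcurrentHashMap<>();

    long registerAndExecute(String description,
                            Consumer<Consumer<String>> action,
                            Consumer<String> onResponse) {
        long id = ids.incrementAndGet();
        tasks.put(id, description);
        // The action gets a callback that unregisters first and always notifies the caller.
        action.accept(response -> {
            try {
                tasks.remove(id);
            } finally {
                onResponse.accept(response);
            }
        });
        return id;
    }

    int running() {
        return tasks.size();
    }
}
```

The try/finally mirrors the original: even if unregistering were to throw, the caller's callback would still be invoked.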

Going one level further down, into TransportAction#execute:

public final void execute(Task task, Request request, ActionListener<Response> listener) {
// validate the request
ActionRequestValidationException validationException = request.validate();
if (validationException != null) {
listener.onFailure(validationException);
return;
}

if (task != null && request.getShouldStoreResult()) {
listener = new TaskResultStoringActionListener<>(taskManager, task, listener);
}
// build the request filter chain, similar to a servlet Filter chain; it is backed by an array, so we won't go into detail
RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);

requestFilterChain.proceed(task, actionName, request, listener);
}

Now look at the proceed method:

@Override
public void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {
int i = index.getAndIncrement();
try {
if (i < this.action.filters.length) {
this.action.filters[i].apply(task, actionName, request, listener, this);
// once all filters have run, doExecute is called last
} else if (i == this.action.filters.length) {
this.action.doExecute(task, request, listener);
} else {
listener.onFailure(new IllegalStateException("proceed was called too many times"));
}
} catch(Exception e) {
logger.trace("Error during transport action execution.", e);
listener.onFailure(e);
}
}
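
The index-based chain is easy to reproduce on its own. In this sketch `FilterChainSketch` and its `Filter` interface are hypothetical, and a list of strings stands in for the real work so the call order is visible.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical array-backed filter chain driven by an incrementing index.
class FilterChainSketch {
    interface Filter {
        void apply(String request, List<String> trace, FilterChainSketch chain);
    }

    private final Filter[] filters;
    private final AtomicInteger index = new AtomicInteger();

    FilterChainSketch(Filter... filters) {
        this.filters = filters;
    }

    void proceed(String request, List<String> trace) {
        int i = index.getAndIncrement();
        if (i < filters.length) {
            // Hand over to the next filter; it decides whether to call proceed again.
            filters[i].apply(request, trace, this);
        } else if (i == filters.length) {
            // All filters have run: this is where the real action would execute.
            trace.add("doExecute(" + request + ")");
        } else {
            throw new IllegalStateException("proceed was called too many times");
        }
    }
}
```

With two filters that each record their name and then call `chain.proceed(...)`, the trace reads `auth`, `audit`, `doExecute(search)`. A filter that does not call `proceed` stops the request before it reaches the action.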

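We go straight to org.elasticsearch.action.search.TransportSearchAction#doExecute. This is the template method pattern: execute fixes the overall skeleton and each action supplies its own doExecute.

The code is as follows: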

@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
...
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
final ClusterState clusterState = clusterService.state();
final Map<String, OriginalIndices> remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
searchRequest.indices());
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
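// check whether any index is remote; the two lines above split the requested indices into a local group and remote-cluster groups
// if there are no remote indices, run executeLocalSearch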
if (remoteClusterIndices.isEmpty()) {
executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext,
remoteClusterService, threadPool, listener,
(r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
// collect the shards to search from the remote clusters
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
// add a listener: once the shard information has been fetched, the listener runs the search
ActionListener.wrap(
searchShardsResponses -> {
List<SearchShardIterator> remoteShardIterators = new ArrayList<>();
Map<String, AliasFilter> remoteAliasFilters = new HashMap<>();
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(
searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
// run the search; executeLocalSearch ends up calling this method as well
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
},
listener::onFailure));
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}
}
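
The local versus remote decision hinges on grouping index expressions by cluster. A minimal sketch, assuming the `cluster:index` syntax of cross-cluster search and a hypothetical `IndexGroupingSketch` helper (wildcards, options, and the real RemoteClusterService are ignored):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical grouping of index expressions by cluster alias.
class IndexGroupingSketch {
    // Key used for indices without a cluster prefix (an assumption for this sketch).
    static final String LOCAL_KEY = "";

    static Map<String, List<String>> groupIndices(String... expressions) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String expression : expressions) {
            int sep = expression.indexOf(':');
            String cluster = sep < 0 ? LOCAL_KEY : expression.substring(0, sep);
            String index = sep < 0 ? expression : expression.substring(sep + 1);
            groups.computeIfAbsent(cluster, k -> new ArrayList<>()).add(index);
        }
        return groups;
    }
}
```

For `groupIndices("logs", "eu:logs", "eu:metrics")`, removing `LOCAL_KEY` yields `[logs]` and leaves one remote group, `eu`, with `[logs, metrics]`. When nothing is left after the removal, the request is purely local, which corresponds to the `remoteClusterIndices.isEmpty()` branch.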

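The ActionListener interface is used widely in ES: much of the code adds extra behavior by wrapping an existing listener, which is worth learning from. When we want to run something before a callback, we usually write it straight into the existing algorithm; here ActionListener uses the decorator pattern to add the behavior instead. The source is as follows: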

static <Response> ActionListener<Response> runBefore(ActionListener<Response> delegate, CheckedRunnable<?> runBefore) {
return new ActionListener<>() {
@Override
public void onResponse(Response response) {
try {
runBefore.run();
} catch (Exception ex) {
delegate.onFailure(ex);
return;
}
delegate.onResponse(response);
}

@Override
public void onFailure(Exception e) {
try {
runBefore.run();
} catch (Exception ex) {
e.addSuppressed(ex);
}
delegate.onFailure(e);
}
};
}
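
A small usage sketch shows the decorator in action. `RunBeforeSketch` and its nested `Listener` interface are hypothetical, and a plain `Runnable` replaces `CheckedRunnable` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the runBefore decorator and one way to use it.
class RunBeforeSketch {
    interface Listener<T> {
        void onResponse(T response);

        void onFailure(Exception e);
    }

    // Returns a listener that runs `before` ahead of either callback of the delegate.
    static <T> Listener<T> runBefore(Listener<T> delegate, Runnable before) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                try {
                    before.run();
                } catch (RuntimeException ex) {
                    delegate.onFailure(ex);
                    return;
                }
                delegate.onResponse(response);
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    before.run();
                } catch (RuntimeException ex) {
                    e.addSuppressed(ex);
                }
                delegate.onFailure(e);
            }
        };
    }

    // Decorates a recording listener with a "release" step and exercises both paths.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Listener<String> base = new Listener<String>() {
            @Override
            public void onResponse(String response) {
                events.add("response:" + response);
            }

            @Override
            public void onFailure(Exception e) {
                events.add("failure:" + e.getMessage());
            }
        };
        Listener<String> decorated = runBefore(base, () -> events.add("release"));
        decorated.onResponse("hits");
        decorated.onFailure(new IllegalStateException("boom"));
        return events;
    }
}
```

The base listener knows nothing about the release step, so cleanup logic can be attached at the call site without touching the code that handles the result.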

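The code for the rest of the flow is fairly simple, so it is shown as a sequence diagram.

In the end, this is the path through which data is returned to the client or a request is sent to other nodes to query data. The underlying transport has different implementations, one based on NIO and one on Netty.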