大家看可以看ElasticSearch源码:Rest请求与Master节点处理流程(1)
这个图非常好,下午的讲解代码在各个类和方法之间流转,都体现这个图上
- 一、Master节点处理请求的逻辑
- 1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
- 2、Master节点处理来自客户端的请求(以创建索引请求举例)
- (1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
- (2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
- 二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
- 1、首先通过nodeClient执行doExecute()
- 2、创建一个task任务异步执行TransportAction
- 3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
- 4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引
一、Master节点处理请求的逻辑
不是所有的请求都需要
Master
节点处理,但是有些请求必须让Master
节点处理,比如创建index,下面的3就是用创建索引做的示例
1、节点(数据节点)要和主节点进行通讯,需要继承自基类MasterNodeRequest
主节点在
Elasticsearch
集群中负责集群的管理和协调工作。当节点需要执行某些操作时,它将创建相应的MasterNodeRequest
实现类的实例,填充请求的参数和数据,并将其发送给主节点。主节点根据不同的MasterNodeRequest
实现类的类型,执行相应的操作
/*** A based request for master based operation.* 在master上*/
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest {public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;protected MasterNodeRequest() {}protected MasterNodeRequest(StreamInput in) throws IOException {super(in);masterNodeTimeout = in.readTimeValue();}@Overridepublic void writeTo(StreamOutput out) throws IOException {super.writeTo(out);out.writeTimeValue(masterNodeTimeout);}/*** A timeout value in case the master has not been discovered yet or disconnected.*/@SuppressWarnings("unchecked")public final Request masterNodeTimeout(TimeValue timeout) {this.masterNodeTimeout = timeout;return (Request) this;}/*** A timeout value in case the master has not been discovered yet or disconnected.*/public final Request masterNodeTimeout(String timeout) {return masterNodeTimeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".masterNodeTimeout"));}public final TimeValue masterNodeTimeout() {return this.masterNodeTimeout;}
}
这里有点模糊,后面学到数据节点向主节点请求或者同步什么时,我再挂个链接
2、Master节点处理来自客户端的请求(以创建索引请求举例)
(1)首先会找到RestHandler中创建索引的Action(RestCreateIndexAction)
至于请求如何找到
RestCreateIndexAction
的,可以参考Elasticsearch 8.9启动时构建接收Rest请求的hander过程源码
@ServerlessScope(Scope.PUBLIC)
public class RestCreateIndexAction extends BaseRestHandler {//省略代码 @Overridepublic List<Route> routes() {return List.of(new Route(PUT, "/{index}"));}@Overridepublic String getName() {return "create_index_action";}@Overridepublic RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {CreateIndexRequest createIndexRequest;if (request.getRestApiVersion() == RestApiVersion.V_7) {createIndexRequest = prepareRequestV7(request);} else {createIndexRequest = prepareRequest(request);}return channel -> client.admin().indices().create(createIndexRequest, new RestToXContentListener<>(channel));}//省略代码
}
(2)再执行继承自TransportMasterNodeAction的Action必须实现的masterOperation方法
TransportMasterNodeAction
主要用于处理来自节点的各种管理操作请求,如创建索引、删除索引、更新集群设置等。
当节点(数据节点)发送请求到主节点时,请求会被传递给相应的TransportMasterNodeAction
实现类进行处理。实现类会根据请求的类型,执行相应的操作逻辑,并返回执行结果给主节点。
/*** 需要在主节点上执行的操作的基类。* A base class for operations that needs to be performed on the master node.**/
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {//省略代码 }
/*** 创建索引操作*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {//省略代码createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));}
}
二、RestHander的Action如何映射到TransportMasterNodeAction(还是以创建索引举例)
这个场景为主节点和数据节点分离的情况
1、首先通过nodeClient执行doExecute()
client.admin().indices().create
中create
方法调用IndicesAdmin
类的create
方法,再调用execute
方法的入参是 CreateIndexAction.INSTANCE
static class IndicesAdmin implements IndicesAdminClient {@Overridepublic void create(final CreateIndexRequest request, final ActionListener<CreateIndexResponse> listener) {execute(CreateIndexAction.INSTANCE, request, listener);}@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> ActionFuture<Response> execute(ActionType<Response> action,Request request) {return client.execute(action, request);}}
调用的是AbstractClient
的execute
方法
/*** This is the single execution point of *all* clients.* 这是所有客户端的单个执行点。*/@Overridepublic final <Request extends ActionRequest, Response extends ActionResponse> void execute(ActionType<Response> action,Request request,ActionListener<Response> listener) {try {doExecute(action, request, listener);} catch (Exception e) {assert false : new AssertionError(e);listener.onFailure(e);}}
doExecute
方法调用的是NodeClient
类的方法
@Overridepublic <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,Request request,ActionListener<Response> listener) {// Discard the task because the Client interface doesn't use it.try {executeLocally(action, request, listener);} catch (TaskCancelledException | IllegalArgumentException | IllegalStateException e) {listener.onFailure(e);}}/***在本地执行 {@link ActionType},返回用于跟踪它的 {@link Task},并链接 {@link ActionListener}。如果在侦听响应时不需要访问任务,则首选此方法。这是用于实现 {@link 客户端} 接口的方法。*/public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(ActionType<Response> action,Request request,ActionListener<Response> listener) {//注册并执行任务return taskManager.registerAndExecute("transport",transportAction(action),request,localConnection,new SafelyWrappedActionListener<>(listener));}
之后调用TaskManager.java
的方法
2、创建一个task任务异步执行TransportAction
public <Request extends ActionRequest, Response extends ActionResponse> Task registerAndExecute(String type,TransportAction<Request, Response> action,Request request,Transport.Connection localConnection,ActionListener<Response> taskListener) { //检查请求是否有父任务,如果有,则注册子连接。final Releasable unregisterChildNode;if (request.getParentTask().isSet()) {unregisterChildNode = registerChildConnection(request.getParentTask().getId(), localConnection);} else {unregisterChildNode = null;}//创建一个新的跟踪上下文try (var ignored = threadPool.getThreadContext().newTraceContext()) {final Task task;//注册一个任务,并捕获可能的取消任务异常。try {task = register(type, action.actionName, request);} catch (TaskCancelledException e) {Releasables.close(unregisterChildNode);throw e;}//执行操作,并在操作完成时调用相应的监听器。action.execute(task, request, new ActionListener<>() {@Overridepublic void onResponse(Response response) {try {release();} finally {taskListener.onResponse(response);}}//根据操作的成功或失败情况,取消子任务并释放资源。@Overridepublic void onFailure(Exception e) {try {if (request.getParentTask().isSet()) {cancelChildLocal(request.getParentTask(), request.getRequestId(), e.toString());}release();} finally {taskListener.onFailure(e);}}@Overridepublic String toString() {return this.getClass().getName() + "{" + taskListener + "}{" + task + "}";}private void release() {Releasables.close(unregisterChildNode, () -> unregister(task));}});//返回任务对象。return task;}}
下面是TransportAction.java
类中的方法
/*** Use this method when the transport action should continue to run in the context of the current task* 当传输操作应继续在当前任务的上下文中运行时,请使用此方法*/public final void execute(Task task, Request request, ActionListener<Response> listener) {final ActionRequestValidationException validationException;//对请求进行验证,如果验证过程中出现异常,则记录错误日志并通知监听器执行失败。try {validationException = request.validate();} catch (Exception e) {assert false : new AssertionError("validating of request [" + request + "] threw exception", e);logger.warn("validating of request [" + request + "] threw exception", e);listener.onFailure(e);return;}if (validationException != null) {listener.onFailure(validationException);return;}//检查是否存在任务且请求需要存储结果,如果满足条件,则创建一个TaskResultStoringActionListener实例,用于在任务完成后将结果存储起来。if (task != null && request.getShouldStoreResult()) {listener = new TaskResultStoringActionListener<>(taskManager, task, listener);}//创建一个请求过滤器链(RequestFilterChain),然后调用proceed方法,将任务、动作名称、请求和监听器传递给过滤器链进行处理。RequestFilterChain<Request, Response> requestFilterChain = new RequestFilterChain<>(this, logger);requestFilterChain.proceed(task, actionName, request, listener);}
@Overridepublic void proceed(Task task, String actionName, Request request, ActionListener<Response> listener) {int i = index.getAndIncrement();try {if (i < this.action.filters.length) {this.action.filters[i].apply(task, actionName, request, listener, this);} else if (i == this.action.filters.length) {//`this.action.doExecute(task, request, listener);` 中`action`对应的是`TransportMasterNodeAction`。this.action.doExecute(task, request, listener);} else {listener.onFailure(new IllegalStateException("proceed was called too many times"));}} catch (Exception e) {logger.trace("Error during transport action execution.", e);listener.onFailure(e);}}
this.action.doExecute(task, request, listener);
中action
对应的是TransportMasterNodeAction
3、TransportMasterNodeAction中doExecute会通过线程池调用子类实现的masterOperation方法
TransportMasterNodeAction
继承HandledTransportAction
,
而HandledTransportAction
继承自TransportAction
public abstract class TransportMasterNodeAction<Request extends MasterNodeRequest<Request>, Response extends ActionResponse> extendsHandledTransportAction<Request, Response>implementsActionWithReservedState<Request> {@Overrideprotected void doExecute(Task task, final Request request, ActionListener<Response> listener) {//省略代码new AsyncSingleAction(task, request, listener).doStart(state);}
}
protected void doStart(ClusterState clusterState) {threadPool.executor(executor).execute(ActionRunnable.wrap(delegate, l -> executeMasterOperation(task, request, clusterState, l)));}
private void executeMasterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception {//调用子类实现masterOperation(task, request, state, listener);}
//子类实现
protected abstract void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)throws Exception;
4、TransportCreateIndexAction的masterOperation实现会调用createIndexService接口创建索引
其中创建索引的action
是TransportCreateIndexAction
@Overrideprotected void masterOperation(Task task,final CreateIndexRequest request,final ClusterState state,final ActionListener<CreateIndexResponse> listener) {createIndexService.createIndex(updateRequest,listener.map(response -> new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}
之后调用createIndexService.createIndex
创建索引