elasticsearch源码分析-04集群状态发布

集群状态发布

cluster模块封装了在集群层面执行的任务,如集群健康、集群级元信息管理、分片分配给节点、节点管理等。集群任务执行之后可能会产生新的集群状态,如果产生新的集群状态主节点会将集群状态广播给其他节点。

集群状态封装在clusterState中,支持增量同步

提交集群任务的主要时机有以下几种:

  • 索引的创建、删除、打开、关闭
  • 索引模板、映射、别名的变化
  • gateway模块发布选举出来的集群状态
  • 快照
  • 分片分配
  • 集群节点变化等

提交集群任务入口在ClusterService的submitStateUpdateTask方法,第一个参数是事件来源,第二个参数是要执行的具体任务

public <T extends ClusterStateTaskConfig & ClusterStateTaskExecutor<T> & ClusterStateTaskListener>void submitStateUpdateTask(String source, T updateTask) {submitStateUpdateTask(source, updateTask, updateTask, updateTask, updateTask);}public <T> void submitStateUpdateTask(String source, T task,ClusterStateTaskConfig config,ClusterStateTaskExecutor<T> executor,ClusterStateTaskListener listener) {submitStateUpdateTasks(source, Collections.singletonMap(task, listener), config, executor);}public <T> void submitStateUpdateTasks(final String source,final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,final ClusterStateTaskExecutor<T> executor) {masterService.submitStateUpdateTasks(source, tasks, config, executor);}

最有代表性的任务是ClusterStateUpdateTask,它实现了ClusterStateTaskConfig、ClusterStateTaskExecutor

public abstract class ClusterStateUpdateTaskimplements ClusterStateTaskConfig, ClusterStateTaskExecutor<ClusterStateUpdateTask>, ClusterStateTaskListener {

在这里插入图片描述
ClusterStateTaskConfig包含了任务的配置信息和优先级

TimeValue timeout();
Priority priority();

ClusterStateTaskExecutor主要是定义要执行的任务,最主要的方法就是execute方法

 ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;

任务执行时会传入当前集群状态,任务运行过程中如果产生新的集群状态就返回新的集群状态,如果没有就返回原来的集群状态

ClusterStateTaskListener主要是提交任务后的回调处理

/*** A callback called when execute fails.*/void onFailure(String source, Exception e);/*** called when the task was rejected because the local node is no longer master.* Used only for tasks submitted to {@link MasterService}.*/default void onNoLongerMaster(String source) {onFailure(source, new NotMasterException("no longer master. source: [" + source + "]"));}/*** Called when the result of the {@link ClusterStateTaskExecutor#execute(ClusterState, List)} have been processed* properly by all listeners.*/default void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {}

MasterService主要负责集群任务管理和运行,只有主节点会提交集群任务到内部队列,并运行队列中的任务

public <T> void submitStateUpdateTasks(final String source,final Map<T, ClusterStateTaskListener> tasks, final ClusterStateTaskConfig config,final ClusterStateTaskExecutor<T> executor) {if (!lifecycle.started()) {return;}final ThreadContext threadContext = threadPool.getThreadContext();final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(true);try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {threadContext.markAsSystemContext();//封装任务List<Batcher.UpdateTask> safeTasks = tasks.entrySet().stream().map(e -> taskBatcher.new UpdateTask(config.priority(), source, e.getKey(), safe(e.getValue(), supplier), executor)).collect(Collectors.toList());//提交任务taskBatcher.submitTasks(safeTasks, config.timeout());} catch (EsRejectedExecutionException e) {// ignore cases where we are shutting down..., there is really nothing interesting// to be done here...if (!lifecycle.stoppedOrClosed()) {throw e;}}}
public void submitTasks(List<? extends BatchedTask> tasks, @Nullable TimeValue timeout) throws EsRejectedExecutionException {if (tasks.isEmpty()) {return;}final BatchedTask firstTask = tasks.get(0);assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) :"tasks submitted in a batch should share the same batching key: " + tasks;// convert to an identity map to check for dups based on task identity//根据任务标识检查重复数据final Map<Object, BatchedTask> tasksIdentity = tasks.stream().collect(Collectors.toMap(BatchedTask::getTask,Function.identity(),(a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); },IdentityHashMap::new));synchronized (tasksPerBatchingKey) {//添加相同batchingKey的任务,返回已存在batchingKey的任务LinkedHashSet<BatchedTask> existingTasks = tasksPerBatchingKey.computeIfAbsent(firstTask.batchingKey,k -> new LinkedHashSet<>(tasks.size()));//检查是否存在相同batchingKey的任务for (BatchedTask existing : existingTasks) {// check that there won't be two tasks with the same identity for the same batching keyBatchedTask duplicateTask = tasksIdentity.get(existing.getTask());if (duplicateTask != null) {throw new IllegalStateException("task [" + duplicateTask.describeTasks(Collections.singletonList(existing)) + "] with source [" + duplicateTask.source + "] is already queued");}}existingTasks.addAll(tasks);}//执行任务if (timeout != null) {threadExecutor.execute(firstTask, timeout, () -> onTimeoutInternal(tasks, timeout));} else {threadExecutor.execute(firstTask);}}

这里有去重逻辑,拥有相同ClusterStateTaskExecutor对象实例的任务只会执行一次,然后对于其他相同的实例直接赋值相同的执行结果。区分重复任务的方式时通过定义的任务本身,去重的方式不是将重复的数据删除,而是在执行完任务后赋予重复任务相同的结果。

ClusterStateTaskExecutor相同有两种情况可能是提交的任务本身重复,还有就是之前提交的任务已存在,但是尚未执行此时提交相同的任务就会保存到对应的列表中,只会执行一次

任务会被封装到UpdateTask中

class UpdateTask extends BatchedTask {final ClusterStateTaskListener listener;UpdateTask(Priority priority, String source, Object task, ClusterStateTaskListener listener,ClusterStateTaskExecutor<?> executor) {super(priority, source, executor, task);this.listener = listener;}@Overridepublic String describeTasks(List<? extends BatchedTask> tasks) {return ((ClusterStateTaskExecutor<Object>) batchingKey).describeTasks(tasks.stream().map(BatchedTask::getTask).collect(Collectors.toList()));}}

提交到线程池运行调用run方法

@Override
public void run() {//运行还没处理的任务runIfNotProcessed(this);
}
void runIfNotProcessed(BatchedTask updateTask) {//具有相同batching key的任务只会执行一次if (updateTask.processed.get() == false) {final List<BatchedTask> toExecute = new ArrayList<>();final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();synchronized (tasksPerBatchingKey) {//获取任务列表LinkedHashSet<BatchedTask> pending = tasksPerBatchingKey.remove(updateTask.batchingKey);if (pending != null) {for (BatchedTask task : pending) {if (task.processed.getAndSet(true) == false) {logger.trace("will process {}", task);//构建要执行的任务列表toExecute.add(task);processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);} else {logger.trace("skipping {}, already processed", task);}}}}if (toExecute.isEmpty() == false) {final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {String tasks = updateTask.describeTasks(entry.getValue());return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");//执行任务run(updateTask.batchingKey, toExecute, tasksSummary);}}}

执行任务并发布集群状态的逻辑在MasterService中

@Overrideprotected void run(Object batchingKey, List<? extends BatchedTask> tasks, String tasksSummary) {ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;//运行任务,并发布集群状态runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummary));}
private void runTasks(TaskInputs taskInputs) {final String summary = taskInputs.summary;if (!lifecycle.started()) {logger.debug("processing [{}]: ignoring, master service not started", summary);return;}logger.debug("executing cluster state update for [{}]", summary);//之前集群状态final ClusterState previousClusterState = state();//只在主节点执行if (!previousClusterState.nodes().isLocalNodeElectedMaster() && taskInputs.runOnlyWhenMaster()) {logger.debug("failing [{}]: local node is no longer master", summary);taskInputs.onNoLongerMaster();return;}final long computationStartTime = threadPool.relativeTimeInMillis();//执行task任务生成新的集群状态final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState);taskOutputs.notifyFailedTasks();final TimeValue computationTime = getTimeSince(computationStartTime);logExecutionTime(computationTime, "compute cluster state update", summary);if (taskOutputs.clusterStateUnchanged()) {final long notificationStartTime = threadPool.relativeTimeInMillis();taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();final TimeValue executionTime = getTimeSince(notificationStartTime);logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);} else {//集群状态发生改变final ClusterState newClusterState = taskOutputs.newClusterState;if (logger.isTraceEnabled()) {logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);} else {logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);}final long publicationStartTime = threadPool.relativeTimeInMillis();try {ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);// new cluster state, notify all listenersfinal DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {String nodesDeltaSummary = nodesDelta.shortSummary();if (nodesDeltaSummary.length() > 0) {logger.info("{}, term: {}, version: {}, delta: {}",summary, newClusterState.term(), newClusterState.version(), nodesDeltaSummary);}}logger.debug("publishing cluster state version [{}]", newClusterState.version());//发布集群状态publish(clusterChangedEvent, taskOutputs, publicationStartTime);} catch (Exception e) {handleException(summary, publicationStartTime, newClusterState, e);}}}

执行方法前判断是不是主节点因为只有主节点可以运行集群任务,根据执行任务前的集群状态执行任务生成新的集群状态

执行任务获取任务执行结果,并生成新的集群状态

private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState) {//执行提交的任务,并且返回新的集群状态ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState);//根据分配分片结果生成新的集群状态ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);return new TaskOutputs(taskInputs, previousClusterState, newClusterState, getNonFailedTasks(taskInputs, clusterTasksResult),clusterTasksResult.executionResults);}

获取任务列表,调用executor的execute方法

private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState) {ClusterTasksResult<Object> clusterTasksResult;try {List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());//执行任务,并返回新的集群状态clusterTasksResult = taskInputs.executor.execute(previousClusterState, inputs);if (previousClusterState != clusterTasksResult.resultingState &&previousClusterState.nodes().isLocalNodeElectedMaster() &&(clusterTasksResult.resultingState.nodes().isLocalNodeElectedMaster() == false)) {throw new AssertionError("update task submitted to MasterService cannot remove master");}} catch (Exception e) {......clusterTasksResult = ClusterTasksResult.builder().failures(taskInputs.updateTasks.stream().map(updateTask -> updateTask.task)::iterator, e).build(previousClusterState);}......return clusterTasksResult;}

这里我们以gateway恢复集群状态为例

ClusterTasksResult<T> execute(ClusterState currentState, List<T> tasks) throws Exception;
@Overridepublic final ClusterTasksResult<ClusterStateUpdateTask> execute(ClusterState currentState, List<ClusterStateUpdateTask> tasks)throws Exception {//执行集群状态变更task,并且返回执行之后的集群状态结果ClusterState result = execute(currentState);return ClusterTasksResult.<ClusterStateUpdateTask>builder().successes(tasks).build(result);}@Overridepublic void onSuccess(final ClusterState recoveredState) {logger.trace("successful state recovery, importing cluster state...");clusterService.submitStateUpdateTask("local-gateway-elected-state",new RecoverStateUpdateTask() {@Overridepublic ClusterState execute(final ClusterState currentState) {final ClusterState updatedState = ClusterStateUpdaters.mixCurrentStateAndRecoveredState(currentState, recoveredState);return super.execute(ClusterStateUpdaters.recoverClusterBlocks(updatedState));}});}@Overridepublic ClusterState execute(final ClusterState currentState) {if (currentState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {logger.debug("cluster is already recovered");return currentState;}//状态信息恢复完成final ClusterState newState = Function.<ClusterState>identity().andThen(ClusterStateUpdaters::updateRoutingTable).andThen(ClusterStateUpdaters::removeStateNotRecoveredBlock).apply(currentState);//开始分配分片return allocationService.reroute(newState, "state recovered");}

生成新的集群状态,开始分配分片,根据之前的集群状态和新生成的结果构造新的集群状态

private ClusterState patchVersions(ClusterState previousClusterState, ClusterTasksResult<?> executionResult) {//新的集群状态ClusterState newClusterState = executionResult.resultingState;if (previousClusterState != newClusterState) {// only the master controls the version numbers//生成新的集群状态版本号,递增的Builder builder = incrementVersion(newClusterState);//路由表发生了改变,也就是分片信息发送了改变,分片-nodeif (previousClusterState.routingTable() != newClusterState.routingTable()) {builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1).build());}//集群元数据发生了改变if (previousClusterState.metadata() != newClusterState.metadata()) {builder.metadata(Metadata.builder(newClusterState.metadata()).version(newClusterState.metadata().version() + 1));}newClusterState = builder.build();}return newClusterState;}

回到MasterService的runTasks方法中新的集群状态已经生成并返回,然后判断集群状态和之前的集群状态是否相同,如果发生变化则将进入集群状态发布阶段,将最新的集群状态广播到所有节点

//发布集群状态
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
protected void publish(ClusterChangedEvent clusterChangedEvent, TaskOutputs taskOutputs, long startTimeMillis) {final PlainActionFuture<Void> fut = new PlainActionFuture<Void>() {@Overrideprotected boolean blockingAllowed() {return isMasterUpdateThread() || super.blockingAllowed();}};//发布集群状态clusterStatePublisher.publish(clusterChangedEvent, fut, taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()));// indefinitely wait for publication to complete//无限期等待发布完成try {FutureUtils.get(fut);onPublicationSuccess(clusterChangedEvent, taskOutputs);} catch (Exception e) {onPublicationFailed(clusterChangedEvent, taskOutputs, startTimeMillis, e);}}
@Overridepublic void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {//新的集群状态ClusterState newState = clusterChangedEvent.state();assert newState.getNodes().isLocalNodeElectedMaster() : "Shouldn't publish state when not master " + clusterChangedEvent.source();try {// state got changed locally (maybe because another master published to us)if (clusterChangedEvent.previousState() != this.committedState.get()) {throw new FailedToCommitClusterStateException("state was mutated while calculating new CS update");}pendingStatesQueue.addPending(newState);//发布集群状态publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);} catch (FailedToCommitClusterStateException t) {// cluster service logs a WARN messagelogger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",newState.version(), electMaster.minimumMasterNodes());synchronized (stateMutex) {pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("failed to publish cluster state"));rejoin("zen-disco-failed-to-publish");}publishListener.onFailure(t);return;}final DiscoveryNode localNode = newState.getNodes().getLocalNode();final AtomicBoolean processedOrFailed = new AtomicBoolean();pendingStatesQueue.markAsCommitted(newState.stateUUID(),new PendingClusterStatesQueue.StateProcessedListener() {@Overridepublic void onNewClusterStateProcessed() {processedOrFailed.set(true);publishListener.onResponse(null);ackListener.onNodeAck(localNode, null);}@Overridepublic void onNewClusterStateFailed(Exception e) {processedOrFailed.set(true);publishListener.onFailure(e);ackListener.onNodeAck(localNode, e);logger.warn(() -> new ParameterizedMessage("failed while applying cluster state locally [{}]", clusterChangedEvent.source()), e);}});synchronized (stateMutex) {if (clusterChangedEvent.previousState() != this.committedState.get()) {publishListener.onFailure(new FailedToCommitClusterStateException("local state was mutated while CS update was published to other nodes"));return;}//经过二阶段提交状态已经发布到了集群,但不能保证所有节点都成功了,下面处理提交后的集群状态boolean sentToApplier = processNextCommittedClusterState("master " + newState.nodes().getMasterNode() +" committed version [" + newState.version() + "] source [" + clusterChangedEvent.source() + "]");if (sentToApplier == false && processedOrFailed.get() == false) {assert false : "cluster state published locally neither processed nor failed: " + newState;logger.warn("cluster state with version [{}] that is published locally has neither been processed nor failed",newState.version());publishListener.onFailure(new FailedToCommitClusterStateException("cluster state that is published locally has neither " +"been processed nor failed"));}}}

首先准备发送集群状态的目标节点列表,剔除本节点。构建增量发布或全量发布集群状态,然后执行序列化并压缩,以便将状态发布出去

public void publish(final ClusterChangedEvent clusterChangedEvent, final int minMasterNodes,final Discovery.AckListener ackListener) throws FailedToCommitClusterStateException {final DiscoveryNodes nodes;final SendingController sendingController;final Set<DiscoveryNode> nodesToPublishTo;final Map<Version, BytesReference> serializedStates;final Map<Version, BytesReference> serializedDiffs;final boolean sendFullVersion;try {//需要发送目的节点nodes = clusterChangedEvent.state().nodes();nodesToPublishTo = new HashSet<>(nodes.getSize());DiscoveryNode localNode = nodes.getLocalNode();final int totalMasterNodes = nodes.getMasterNodes().size();for (final DiscoveryNode node : nodes) {if (node.equals(localNode) == false) {nodesToPublishTo.add(node);}}sendFullVersion = !discoverySettings.getPublishDiff() || clusterChangedEvent.previousState() == null;//全量状态serializedStates = new HashMap<>();//增量状态serializedDiffs = new HashMap<>();// we build these early as a best effort not to commit in the case of error.// sadly this is not water tight as it may that a failed diff based publishing to a node// will cause a full serialization based on an older version, which may fail after the// change has been committed.//构建序列化后的结果buildDiffAndSerializeStates(clusterChangedEvent.state(), clusterChangedEvent.previousState(),nodesToPublishTo, sendFullVersion, serializedStates, serializedDiffs);//发布状态返回结果处理final BlockingClusterStatePublishResponseHandler publishResponseHandler =new AckClusterStatePublishResponseHandler(nodesToPublishTo, ackListener);sendingController = new SendingController(clusterChangedEvent.state(), minMasterNodes,totalMasterNodes, publishResponseHandler);} catch (Exception e) {throw new FailedToCommitClusterStateException("unexpected error while preparing to publish", e);}try {//发布innerPublish(clusterChangedEvent, nodesToPublishTo, sendingController, ackListener, sendFullVersion, serializedStates,serializedDiffs);} catch (FailedToCommitClusterStateException t) {throw t;} catch (Exception e) {// try to fail committing, in cause it's still on goingif (sendingController.markAsFailed("unexpected error", e)) {// signal the change should be rejectedthrow new FailedToCommitClusterStateException("unexpected error", e);} else {throw e;}}}

全量状态保存在serializedStates,增量状态保存在serializedDiffs。每个集群状态都有自己为一个版本好,在发布集群状态时允许相邻版本好之间只发送增量内容

构造需要发送的状态,如果上次发布集群状态的节点不存在或设置了全量发送配置,则构建全量状态否则构建增量状态然后进行序列化并压缩

private void buildDiffAndSerializeStates(ClusterState clusterState, ClusterState previousState, Set<DiscoveryNode> nodesToPublishTo,boolean sendFullVersion, Map<Version, BytesReference> serializedStates,Map<Version, BytesReference> serializedDiffs) {Diff<ClusterState> diff = null;for (final DiscoveryNode node : nodesToPublishTo) {try {//发送全量if (sendFullVersion || !previousState.nodes().nodeExists(node)) {// will send a full referenceif (serializedStates.containsKey(node.getVersion()) == false) {serializedStates.put(node.getVersion(), serializeFullClusterState(clusterState, node.getVersion()));}} else {//发送增量// will send a diffif (diff == null) {diff = clusterState.diff(previousState);}if (serializedDiffs.containsKey(node.getVersion()) == false) {serializedDiffs.put(node.getVersion(), serializeDiffClusterState(diff, node.getVersion()));}}} catch (IOException e) {throw new ElasticsearchException("failed to serialize cluster_state for publishing to node {}", e, node);}}}

es使用二阶段提交来实现状态发布,第一步是push及先将状态数据发送到node节点,但不应用,如果得到超过半数的节点的返回确认,则执行第二步commit及发送提交请求,二阶段提交不能保证节点收到commit请求后可以正确应用,也就是它只能保证发了commit请求,但是无法保证单个节点上的状态应用是成功还是失败的

  • push阶段发送集群状态数据
private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set<DiscoveryNode> nodesToPublishTo,final SendingController sendingController, final Discovery.AckListener ackListener,final boolean sendFullVersion, final Map<Version, BytesReference> serializedStates,final Map<Version, BytesReference> serializedDiffs) {final ClusterState clusterState = clusterChangedEvent.state();final ClusterState previousState = clusterChangedEvent.previousState();//发布超时时间final TimeValue publishTimeout = discoverySettings.getPublishTimeout();//发布起始时间final long publishingStartInNanos = System.nanoTime();//遍历节点异步发送全量或增量状态数据for (final DiscoveryNode node : nodesToPublishTo) {// try and serialize the cluster state once (or per version), so we don't serialize it// per node when we send it over the wire, compress it while we are at it...// we don't send full version if node didn't exist in the previous version of cluster state//发生全量状态if (sendFullVersion || !previousState.nodes().nodeExists(node)) {sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);} else {//发布增量状态sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);}}//等待提交,等待第一阶段完成收到足够的响应或达到了超时时间sendingController.waitForCommit(discoverySettings.getCommitTimeout());final long commitTime = System.nanoTime() - publishingStartInNanos;ackListener.onCommit(TimeValue.timeValueNanos(commitTime));try {long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - commitTime);final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));if (sendingController.getPublishingTimedOut()) {DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();// everyone may have just respondedif (pendingNodes.length > 0) {logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",clusterState.version(), publishTimeout, pendingNodes);}}// The failure is logged under debug when a sending failed. we now log a summary.Set<DiscoveryNode> failedNodes = publishResponseHandler.getFailedNodes();if (failedNodes.isEmpty() == false) {logger.warn("publishing cluster state with version [{}] failed for the following nodes: [{}]",clusterChangedEvent.state().version(), failedNodes);}} catch (InterruptedException e) {// ignore & restore interruptThread.currentThread().interrupt();}}

无论是发送全量数据还是发送增量数据最终都会调用到sendClusterStateToNode方法

private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,final DiscoveryNode node,final TimeValue publishTimeout,final SendingController sendingController,final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {try {//调用底层的传输层发送transportService.sendRequest(node, SEND_ACTION_NAME,new BytesTransportRequest(bytes, node.getVersion()),stateRequestOptions,new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {@Overridepublic void handleResponse(TransportResponse.Empty response) {//发布超时if (sendingController.getPublishingTimedOut()) {logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,clusterState.version(), publishTimeout);}//检查收到的响应是否过半,然后执行commitsendingController.onNodeSendAck(node);}@Overridepublic void handleException(TransportException exp) {if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {logger.debug("resending full cluster state to node {} reason {}", node, exp.getDetailedMessage());sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);} else {logger.debug(() -> new ParameterizedMessage("failed to send cluster state to {}", node), exp);sendingController.onNodeSendFailed(node, exp);}}});} catch (Exception e) {logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);sendingController.onNodeSendFailed(node, e);}}

调用transportService的sendRequest方法异步发送数据,rpc请求为internal:discovery/zen/publish/send对应节点注册的处理器为SendClusterStateRequestHandler

//发送处理transportService.registerRequestHandler(SEND_ACTION_NAME, ThreadPool.Names.SAME, false, false, BytesTransportRequest::new,new SendClusterStateRequestHandler());private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {@Overridepublic void messageReceived(BytesTransportRequest request, final TransportChannel channel, Task task) throws Exception {//处理状态变更请求handleIncomingClusterStateRequest(request, channel);}}protected void handleIncomingClusterStateRequest(BytesTransportRequest request, TransportChannel channel) throws IOException {Compressor compressor = CompressorFactory.compressor(request.bytes());StreamInput in = request.bytes().streamInput();final ClusterState incomingState;synchronized (lastSeenClusterStateMutex) {try {if (compressor != null) {in = compressor.streamInput(in);}in = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry);in.setVersion(request.version());// If true we received full cluster state - otherwise diffs//true:全量状态,false:增量if (in.readBoolean()) {incomingState = ClusterState.readFrom(in, transportService.getLocalNode());fullClusterStateReceivedCount.incrementAndGet();logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(),request.bytes().length());} else if (lastSeenClusterState != null) {Diff<ClusterState> diff = ClusterState.readDiffFrom(in, lastSeenClusterState.nodes().getLocalNode());incomingState = diff.apply(lastSeenClusterState);compatibleClusterStateDiffReceivedCount.incrementAndGet();logger.debug("received diff cluster state version [{}] with uuid [{}], diff size [{}]",incomingState.version(), incomingState.stateUUID(), request.bytes().length());} else {logger.debug("received diff for but don't have any local cluster state - requesting full state");throw new IncompatibleClusterStateVersionException("have no local cluster state");}} catch (IncompatibleClusterStateVersionException e) {incompatibleClusterStateDiffReceivedCount.incrementAndGet();throw e;} catch (Exception e) {logger.warn("unexpected error while deserializing an incoming cluster state", e);throw e;} finally {IOUtils.close(in);}//触发监听器incomingClusterStateListener.onIncomingClusterState(incomingState);lastSeenClusterState = incomingState;}//发送发回空结果channel.sendResponse(TransportResponse.Empty.INSTANCE);}

保存集群状态,然后返回空结果

继续回到主节点发送数据的回调函数中,检查响应是否足够

public synchronized void onNodeSendAck(DiscoveryNode node) {if (committed) {//提交状态assert sendAckedBeforeCommit.isEmpty();sendCommitToNode(node, clusterState, this);} else if (committedOrFailed()) {logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());} else {// we're still waitingsendAckedBeforeCommit.add(node);if (node.isMasterNode()) {checkForCommitOrFailIfNoPending(node);}}}//检查返回ack的节点数,如果超过了半数就执行commitprivate synchronized void checkForCommitOrFailIfNoPending(DiscoveryNode masterNode) {logger.trace("master node {} acked cluster state version [{}]. processing ... (current pending [{}], needed [{}])",masterNode, clusterState.version(), pendingMasterNodes, neededMastersToCommit);neededMastersToCommit--;if (neededMastersToCommit == 0) {if (markAsCommitted()) {for (DiscoveryNode nodeToCommit : sendAckedBeforeCommit) {sendCommitToNode(nodeToCommit, clusterState, this);}sendAckedBeforeCommit.clear();}}decrementPendingMasterAcksAndChangeForFailure();}
  • commit阶段

接收到了足够的响应后开始执行commit逻辑

private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {try {logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]",clusterState.stateUUID(), clusterState.version(), node);transportService.sendRequest(node, COMMIT_ACTION_NAME,new CommitClusterStateRequest(clusterState.stateUUID()),stateRequestOptions,new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {@Overridepublic void handleResponse(TransportResponse.Empty response) {if (sendingController.getPublishingTimedOut()) {logger.debug("node {} responded to cluster state commit [{}]", node, clusterState.version());}sendingController.getPublishResponseHandler().onResponse(node);}@Overridepublic void handleException(TransportException exp) {logger.debug(() -> new ParameterizedMessage("failed to commit cluster state (uuid [{}], version [{}]) to {}",clusterState.stateUUID(), clusterState.version(), node), exp);sendingController.getPublishResponseHandler().onFailure(node, exp);}});} catch (Exception t) {logger.warn(() -> new ParameterizedMessage("error sending cluster state commit (uuid [{}], version [{}]) to {}",clusterState.stateUUID(), clusterState.version(), node), t);sendingController.getPublishResponseHandler().onFailure(node, t);}}

同样通过transportService发生RPC请求,内部请求的url为internal:discovery/zen/publish/commit

接收数据的节点注册的处理器为CommitClusterStateRequestHandler

//提交处理transportService.registerRequestHandler(COMMIT_ACTION_NAME, ThreadPool.Names.SAME, false, false, CommitClusterStateRequest::new,new CommitClusterStateRequestHandler());//提交集群状态处理private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {@Overridepublic void messageReceived(CommitClusterStateRequest request, final TransportChannel channel, Task task) throws Exception {handleCommitRequest(request, channel);}}

节点应用集群状态

@Overridepublic void onClusterStateCommitted(String stateUUID, ActionListener<Void> processedListener) {//更新提交新状态final ClusterState state = pendingStatesQueue.markAsCommitted(stateUUID,new PendingClusterStatesQueue.StateProcessedListener() {@Overridepublic void onNewClusterStateProcessed() {processedListener.onResponse(null);}@Overridepublic void onNewClusterStateFailed(Exception e) {processedListener.onFailure(e);}});if (state != null) {synchronized (stateMutex) {//应用新的集群状态processNextCommittedClusterState("master " + state.nodes().getMasterNode() +" committed version [" + state.version() + "]");}}}//集群应用新的集群状态clusterApplier.onNewClusterState("apply cluster state (from master [" + reason + "])",this::clusterState,new ClusterApplyListener() {@Overridepublic void onSuccess(String source) {try {pendingStatesQueue.markAsProcessed(newClusterState);} catch (Exception e) {onFailure(source, e);}}@Overridepublic void onFailure(String source, Exception e) {logger.error(() -> new ParameterizedMessage("unexpected failure applying [{}]", reason), e);try {// TODO: use cluster state uuid instead of full cluster state so that we don't keep reference to CS around// for too long.pendingStatesQueue.markAsFailed(newClusterState, e);} catch (Exception inner) {inner.addSuppressed(e);logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);}}});

最终调用到ClusterApplierService的runTask方法

private void runTask(UpdateTask task) {if (!lifecycle.started()) {logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);return;}logger.debug("processing [{}]: execute", task.source);//获取之前的集群状态final ClusterState previousClusterState = state.get();//任务执行起始时间long startTimeMS = currentTimeInMillis();//简单的秒表,允许对许多任务进行计时final StopWatch stopWatch = new StopWatch();final ClusterState newClusterState;try {try (Releasable ignored = stopWatch.timing("running task [" + task.source + ']')) {newClusterState = task.apply(previousClusterState);}} catch (Exception e) {TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));logger.trace(() -> new ParameterizedMessage("failed to execute cluster state applier in [{}], state:\nversion [{}], source [{}]\n{}",executionTime, previousClusterState.version(), task.source, previousClusterState), e);warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);task.listener.onFailure(task.source, e);return;}if (previousClusterState == newClusterState) {TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);task.listener.onSuccess(task.source);} else {if (logger.isTraceEnabled()) {logger.debug("cluster state updated, version [{}], source [{}]\n{}", newClusterState.version(), task.source,newClusterState);} else {logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), task.source);}try {//执行状态更新applyChanges(task, previousClusterState, newClusterState, stopWatch);TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));logger.debug("processing [{}]: took [{}] done applying updated cluster state (version: {}, uuid: {})", task.source,executionTime, newClusterState.version(),newClusterState.stateUUID());warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);task.listener.onSuccess(task.source);} catch (Exception e) {TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));assert applicationMayFail();task.listener.onFailure(task.source, e);}}}

遍历所有状态应用者,调用集群状态的应用者的应用集群状态方法

//发送集群状态应用者
callClusterStateAppliers(clusterChangedEvent, stopWatch);
private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {clusterStateAppliers.forEach(applier -> {logger.trace("calling [{}] with change to version [{}]", applier, clusterChangedEvent.state().version());try (Releasable ignored = stopWatch.timing("running applier [" + applier + "]")) {applier.applyClusterState(clusterChangedEvent);}});}

遍历所有集群状态监听器,调用集群状态变更回调函数

//发送集群状态监听器
callClusterStateListeners(clusterChangedEvent, stopWatch);
//执行集群状态变更后的回调private void callClusterStateListeners(ClusterChangedEvent clusterChangedEvent, StopWatch stopWatch) {Stream.concat(clusterStateListeners.stream(), timeoutClusterStateListeners.stream()).forEach(listener -> {try {logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());try (Releasable ignored = stopWatch.timing("notifying listener [" + listener + "]")) {listener.clusterChanged(clusterChangedEvent);}} catch (Exception ex) {logger.warn("failed to notify ClusterStateListener", ex);}});}

回到主节点执行回调函数handleResponse和handleException两个回调函数执行相同的处理逻辑,将latch减一,如果有的节点执行失败也不会执行修复逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/375387.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python作业二

# 二进制转化为十进制 num input("num:")def binaryToDecimal(binaryString):he 0length len(binaryString)for i in range(length):he int(binaryString[i]) * 2 ** (length - i - 1)return heprint(binaryToDecimal(num))代码运行如下&#xff1a; import math…

ADC 性能规格-静态性能- (2) - 偏移误差( offset error)和满标度增益误差(full scale gain error)

偏移误差(Offset error) 失调(Offset) 定义:失调是指ADC输出数字代码中零位与实际模拟输入零位之间的差异。简单来说,就是当输入信号为零时,ADC输出的数字代码并不一定是零,这个偏差就是失调。影响:失调会影响ADC的整体精度,因为它在整个输入范围内引入了一个固定的偏…

攻防世界 Web_python_template_injection(flask模版注入)

学习文章&#xff1a;https://www.freebuf.com/column/187845.html https://blog.csdn.net/weixin_54515836/article/details/113778233 flask的渲染方法有render_template和render_template_string两种。 render_template()是用来渲染一个指定的文件的。使用如下 return re…

一文读懂DNS和CDN

一.什么是DNS DNS&#xff08;Domain Name System&#xff09;全称为域名系统&#xff0c;是一个将域名和IP地址相互映射的分布式服务&#xff0c;他的作用就是把一个域名解析成为IP地址。我们平时输入的网址&#xff08;或者域名&#xff09;不能被计算机直接识别&#xff0c;…

操作系统——内存管理(面试准备)

虚拟内存 单片机没有操作系统&#xff0c;每次写完代码&#xff0c;都需要借助工具把程序烧录进去&#xff0c;这样程序才能跑起来。 另外&#xff0c;单片机的CPU是直接操作内存的物理地址。 在这种情况下&#xff0c;想在内存中同时运行两个程序是不可能的&#xff0c;如果第…

Python数据分析案例49——基于机器学习的垃圾邮件分类系统构建(朴素贝叶斯,支持向量机)

案例背景 trec06c是非常经典的邮件分类的数据&#xff0c;还是难能可贵的中文数据集。 这个数据集从一堆txt压缩包里面提取出来整理为excel文件还真不容不易&#xff0c;肯定要做一下文本分类。 虽然现在文本分类基本都是深度学习了&#xff0c;但是传统的机器学习也能做。本案…

【论文速读】《面向深度学习的联合消息传递与自编码器》,无线AI的挑战和解决思路

这篇文章来自华为的渥太华无线先进系统能力中心和无线技术实验室&#xff0c;作者中有大名鼎鼎的童文。 一、自编码架构的全局收发机面临的主要问题 文章对我比较有启发的地方&#xff0c;是提到自编码架构的全局收发机面临的主要问题&#xff1a; 问题一&#xff1a;基于随…

【算法笔记自学】第 9 章 提高篇(3)——数据结构专题(2)

9.1树与二叉树 #include <cstdio>int main() {int n, m;scanf("%d%d", &n, &m);printf(n m 1 ? "Yes" : "No");return 0; } 9.2二叉树的遍历 #include <cstdio> #include <vector> using namespace std;const int…

高精度定位与AI技术的深度融合——未来智慧世界的钥匙

引言在当今迅速发展的科技时代&#xff0c;精确定位和人工智能&#xff08;AI&#xff09;技术正在快速推动各领域的创新与变革。高精度定位结合AI技术所产生的融合效应&#xff0c;正在加速智慧城市、智能驾驶、智能物流以及许多其他领域的实现。这篇文章将详细探讨高精度定位…

科技云报道:产业为根大模型应用为擎,容联云推动企业营销服场景重塑

科技云报道原创。 “没有应用&#xff0c;光有一个基础模型&#xff0c;不管是开源还是闭源&#xff0c;一文不值。”在2024世界人工智能大会&#xff08;WAIC 2024&#xff09;现场&#xff0c;百度创始人、董事长兼首席执行官李彦宏直言。 国产大模型的种类越发丰富&#x…

【爬虫】解析爬取的数据

目录 一、正则表达式1、常用元字符2、量词3、Re模块4、爬取豆瓣电影 二、Xpath1、Xpath解析Ⅰ、节点选择Ⅱ、路径表达式Ⅲ、常用函数 2、爬取豆瓣电影 解析数据&#xff0c;除了前面的BeautifulSoup库&#xff0c;还有正则表达式和Xpath两种方法。 一、正则表达式 正则表达式…

RK3588开发笔记(四):基于定制的RK3588一体主板升级镜像

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/140288662 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV…

Java---SpringBoot详解一

人性本善亦本恶&#xff0c; 喜怒哀乐显真情。 寒冬暖夏皆有道&#xff0c; 善恶终归一念间。 善念慈悲天下广&#xff0c; 恶行自缚梦难安。 人心如镜自省照&#xff0c; 善恶分明照乾坤。 目录 一&#xff0c;入门程序 ①&#xff0c;创建springboot工程&#…

Apache配置与应用(优化apache)

Apache配置解析&#xff08;配置优化&#xff09; Apache链接保持 KeepAlive&#xff1a;决定是否打开连接保持功能&#xff0c;后面接 OFF 表示关闭&#xff0c;接 ON 表示打开 KeepAliveTimeout&#xff1a;表示一次连接多次请求之间的最大间隔时间&#xff0c;即两次请求之间…

秋招Java后端开发冲刺——Mybatis使用总结

一、基本知识 1. 介绍 MyBatis 是 Apache 的一个开源项目&#xff0c;它封装了 JDBC&#xff0c;使开发者只需要关注 SQL 语句本身&#xff0c;而不需要再进行繁琐的 JDBC 编码。MyBatis 可以使用简单的 XML 或注解来配置和映射原生类型、接口和 Java POJO&#xff08;Plain …

【网络安全科普】网络安全指南请查收

随着社会信息化深入发展&#xff0c;互联网对人类文明进步奖发挥更大的促进作用。但与此同时&#xff0c;互联网领域的问题也日益凸显。网络犯罪、网络监听、网络攻击等是又发生&#xff0c;网络安全与每个人都息息相关&#xff0c;下面&#xff0c;一起来了解网络安全知识吧。…

开放式耳机哪款性价比高?这五款超值精品不容错过

喜欢进行户外运动的小伙伴们&#xff0c;应该都很需要一款既可以匹配运动场景&#xff0c;又兼顾音质体验的无线蓝牙耳机吧。而开放式耳机拥有佩戴舒适牢固&#xff0c;不堵塞耳部&#xff0c;不影响外部声音传入耳部的优点&#xff0c;完全可以成为运动健身人士户外运动的好伴…

『C + ⒈』‘\‘

&#x1f942;在反斜杠(\)有⒉种最常用的功能如下所示&#x1f44b; #define _CRT_SECURE_NO_WARNINGS 1 #include<stdio.h> int main(void) {int a 10;int b 20;int c 30;if (a 10 &&\b 20 &&\c 30){printf("Your print\n");}else{prin…

Java 多继承与接口

Java 多继承与接口 1、为什么Java不支持多继承&#xff1f;2、使用接口实现多继承2.1 接口的定义与实现 3、接口的优点4、结论 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 多继承是指一个类可以继承多个父类&#xff0c;从而获得多个父类…

Spring Boot Vue 毕设系统讲解 3

目录 项目配置类 项目中配置的相关代码 spring Boot 拦截器相关知识 一、基于URL实现的拦截器&#xff1a; 二、基于注解的拦截器 三、把拦截器添加到配置中&#xff0c;相当于SpringMVC时的配置文件干的事儿&#xff1a; 项目配置类 项目中配置的相关代码 首先定义项目认…