[Elasticsearch Source Code] Shard Recovery Analysis

Learning the source code with questions in mind, part 7: Elasticsearch shard recovery analysis
The code analysis is based on: https://github.com/jiankunking/elasticsearch
Elasticsearch 8.0.0-SNAPSHOT

Purpose

Before diving into the source code, let me list my open questions about shard recovery:

  • There is no shortage of articles describing Elasticsearch shard recovery online. Are they correct? Has anything changed in newer versions?
  • If a _forcemerge API request arrives while a shard is recovering, how is it handled? (The first phase of replica recovery copies segment files.)
    • This will be answered when I get to the /_forcemerge API.
  • The second phase of shard recovery replays the translog. Is a lock taken during this step? If not, how does Elasticsearch make sure the replay ends up complete?

If there is a shortcut to reading source code, it is to follow a well-regarded source-code analysis article while reading.
The analysis below follows Tencent Cloud's article "Elasticsearch 底层系列之分片恢复解析" (Elasticsearch internals: shard recovery explained), using it as a reference while verifying it against the code.

Source Code Analysis

The target node requests recovery

The entry point of shard recovery is IndicesClusterStateService.createOrUpdateShards.

This method checks whether the local node appears in routingNodes. If it does, the local node has shards to create or update; otherwise the method returns early.

private void createOrUpdateShards(final ClusterState state) {
    // Mapping from nodes to index shards, mainly used for shard allocation and balancing decisions.
    // For details see: https://jiankunking.com/elasticsearch-cluster-state.html
    RoutingNode localRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId());
    if (localRoutingNode == null) {
        return;
    }
    DiscoveryNodes nodes = state.nodes();
    RoutingTable routingTable = state.routingTable();
    for (final ShardRouting shardRouting : localRoutingNode) {
        ShardId shardId = shardRouting.shardId();
        // failedShardsCache: https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java#L116
        // Shards that failed during recovery; they are tracked so that we do not repeatedly
        // retry recovering them on every cluster-state update.
        if (failedShardsCache.containsKey(shardId) == false) {
            AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardId.getIndex());
            assert indexService != null : "index " + shardId.getIndex() + " should have been created by createIndices";
            Shard shard = indexService.getShardOrNull(shardId.id());
            if (shard == null) { // the shard does not exist yet, so create it
                assert shardRouting.initializing() : shardRouting + " should have been removed by failMissingShards";
                createShard(nodes, routingTable, shardRouting, state);
            } else { // the shard exists, so update it
                updateShard(nodes, shardRouting, shard, routingTable, state);
            }
        }
    }
}

Replica shard recovery goes through the createShard branch. That method first looks at the recovery type of the shardRouting; if the type is PEER, the shard has to be fetched from a remote node, so a source node must be found before calling IndicesService.createShard:

RecoverySource.Type has the following values:

EMPTY_STORE,
EXISTING_STORE,//primary shard recovers from its local store
PEER,//replica shard recovers from a remote primary shard
SNAPSHOT,//recover from a snapshot
LOCAL_SHARDS//recover from other shards on the same node (during shrink)

The createShard code is as follows:

private void createShard(DiscoveryNodes nodes, RoutingTable routingTable, ShardRouting shardRouting, ClusterState state) {assert shardRouting.initializing() : "only allow shard creation for initializing shard but was " + shardRouting;DiscoveryNode sourceNode = null;// 如果恢复方式是peer,则会找到shard所在的源节点进行恢复if (shardRouting.recoverySource().getType() == Type.PEER)  {sourceNode = findSourceNodeForPeerRecovery(logger, routingTable, nodes, shardRouting);if (sourceNode == null) {logger.trace("ignoring initializing shard {} - no source node can be found.", shardRouting.shardId());return;}}try {final long primaryTerm = state.metadata().index(shardRouting.index()).primaryTerm(shardRouting.id());logger.debug("{} creating shard with primary term [{}]", shardRouting.shardId(), primaryTerm);indicesService.createShard(shardRouting,recoveryTargetService,new RecoveryListener(shardRouting, primaryTerm),repositoriesService,failedShardHandler,this::updateGlobalCheckpointForShard,retentionLeaseSyncer,nodes.getLocalNode(),sourceNode);} catch (Exception e) {failAndRemoveShard(shardRouting, true, "failed to create shard", e, state);}}/*** Finds the routing source node for peer recovery, return null if its not found. Note, this method expects the shard* routing to *require* peer recovery, use {@link ShardRouting#recoverySource()} to check if its needed or not.*/private static DiscoveryNode findSourceNodeForPeerRecovery(Logger logger, RoutingTable routingTable, DiscoveryNodes nodes,ShardRouting shardRouting) {DiscoveryNode sourceNode = null;if (!shardRouting.primary()) {ShardRouting primary = routingTable.shardRoutingTable(shardRouting.shardId()).primaryShard();// only recover from started primary, if we can't find one, we will do it next roundif (primary.active()) {// 找到primary shard所在节点sourceNode = nodes.get(primary.currentNodeId());if (sourceNode == null) {logger.trace("can't find replica source node because primary shard {} is assigned to an unknown node.", primary);}} else {logger.trace("can't find replica source node because primary shard {} is not active.", primary);}} else if (shardRouting.relocatingNodeId() != null) {// 找到搬迁的源节点sourceNode = nodes.get(shardRouting.relocatingNodeId());if (sourceNode == null) {logger.trace("can't find relocation source node for shard {} because it is assigned to an unknown node [{}].",shardRouting.shardId(), shardRouting.relocatingNodeId());}} else {throw new IllegalStateException("trying to find source node for peer recovery when routing state means no peer recovery: " +shardRouting);}return sourceNode;}

Determining the source node falls into two cases: if the current shard is not a primary shard, the source node is the node holding the primary shard; otherwise, if the shard is being relocated (moved from another node to this one), the source node is the node the data is being moved from. Once the source node is known, IndicesService.createShard is called, which in turn calls IndexShard.startRecovery to kick off the recovery.

public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,Consumer<MappingMetadata> mappingUpdateConsumer,IndicesService indicesService) {// TODO: Create a proper object to encapsulate the recovery context// all of the current methods here follow a pattern of:// resolve context which isn't really dependent on the local shards and then async// call some external method with this pointer.// with a proper recovery context object we can simply change this to:// startRecovery(RecoveryState recoveryState, ShardRecoverySource source ) {//     markAsRecovery("from " + source.getShortDescription(), recoveryState);//     threadPool.generic().execute()  {//           onFailure () { listener.failure() };//           doRun() {//                if (source.recover(this)) {//                  recoveryListener.onRecoveryDone(recoveryState);//                }//           }//     }}// }assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());switch (recoveryState.getRecoverySource().getType()) {case EMPTY_STORE:case EXISTING_STORE:executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);break;case PEER:try {markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);} catch (Exception e) {failShard("corrupted preexisting index", e);recoveryListener.onRecoveryFailure(recoveryState,new RecoveryFailedException(recoveryState, null, e), true);}break;case SNAPSHOT:final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();executeRecovery("from snapshot",recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l));break;case LOCAL_SHARDS:final IndexMetadata indexMetadata = indexSettings().getIndexMetadata();final Index resizeSourceIndex = indexMetadata.getResizeSourceIndex();final List<IndexShard> startedShards = new ArrayList<>();final IndexService sourceIndexService = indicesService.indexService(resizeSourceIndex);final Set<ShardId> requiredShards;final int numShards;if (sourceIndexService != null) {requiredShards = IndexMetadata.selectRecoverFromShards(shardId().id(),sourceIndexService.getMetadata(), indexMetadata.getNumberOfShards());for (IndexShard shard : sourceIndexService) {if (shard.state() == IndexShardState.STARTED && requiredShards.contains(shard.shardId())) {startedShards.add(shard);}}numShards = requiredShards.size();} else {numShards = -1;requiredShards = Collections.emptySet();}if (numShards == startedShards.size()) {assert requiredShards.isEmpty() == false;executeRecovery("from local shards", recoveryState, recoveryListener,l -> recoverFromLocalShards(mappingUpdateConsumer,startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l));} else {final RuntimeException e;if (numShards == -1) {e = new IndexNotFoundException(resizeSourceIndex);} else {e = new IllegalStateException("not all required shards of index " + resizeSourceIndex+ " are started yet, expected " + numShards + " found " + startedShards.size() + " can't recover shard "+ shardId());}throw e;}break;default:throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());}}

For a recovery task of type PEER, the real work is done by PeerRecoveryTargetService.doRecovery. This method first calls getStartRecoveryRequest to obtain the shard's metadataSnapshot, which contains the shard's segment information such as the sync id, checksums, and doc counts. This is wrapped in a StartRecoveryRequest and sent to the source node via RPC:

private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {final String actionName;final TransportRequest requestToSend;final StartRecoveryRequest startRequest;final RecoveryState.Timer timer;try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) {if (recoveryRef == null) {logger.trace("not running recovery with id [{}] - can not find it (probably finished)", recoveryId);return;}final RecoveryTarget recoveryTarget = recoveryRef.target();timer = recoveryTarget.state().getTimer();if (preExistingRequest == null) {try {final IndexShard indexShard = recoveryTarget.indexShard();indexShard.preRecovery();assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());indexShard.prepareForIndexRecovery();final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";// 构造recovery request startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);requestToSend = startRequest;actionName = PeerRecoverySourceService.Actions.START_RECOVERY;} catch (final Exception e) {// this will be logged as warning later on...logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e);onGoingRecoveries.failRecovery(recoveryId,new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true);return;}logger.trace("{} starting recovery from {}", startRequest.shardId(), startRequest.sourceNode());} else {startRequest = preExistingRequest;requestToSend = new ReestablishRecoveryRequest(recoveryId, startRequest.shardId(), startRequest.targetAllocationId());actionName = PeerRecoverySourceService.Actions.REESTABLISH_RECOVERY;logger.trace("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());}}// 向源节点发送请求,请求恢复transportService.sendRequest(startRequest.sourceNode(), actionName, requestToSend,new RecoveryResponseHandler(startRequest, timer));}/*** Prepare the start recovery request.** @param logger         the logger* @param localNode      the local node of the recovery target* @param recoveryTarget the target of the recovery* @param startingSeqNo  a sequence number that an operation-based peer recovery can start with.*                       This is the first operation after the local checkpoint of the safe commit if exists.* @return a start recovery request*/public static StartRecoveryRequest getStartRecoveryRequest(Logger logger, DiscoveryNode localNode,RecoveryTarget recoveryTarget, long startingSeqNo) {final StartRecoveryRequest request;logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());Store.MetadataSnapshot metadataSnapshot;try {metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.try {final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo 
+ " >= " + globalCheckpoint;} catch (IOException | TranslogCorruptedException e) {logger.warn(new ParameterizedMessage("error while reading global checkpoint from translog, " +"resetting the starting sequence number from {} to unassigned and recovering as if there are none", startingSeqNo), e);metadataSnapshot = Store.MetadataSnapshot.EMPTY;startingSeqNo = UNASSIGNED_SEQ_NO;}} catch (final org.apache.lucene.index.IndexNotFoundException e) {// happens on an empty folder. no need to logassert startingSeqNo == UNASSIGNED_SEQ_NO : startingSeqNo;logger.trace("{} shard folder empty, recovering all files", recoveryTarget);metadataSnapshot = Store.MetadataSnapshot.EMPTY;} catch (final IOException e) {if (startingSeqNo != UNASSIGNED_SEQ_NO) {logger.warn(new ParameterizedMessage("error while listing local files, resetting the starting sequence number from {} " +"to unassigned and recovering as if there are none", startingSeqNo), e);startingSeqNo = UNASSIGNED_SEQ_NO;} else {logger.warn("error while listing local files, recovering as if there are none", e);}metadataSnapshot = Store.MetadataSnapshot.EMPTY;}logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());request = new StartRecoveryRequest(recoveryTarget.shardId(),recoveryTarget.indexShard().routingEntry().allocationId().getId(),recoveryTarget.sourceNode(),localNode,metadataSnapshot,recoveryTarget.state().getPrimary(),recoveryTarget.recoveryId(),startingSeqNo);return request;}

Note that the request is sent asynchronously: the calling thread returns as soon as the request is handed to the transport layer, and the outcome is later delivered to the RecoveryResponseHandler passed to sendRequest.
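To make the send-and-callback shape concrete, here is a minimal, self-contained sketch. It uses CompletableFuture and invented names (sendStartRecovery and StartRecoveryReply are assumptions for illustration only); the real code uses TransportService.sendRequest with a RecoveryResponseHandler.

// Minimal illustration of the asynchronous send-and-callback pattern used for the
// start-recovery request; the names below are invented for this sketch.
import java.util.concurrent.CompletableFuture;

public class AsyncRecoveryRequestSketch {

    record StartRecoveryReply(boolean success, String detail) {}

    // Stand-in for the transport layer: completes the future on another thread.
    static CompletableFuture<StartRecoveryReply> sendStartRecovery(String sourceNode) {
        return CompletableFuture.supplyAsync(() -> new StartRecoveryReply(true, "recovery finished on " + sourceNode));
    }

    public static void main(String[] args) {
        sendStartRecovery("node-1")
            .thenAccept(reply -> {
                // corresponds to RecoveryResponseHandler.handleResponse: mark the recovery as done
                System.out.println("recovery done: " + reply.detail());
            })
            .exceptionally(e -> {
                // corresponds to handleException: fail (and possibly retry) the recovery
                System.err.println("recovery failed: " + e);
                return null;
            })
            .join(); // only for the demo; the real caller returns immediately after sending
    }
}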

The source node handles the recovery request

After receiving the request, the source node calls the recovery entry point, PeerRecoverySourceService.messageReceived, which delegates to recover:

class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
    @Override
    public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
        recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request));
    }
}

The recover method resolves the shard from the request, constructs a RecoverySourceHandler, and then calls handler.recoverToTarget to enter the main recovery routine:

private void recover(StartRecoveryRequest request, ActionListener<RecoveryResponse> listener) {final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());final IndexShard shard = indexService.getShard(request.shardId().id());final ShardRouting routingEntry = shard.routingEntry();if (routingEntry.primary() == false || routingEntry.active() == false) {throw new DelayRecoveryException("source shard [" + routingEntry + "] is not an active primary");}if (request.isPrimaryRelocation() && (routingEntry.relocating() == false ||routingEntry.relocatingNodeId().equals(request.targetNode().getId()) == false)) {logger.debug("delaying recovery of {} as source shard is not marked yet as relocating to {}",request.shardId(), request.targetNode());throw new DelayRecoveryException("source shard is not marked yet as relocating to [" + request.targetNode() + "]");}RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(),request.targetNode());handler.recoverToTarget(ActionListener.runAfter(listener, () -> ongoingRecoveries.remove(shard, handler)));}/*** performs the recovery from the local engine to the target*/public void recoverToTarget(ActionListener<RecoveryResponse> listener) {addListener(listener);final Closeable releaseResources = () -> IOUtils.close(resources);try {cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {final RuntimeException e;if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on use = new IndexShardClosedException(shard.shardId(), "shard is closed and recovery was canceled reason [" + reason + "]");} else {e = new CancellableThreads.ExecutionCancelledException("recovery was canceled reason [" + reason + "]");}if (beforeCancelEx != null) {e.addSuppressed(beforeCancelEx);}IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));throw e;});final Consumer<Exception> onFailure = e -> {assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[onFailure]");IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));};final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();runUnderPrimaryPermit(() -> {final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());if (targetShardRouting == null) {logger.debug("delaying recovery of {} as it is not listed as assigned to target node {}", request.shardId(),request.targetNode());throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");}assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));}, shardId + " validating recovery target ["+ request.targetAllocationId() + "] registered ",shard, cancellableThreads, logger);// 获取一个保留锁,使得translog不被清理final Closeable retentionLock = shard.acquireHistoryRetentionLock();resources.add(retentionLock);final long startingSeqNo;// 判断是否可以从SequenceNumber恢复// 除了异常检测和版本号检测,主要在shard.hasCompleteHistoryOperations()方法中判断请求的序列号是否小于主分片节点的localCheckpoint,// 以及translog中的数据是否足以恢复(有可能因为translog数据太大或者过期删除而无法恢复)final boolean isSequenceNumberBasedRecovery= request.startingSeqNo() != 
SequenceNumbers.UNASSIGNED_SEQ_NO&& isTargetSameHistory()&& shard.hasCompleteHistoryOperations("peer-recovery", request.startingSeqNo())&& ((retentionLeaseRef.get() == null && shard.useRetentionLeasesInPeerRecovery() == false) ||(retentionLeaseRef.get() != null && retentionLeaseRef.get().retainingSequenceNumber() <= request.startingSeqNo()));// NB check hasCompleteHistoryOperations when computing isSequenceNumberBasedRecovery, even if there is a retention lease,// because when doing a rolling upgrade from earlier than 7.4 we may create some leases that are initially unsatisfied. It's// possible there are other cases where we cannot satisfy all leases, because that's not a property we currently expect to hold.// Also it's pretty cheap when soft deletes are enabled, and it'd be a disaster if we tried a sequence-number-based recovery// without having a complete history.if (isSequenceNumberBasedRecovery && retentionLeaseRef.get() != null) {// all the history we need is retained by an existing retention lease, so we do not need a separate retention lockretentionLock.close();logger.trace("history is retained by {}", retentionLeaseRef.get());} else {// all the history we need is retained by the retention lock, obtained before calling shard.hasCompleteHistoryOperations()// and before acquiring the safe commit we'll be using, so we can be certain that all operations after the safe commit's// local checkpoint will be retained for the duration of this recovery.logger.trace("history is retained by retention lock");}final StepListener<SendFileResult> sendFileStep = new StepListener<>();final StepListener<TimeValue> prepareEngineStep = new StepListener<>();final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();final StepListener<Void> finalizeStep = new StepListener<>();// 若可以基于序列号进行恢复,则获取开始的序列号if (isSequenceNumberBasedRecovery) {// 如果基于SequenceNumber恢复,则startingSeqNo取值为恢复请求中的序列号,// 从请求的序列号开始快照translog。否则取值为0,快照完整的translog。logger.trace("performing sequence numbers based recovery. starting at [{}]", request.startingSeqNo());// 获取开始序列号startingSeqNo = request.startingSeqNo();if (retentionLeaseRef.get() == null) {createRetentionLease(startingSeqNo, sendFileStep.map(ignored -> SendFileResult.EMPTY));} else {// 发送的文件设置为空sendFileStep.onResponse(SendFileResult.EMPTY);}} else {final Engine.IndexCommitRef safeCommitRef;try {// Releasing a safe commit can access some commit files.safeCommitRef = acquireSafeCommit(shard);resources.add(safeCommitRef);} catch (final Exception e) {throw new RecoveryEngineException(shard.shardId(), 1, "snapshot failed", e);}// Try and copy enough operations to the recovering peer so that if it is promoted to primary then it has a chance of being// able to recover other replicas using operations-based recoveries. If we are not using retention leases then we// conservatively copy all available operations. If we are using retention leases then "enough operations" is just the// operations from the local checkpoint of the safe commit onwards, because when using soft deletes the safe commit retains// at least as much history as anything else. The safe commit will often contain all the history retained by the current set// of retention leases, but this is not guaranteed: an earlier peer recovery from a different primary might have created a// retention lease for some history that this primary already discarded, since we discard history when the global checkpoint// advances and not when creating a new safe commit. 
In any case this is a best-effort thing since future recoveries can// always fall back to file-based ones, and only really presents a problem if this primary fails before things have settled// down.startingSeqNo = Long.parseLong(safeCommitRef.getIndexCommit().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) + 1L;logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);try {final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);final Releasable releaseStore = acquireStore(shard.store());resources.add(releaseStore);sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> {try {IOUtils.close(safeCommitRef, releaseStore);} catch (final IOException ex) {logger.warn("releasing snapshot caused exception", ex);}});final StepListener<ReplicationResponse> deleteRetentionLeaseStep = new StepListener<>();runUnderPrimaryPermit(() -> {try {// If the target previously had a copy of this shard then a file-based recovery might move its global// checkpoint backwards. We must therefore remove any existing retention lease so that we can create a// new one later on in the recovery.shard.removePeerRecoveryRetentionLease(request.targetNode().getId(),new ThreadedActionListener<>(logger, shard.getThreadPool(), ThreadPool.Names.GENERIC,deleteRetentionLeaseStep, false));} catch (RetentionLeaseNotFoundException e) {logger.debug("no peer-recovery retention lease for " + request.targetAllocationId());deleteRetentionLeaseStep.onResponse(null);}}, shardId + " removing retention lease for [" + request.targetAllocationId() + "]",shard, cancellableThreads, logger);// 第一阶段deleteRetentionLeaseStep.whenComplete(ignored -> {assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase1]");phase1(safeCommitRef.getIndexCommit(), startingSeqNo, () -> estimateNumOps, sendFileStep);}, onFailure);} catch (final Exception e) {throw new RecoveryEngineException(shard.shardId(), 1, "sendFileStep failed", e);}}assert startingSeqNo >= 0 : "startingSeqNo must be non negative. 
got: " + startingSeqNo;sendFileStep.whenComplete(r -> {assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");// 等待phase1执行完毕,主分片节点通知副分片节点启动此分片的Engine:prepareTargetForTranslog          // 该方法会阻塞处理,直到分片 Engine 启动完毕。// 待副分片启动Engine 完毕,就可以正常接收写请求了。// 注意,此时phase2尚未开始,此分片的恢复流程尚未结束。// 等待当前操作处理完成后,以startingSeqNo为起始点,对translog做快照,开始执行phase2:// For a sequence based recovery, the target can keep its local translogprepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);}, onFailure);prepareEngineStep.whenComplete(prepareEngineTime -> {assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[phase2]");/** add shard to replication group (shard will receive replication requests from this point on) now that engine is open.* This means that any document indexed into the primary after this will be replicated to this replica as well* make sure to do this before sampling the max sequence number in the next step, to ensure that we send* all documents up to maxSeqNo in phase2.*/runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();logger.trace("snapshot for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot("peer-recovery", startingSeqNo, Long.MAX_VALUE, false);resources.add(phase2Snapshot);retentionLock.close();// we have to capture the max_seen_auto_id_timestamp and the max_seq_no_of_updates to make sure that these values// are at least as high as the corresponding values on the primary when any of these operations were executed on it.final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();final RetentionLeases retentionLeases = shard.getRetentionLeases();final long mappingVersionOnPrimary = shard.indexSettings().getIndexMetadata().getMappingVersion();// 第二阶段,发送translogphase2(startingSeqNo, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,retentionLeases, mappingVersionOnPrimary, sendSnapshotStep);}, onFailure);// Recovery target can trim all operations >= startingSeqNo as we have sent all these operations in the phase 2final long trimAboveSeqNo = startingSeqNo - 1;sendSnapshotStep.whenComplete(r -> finalizeRecovery(r.targetLocalCheckpoint, trimAboveSeqNo, finalizeStep), onFailure);finalizeStep.whenComplete(r -> {final long phase1ThrottlingWaitTime = 0L; // TODO: return the actual throttle timefinal SendSnapshotResult sendSnapshotResult = sendSnapshotStep.result();final SendFileResult sendFileResult = sendFileStep.result();final RecoveryResponse response = new RecoveryResponse(sendFileResult.phase1FileNames, sendFileResult.phase1FileSizes,sendFileResult.phase1ExistingFileNames, sendFileResult.phase1ExistingFileSizes, sendFileResult.totalSize,sendFileResult.existingTotalSize, sendFileResult.took.millis(), phase1ThrottlingWaitTime,prepareEngineStep.result().millis(), sendSnapshotResult.sentOperations, sendSnapshotResult.tookTime.millis());try {future.onResponse(response);} finally {IOUtils.close(resources);}}, onFailure);} catch (Exception e) {IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));}}/*** Checks if we have a completed history of operations since the given starting 
seqno (inclusive).* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}*/public boolean hasCompleteHistoryOperations(String reason, long startingSeqNo)  {return getEngine().hasCompleteOperationHistory(reason, startingSeqNo);}

As the code above shows, recovery has two main phases: phase 1 copies segment files and phase 2 sends the translog. One key point: before recovery starts, the source acquires a history/translog retention lock (the translog view in older versions) and a segment snapshot. The retention lock guarantees that translog operations from this point until the end of recovery are not trimmed away, and the segment snapshot guarantees that the segment files as of this point are not deleted. Next, let's look at the two phases in detail. phase1:

/*** Perform phase1 of the recovery operations. Once this {@link IndexCommit}* snapshot has been performed no commit operations (files being fsync'd)* are effectively allowed on this index until all recovery phases are done* <p>* Phase1 examines the segment files on the target node and copies over the* segments that are missing. Only segments that have the same size and* checksum can be reused*/void phase1(IndexCommit snapshot, long startingSeqNo, IntSupplier translogOps, ActionListener<SendFileResult> listener) {cancellableThreads.checkForCancel();//拿到shard的存储信息final Store store = shard.store();try {StopWatch stopWatch = new StopWatch().start();final Store.MetadataSnapshot recoverySourceMetadata;try {// 拿到snapshot的metadatarecoverySourceMetadata = store.getMetadata(snapshot);} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {shard.failShard("recovery", ex);throw ex;}for (String name : snapshot.getFileNames()) {final StoreFileMetadata md = recoverySourceMetadata.get(name);if (md == null) {logger.info("Snapshot differs from actual index for file: {} meta: {}", name, recoverySourceMetadata.asMap());throw new CorruptIndexException("Snapshot differs from actual index - maybe index was removed metadata has " +recoverySourceMetadata.asMap().size() + " files", name);}}// 如果syncid相等,再继续比较下文档数,如果都相同则不用恢复if (canSkipPhase1(recoverySourceMetadata, request.metadataSnapshot()) == false) {final List<String> phase1FileNames = new ArrayList<>();final List<Long> phase1FileSizes = new ArrayList<>();final List<String> phase1ExistingFileNames = new ArrayList<>();final List<Long> phase1ExistingFileSizes = new ArrayList<>();// Total size of segment files that are recoveredlong totalSizeInBytes = 0;// Total size of segment files that were able to be re-usedlong existingTotalSizeInBytes = 0;// Generate a "diff" of all the identical, different, and missing// segment files on the target node, using the existing files on// the source node// 找出target和source有差别的segment// https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/store/Store.java#L971final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());for (StoreFileMetadata md : diff.identical) {phase1ExistingFileNames.add(md.name());phase1ExistingFileSizes.add(md.length());existingTotalSizeInBytes += md.length();if (logger.isTraceEnabled()) {logger.trace("recovery [phase1]: not recovering [{}], exist in local store and has checksum [{}]," +" size [{}]", md.name(), md.checksum(), md.length());}totalSizeInBytes += md.length();}List<StoreFileMetadata> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());phase1Files.addAll(diff.different);phase1Files.addAll(diff.missing);for (StoreFileMetadata md : phase1Files) {if (request.metadataSnapshot().asMap().containsKey(md.name())) {logger.trace("recovery [phase1]: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",md.name(), request.metadataSnapshot().asMap().get(md.name()), md);} else {logger.trace("recovery [phase1]: recovering [{}], does not exist in remote", md.name());}phase1FileNames.add(md.name());phase1FileSizes.add(md.length());totalSizeInBytes += md.length();}logger.trace("recovery [phase1]: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",phase1FileNames.size(), new ByteSizeValue(totalSizeInBytes),phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSizeInBytes));final StepListener<Void> 
sendFileInfoStep = new StepListener<>();final StepListener<Void> sendFilesStep = new StepListener<>();final StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();final StepListener<Void> cleanFilesStep = new StepListener<>();cancellableThreads.checkForCancel();recoveryTarget.receiveFileInfo(phase1FileNames, phase1FileSizes, phase1ExistingFileNames,phase1ExistingFileSizes, translogOps.getAsInt(), sendFileInfoStep);// 将需要恢复的文件发送到target nodesendFileInfoStep.whenComplete(r ->sendFiles(store, phase1Files.toArray(new StoreFileMetadata[0]), translogOps, sendFilesStep), listener::onFailure);sendFilesStep.whenComplete(r -> createRetentionLease(startingSeqNo, createRetentionLeaseStep), listener::onFailure);createRetentionLeaseStep.whenComplete(retentionLease ->{final long lastKnownGlobalCheckpoint = shard.getLastKnownGlobalCheckpoint();assert retentionLease == null || retentionLease.retainingSequenceNumber() - 1 <= lastKnownGlobalCheckpoint: retentionLease + " vs " + lastKnownGlobalCheckpoint;// Establishes new empty translog on the replica with global checkpoint set to lastKnownGlobalCheckpoint. We want// the commit we just copied to be a safe commit on the replica, so why not set the global checkpoint on the replica// to the max seqno of this commit? Because (in rare corner cases) this commit might not be a safe commit here on// the primary, and in these cases the max seqno would be too high to be valid as a global checkpoint.cleanFiles(store, recoverySourceMetadata, translogOps, lastKnownGlobalCheckpoint, cleanFilesStep);},listener::onFailure);final long totalSize = totalSizeInBytes;final long existingTotalSize = existingTotalSizeInBytes;cleanFilesStep.whenComplete(r -> {final TimeValue took = stopWatch.totalTime();logger.trace("recovery [phase1]: took [{}]", took);listener.onResponse(new SendFileResult(phase1FileNames, phase1FileSizes, totalSize, phase1ExistingFileNames,phase1ExistingFileSizes, existingTotalSize, took));}, listener::onFailure);} else {logger.trace("skipping [phase1] since source and target have identical sync id [{}]", recoverySourceMetadata.getSyncId());// but we must still create a retention leasefinal StepListener<RetentionLease> createRetentionLeaseStep = new StepListener<>();createRetentionLease(startingSeqNo, createRetentionLeaseStep);createRetentionLeaseStep.whenComplete(retentionLease -> {final TimeValue took = stopWatch.totalTime();logger.trace("recovery [phase1]: took [{}]", took);listener.onResponse(new SendFileResult(Collections.emptyList(), Collections.emptyList(), 0L, Collections.emptyList(),Collections.emptyList(), 0L, took));}, listener::onFailure);}} catch (Exception e) {throw new RecoverFilesRecoveryException(request.shardId(), 0, new ByteSizeValue(0L), e);}}

As the code shows, phase 1 works as follows: first the metadataSnapshot of the shard being recovered is obtained, giving the recovery-source sync id; the recovery-target sync id comes from the request. The two sync ids are compared, and if they match, the doc counts of source and target are compared as well. If those also match, the segments of the source and target shards are identical up to the current commit point, so no segment files need to be copied (this is what canSkipPhase1 checks). If the sync ids differ, the segment files differ, and all differing files must be found and copied. They are found by diffing recoverySourceMetadata against recoveryTargetSnapshot. A minimal sketch of the skip check is shown below.
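The sketch below is a simplified, self-contained model of the canSkipPhase1 idea, not the actual Elasticsearch method: CommitMetadata is a hypothetical stand-in for Store.MetadataSnapshot, and the real method additionally treats a matching sync id with mismatching doc counts as an error rather than silently copying files.

// Simplified model of canSkipPhase1: phase 1 can be skipped only when both commits carry
// the same (non-null) sync id and report the same number of documents.
public class CanSkipPhase1Sketch {

    // Hypothetical stand-in for Store.MetadataSnapshot; the field names are assumptions.
    record CommitMetadata(String syncId, long numDocs) {}

    static boolean canSkipPhase1(CommitMetadata source, CommitMetadata target) {
        if (source.syncId() == null || !source.syncId().equals(target.syncId())) {
            return false; // sync ids differ (or are missing): segment files must be copied
        }
        // Same sync id: the doc counts must also match before the file copy can be skipped.
        return source.numDocs() == target.numDocs();
    }

    public static void main(String[] args) {
        CommitMetadata primary = new CommitMetadata("sync-abc", 1_000);
        CommitMetadata replica = new CommitMetadata("sync-abc", 1_000);
        System.out.println(canSkipPhase1(primary, replica)); // true -> phase 1 can be skipped
    }
}

The per-file diff itself is computed by Store.MetadataSnapshot.recoveryDiff, shown below: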

/*** Returns a diff between the two snapshots that can be used for recovery. The given snapshot is treated as the* recovery target and this snapshot as the source. The returned diff will hold a list of files that are:* <ul>* <li>identical: they exist in both snapshots and they can be considered the same ie. they don't need to be recovered</li>* <li>different: they exist in both snapshots but their they are not identical</li>* <li>missing: files that exist in the source but not in the target</li>* </ul>* This method groups file into per-segment files and per-commit files. A file is treated as* identical if and on if all files in it's group are identical. On a per-segment level files for a segment are treated* as identical iff:* <ul>* <li>all files in this segment have the same checksum</li>* <li>all files in this segment have the same length</li>* <li>the segments {@code .si} files hashes are byte-identical Note: This is a using a perfect hash function,* The metadata transfers the {@code .si} file content as it's hash</li>* </ul>* <p>* The {@code .si} file contains a lot of diagnostics including a timestamp etc. in the future there might be* unique segment identifiers in there hardening this method further.* <p>* The per-commit files handles very similar. A commit is composed of the {@code segments_N} files as well as generational files* like deletes ({@code _x_y.del}) or field-info ({@code _x_y.fnm}) files. On a per-commit level files for a commit are treated* as identical iff:* <ul>* <li>all files belonging to this commit have the same checksum</li>* <li>all files belonging to this commit have the same length</li>* <li>the segments file {@code segments_N} files hashes are byte-identical Note: This is a using a perfect hash function,* The metadata transfers the {@code segments_N} file content as it's hash</li>* </ul>* <p>* NOTE: this diff will not contain the {@code segments.gen} file. 
This file is omitted on recovery.*/public RecoveryDiff recoveryDiff(MetadataSnapshot recoveryTargetSnapshot) {final List<StoreFileMetadata> identical = new ArrayList<>();// 相同的file final List<StoreFileMetadata> different = new ArrayList<>();// 不同的filefinal List<StoreFileMetadata> missing = new ArrayList<>();// 缺失的filefinal Map<String, List<StoreFileMetadata>> perSegment = new HashMap<>();final List<StoreFileMetadata> perCommitStoreFiles = new ArrayList<>();for (StoreFileMetadata meta : this) {if (IndexFileNames.OLD_SEGMENTS_GEN.equals(meta.name())) { // legacycontinue; // we don't need that file at all}final String segmentId = IndexFileNames.parseSegmentName(meta.name());final String extension = IndexFileNames.getExtension(meta.name());if (IndexFileNames.SEGMENTS.equals(segmentId) ||DEL_FILE_EXTENSION.equals(extension) || LIV_FILE_EXTENSION.equals(extension)) {// only treat del files as per-commit files fnm files are generational but only for upgradable DVperCommitStoreFiles.add(meta);} else {perSegment.computeIfAbsent(segmentId, k -> new ArrayList<>()).add(meta);}}final ArrayList<StoreFileMetadata> identicalFiles = new ArrayList<>();for (List<StoreFileMetadata> segmentFiles : Iterables.concat(perSegment.values(), Collections.singleton(perCommitStoreFiles))) {identicalFiles.clear();boolean consistent = true;for (StoreFileMetadata meta : segmentFiles) {StoreFileMetadata storeFileMetadata = recoveryTargetSnapshot.get(meta.name());if (storeFileMetadata == null) {consistent = false;missing.add(meta);// 该segment在target node中不存在,则加入到missing} else if (storeFileMetadata.isSame(meta) == false) {consistent = false;different.add(meta);// 存在但不相同,则加入到different} else {identicalFiles.add(meta);// 存在且相同}}if (consistent) {identical.addAll(identicalFiles);} else {// make sure all files are added - this can happen if only the deletes are differentdifferent.addAll(identicalFiles);}}RecoveryDiff recoveryDiff = new RecoveryDiff(Collections.unmodifiableList(identical),Collections.unmodifiableList(different), Collections.unmodifiableList(missing));assert recoveryDiff.size() == this.metadata.size() - (metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) ? 1 : 0): "some files are missing recoveryDiff size: [" + recoveryDiff.size() + "] metadata size: [" +this.metadata.size() + "] contains  segments.gen: [" + metadata.containsKey(IndexFileNames.OLD_SEGMENTS_GEN) + "]";return recoveryDiff;}

Here all segment files are classified into three groups: identical, different, and missing (absent on the target). The different and missing segment files are the ones sent to the target node in phase 1. After the segment files have been sent, the source node sends a message telling the target to clean up temporary files, and another message telling it to open its engine so it is ready to receive the translog. The per-segment grouping rule is illustrated by the sketch below.
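The following simplified, self-contained sketch (with its own FileMeta and DiffResult types, which are not the Elasticsearch classes) illustrates the grouping rule: files are grouped per segment, and a whole group counts as identical only if every file in it matches by length and checksum; otherwise even the matching files of that group are demoted to "different" so the segment is re-copied as a unit.

// Simplified model of the recoveryDiff classification into identical / different / missing.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecoveryDiffSketch {

    record FileMeta(String name, long length, String checksum) {
        String segmentId() { return name.split("\\.")[0]; } // crude stand-in for IndexFileNames.parseSegmentName
        boolean isSame(FileMeta other) {
            return other != null && length == other.length() && checksum.equals(other.checksum());
        }
    }

    record DiffResult(List<FileMeta> identical, List<FileMeta> different, List<FileMeta> missing) {}

    static DiffResult diff(List<FileMeta> source, Map<String, FileMeta> target) {
        DiffResult result = new DiffResult(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
        // Group the source files per segment so that a segment is reused only as a whole.
        Map<String, List<FileMeta>> perSegment = new HashMap<>();
        for (FileMeta meta : source) {
            perSegment.computeIfAbsent(meta.segmentId(), k -> new ArrayList<>()).add(meta);
        }
        for (List<FileMeta> group : perSegment.values()) {
            List<FileMeta> identicalInGroup = new ArrayList<>();
            boolean consistent = true;
            for (FileMeta meta : group) {
                FileMeta remote = target.get(meta.name());
                if (remote == null) {
                    consistent = false;
                    result.missing().add(meta);        // absent on the target
                } else if (!meta.isSame(remote)) {
                    consistent = false;
                    result.different().add(meta);      // present but not identical
                } else {
                    identicalInGroup.add(meta);        // present and identical
                }
            }
            // All-or-nothing per segment: any mismatch demotes the whole group to "different".
            (consistent ? result.identical() : result.different()).addAll(identicalInGroup);
        }
        return result;
    }

    public static void main(String[] args) {
        List<FileMeta> source = List.of(
            new FileMeta("_0.cfs", 100, "aaa"), new FileMeta("_0.si", 10, "bbb"), new FileMeta("_1.cfs", 200, "ccc"));
        Map<String, FileMeta> target = Map.of(
            "_0.cfs", new FileMeta("_0.cfs", 100, "aaa"), "_0.si", new FileMeta("_0.si", 10, "zzz"));
        System.out.println(diff(source, target)); // the _0.* files end up "different", _1.cfs is "missing"
    }
}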

The logic of phase 2 is fairly simple: all translog operations from the retained starting point up to the present are sent to the target node.

Phase 2 uses a snapshot of the current translog without acquiring the write lock (the translog snapshot is a point-in-time view of the translog). Each translog operation is then sent to the target node so it can be replayed into the new shard.

    /*** Perform phase two of the recovery process.* <p>* Phase two uses a snapshot of the current translog *without* acquiring the write lock (however, the translog snapshot is* point-in-time view of the translog). It then sends each translog operation to the target node so it can be replayed into the new* shard.** @param startingSeqNo              the sequence number to start recovery from, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if all*                                   ops should be sent* @param endingSeqNo                the highest sequence number that should be sent* @param snapshot                   a snapshot of the translog* @param maxSeenAutoIdTimestamp     the max auto_id_timestamp of append-only requests on the primary* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.* @param listener                   a listener which will be notified with the local checkpoint on the target.*/void phase2(final long startingSeqNo,final long endingSeqNo,final Translog.Snapshot snapshot,final long maxSeenAutoIdTimestamp,final long maxSeqNoOfUpdatesOrDeletes,final RetentionLeases retentionLeases,final long mappingVersion,final ActionListener<SendSnapshotResult> listener) throws IOException {if (shard.state() == IndexShardState.CLOSED) {throw new IndexShardClosedException(request.shardId());}logger.trace("recovery [phase2]: sending transaction log operations (from [" + startingSeqNo + "] to [" + endingSeqNo + "]");final StopWatch stopWatch = new StopWatch().start();final StepListener<Void> sendListener = new StepListener<>();final OperationBatchSender sender = new OperationBatchSender(startingSeqNo, endingSeqNo, snapshot, maxSeenAutoIdTimestamp,maxSeqNoOfUpdatesOrDeletes, retentionLeases, mappingVersion, sendListener);sendListener.whenComplete(ignored -> {final long skippedOps = sender.skippedOps.get();final int totalSentOps = sender.sentOps.get();final long targetLocalCheckpoint = sender.targetLocalCheckpoint.get();assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps + totalSentOps: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",snapshot.totalOperations(), snapshot.skippedOperations(), skippedOps, totalSentOps);stopWatch.stop();final TimeValue tookTime = stopWatch.totalTime();logger.trace("recovery [phase2]: took [{}]", tookTime);listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps, tookTime));}, listener::onFailure);sender.start();}

The target node performs the recovery

Receiving segments

Corresponding to phase 1 on the source node described in the previous section, the source sends every differing segment file to the target node, and the target writes the segment files to disk as it receives them. The write entry point is RecoveryTarget.writeFileChunk:

The actual work is done in MultiFileWriter.innerWriteFileChunk:

public void writeFileChunk(StoreFileMetaData fileMetaData, long position, BytesReference content, boolean lastChunk, int totalTranslogOps) throws IOException {
    final Store store = store();
    final String name = fileMetaData.name();
    ... ...
    if (position == 0) {
        indexOutput = openAndPutIndexOutput(name, fileMetaData, store);
    } else {
        indexOutput = getOpenIndexOutput(name); // a prefix is added to the name to form a temporary file
    }
    ... ...
    while ((scratch = iterator.next()) != null) {
        indexOutput.writeBytes(scratch.bytes, scratch.offset, scratch.length); // write to the temporary file
    }
    ... ...
    store.directory().sync(Collections.singleton(temporaryFileName)); // fsync the file to disk
}
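The pattern — write each chunk to a temporary file at its offset and fsync once the last chunk has arrived — can be illustrated with plain java.nio. The temporary-file prefix "recovery." below is an invented naming scheme for this sketch, not the actual one used by MultiFileWriter.

// Simplified sketch of chunked file writing during recovery: chunks are written to a
// temporary file at their offsets, and the file is fsynced once it is complete.
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FileChunkWriterSketch {

    static void writeChunk(Path dir, String name, long position, byte[] content, boolean lastChunk) throws IOException {
        Path tempFile = dir.resolve("recovery." + name); // hypothetical temp-file prefix
        try (FileChannel channel = FileChannel.open(tempFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(content), position); // write the chunk at its offset
            if (lastChunk) {
                channel.force(true); // fsync once the file is complete
                // the real code additionally verifies the checksum and later renames the
                // temporary file to its final segment file name
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("recovery-demo");
        writeChunk(dir, "_0.cfs", 0, "hello ".getBytes(), false);
        writeChunk(dir, "_0.cfs", 6, "world".getBytes(), true);
        System.out.println(Files.readString(dir.resolve("recovery._0.cfs"))); // hello world
    }
}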

Opening the engine

With the steps above, the target node has finished the first stage of catching up. After all segments have been received, the target opens the shard's engine in preparation for receiving the translog. Note that once the engine is open, the recovering shard can already execute index and delete operations (both requests replicated from the primary shard and operations replayed from the translog). The engine-opening logic is as follows:

 /*** Opens the engine on top of the existing lucene engine and translog.* The translog is kept but its operations won't be replayed.*/public void openEngineAndSkipTranslogRecovery() throws IOException {assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);loadGlobalCheckpointToReplicationTracker();innerOpenEngineAndTranslog(replicationTracker);getEngine().skipTranslogRecovery();}private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) throws IOException {assert Thread.holdsLock(mutex) == false : "opening engine under mutex";if (state != IndexShardState.RECOVERING) {throw new IndexShardNotRecoveringException(shardId, state);}final EngineConfig config = newEngineConfig(globalCheckpointSupplier);// we disable deletes since we allow for operations to be executed against the shard while recovering// but we need to make sure we don't loose deletes until we are done recoveringconfig.setEnableGcDeletes(false);// 恢复过程中不删除translogupdateRetentionLeasesOnReplica(loadRetentionLeases());assert recoveryState.getRecoverySource().expectEmptyRetentionLeases() == false || getRetentionLeases().leases().isEmpty(): "expected empty set of retention leases with recovery source [" + recoveryState.getRecoverySource()+ "] but got " + getRetentionLeases();synchronized (engineMutex) {assert currentEngineReference.get() == null : "engine is running";verifyNotClosed();// we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).final Engine newEngine = engineFactory.newReadWriteEngine(config);// 创建engineonNewEngine(newEngine);currentEngineReference.set(newEngine);// We set active because we are now writing operations to the engine; this way,// we can flush if we go idle after some time and become inactive.active.set(true);}// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.onSettingsChanged();assert assertSequenceNumbersInCommit();recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);}

Receiving and replaying the translog

Once the engine is open, the operations recorded in the translog can be replayed. Replay works much like a normal index or delete: the operation type and payload are reconstructed from each translog entry, the corresponding objects are built, and the engine opened in the previous step executes the operation. The logic is as follows:

IndexShard#runTranslogRecovery
=>
IndexShard#applyTranslogOperation

// Replays the translog operations from the given translog snapshot into the current engine.
// 在成功回放每个translog操作后,会通知回调onOperationRecovered。/*** Replays translog operations from the provided translog {@code snapshot} to the current engine using the given {@code origin}.* The callback {@code onOperationRecovered} is notified after each translog operation is replayed successfully.*/int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operation.Origin origin,Runnable onOperationRecovered) throws IOException {int opsRecovered = 0;Translog.Operation operation;while ((operation = snapshot.next()) != null) {try {logger.trace("[translog] recover op {}", operation);Engine.Result result = applyTranslogOperation(engine, operation, origin);switch (result.getResultType()) {case FAILURE:throw result.getFailure();case MAPPING_UPDATE_REQUIRED:throw new IllegalArgumentException("unexpected mapping update: " + result.getRequiredMappingUpdate());case SUCCESS:break;default:throw new AssertionError("Unknown result type [" + result.getResultType() + "]");}opsRecovered++;onOperationRecovered.run();} catch (Exception e) {// TODO: Don't enable this leniency unless users explicitly opt-inif (origin == Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY && ExceptionsHelper.status(e) == RestStatus.BAD_REQUEST) {// mainly for MapperParsingException and Failure to detect xcontentlogger.info("ignoring recovery of a corrupt translog entry", e);} else {throw ExceptionsHelper.convertToRuntime(e);}}}return opsRecovered;}private Engine.Result applyTranslogOperation(Engine engine, Translog.Operation operation,Engine.Operation.Origin origin) throws IOException {// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.final VersionType versionType = (origin == Engine.Operation.Origin.PRIMARY) ? VersionType.EXTERNAL : null;final Engine.Result result;switch (operation.opType()) {// 还原出操作类型及操作数据并调用engine执行相应的动作case INDEX:final Translog.Index index = (Translog.Index) operation;// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all// autoGeneratedID docs that are coming from the primary are updated correctly.result = applyIndexOperation(engine, index.seqNo(), index.primaryTerm(), index.version(),versionType, UNASSIGNED_SEQ_NO, 0, index.getAutoGeneratedIdTimestamp(), true, origin,new SourceToParse(shardId.getIndexName(), index.id(), index.source(),XContentHelper.xContentType(index.source()), index.routing()));break;case DELETE:final Translog.Delete delete = (Translog.Delete) operation;result = applyDeleteOperation(engine, delete.seqNo(), delete.primaryTerm(), delete.version(), delete.id(),versionType, UNASSIGNED_SEQ_NO, 0, origin);break;case NO_OP:final Translog.NoOp noOp = (Translog.NoOp) operation;result = markSeqNoAsNoop(engine, noOp.seqNo(), noOp.primaryTerm(), noOp.reason(), origin);break;default:throw new IllegalStateException("No operation defined for [" + operation + "]");}return result;}

After the steps above, translog replay is complete. Some finalization work remains: a refresh so the replayed data becomes visible, and re-enabling GC of deletes (which was disabled when the engine was opened for recovery):

/**
 * perform the last stages of recovery once all translog operations are done.
 * note that you should still call {@link #postRecovery(String)}.
 */
public void finalizeRecovery() {
    recoveryState().setStage(RecoveryState.Stage.FINALIZE);
    Engine engine = getEngine();
    engine.refresh("recovery_finalization");
    engine.config().setEnableGcDeletes(true);
}

At this point both phases of replica shard recovery are complete. Because the shard is still in the INITIALIZING state, the master node must still be notified to start the recovered shard:

IndicesClusterStateService#RecoveryListener

@Override
public void onRecoveryDone(final RecoveryState state, ShardLongFieldRange timestampMillisFieldRange) {
    shardStateAction.shardStarted(shardRouting,
        primaryTerm,
        "after " + state.getRecoverySource(),
        timestampMillisFieldRange,
        SHARD_STATE_ACTION_LISTENER);
}

With that, the entire shard recovery flow is complete.

Summary

Question 1: Are the descriptions found online correct? Has anything changed in newer versions?

Having followed the Tencent Cloud article all the way through, the main flow is essentially the same in Elasticsearch 8.0.0-SNAPSHOT; only some of the methods have changed slightly.

So the following description of the flow is still accurate:

Replica shard recovery in Elasticsearch involves a target node and a source node: the target is the node being recovered, and the source is the node providing the data. The target node sends a recovery request to the source node, which handles it in two phases. In phase 1, a snapshot of the shard to be recovered is taken; based on the metadata in the request, if the sync ids match and the doc counts are equal this phase is skipped, otherwise the shards' segment files are diffed and the differing segment files are sent to the target node. In phase 2, to guarantee that the target node's data is complete, the local translog is sent to the target node and replayed there. The overall flow is shown in the figure below.
(figure: overall replica-shard recovery flow)

Question 2: If a _forcemerge API request arrives during shard recovery, how is it handled?

This will be answered when I get to the /_forcemerge API.

Question 3: The second phase of shard recovery replays the translog. Is a lock taken during this step? If not, how is it guaranteed that the replica ends up complete?

Completeness

First, phase 1 guarantees that the existing historical data can be restored to the replica shard. Once phase 1 completes, the replica's engine is opened and it can handle index and delete requests normally. The retained translog covers the whole of phase 1, so every index/delete operation that happened during phase 1 is recorded in it. During the translog replay of phase 2, the replica's normal index and delete operations run in parallel with the replay. Together this ensures that both the data written before recovery started and the data written during recovery end up fully written to the replica shard, which guarantees completeness. As shown below:

(figure: the retained translog covers phase 1 and phase 2)

Consistency

Because the replica can process writes normally once phase 1 completes, its live writes and the phase 2 translog replay run in parallel. If the translog replay lags behind the live writes, older data could be written after newer data, making the replica inconsistent. To guarantee consistency, Elasticsearch compares each incoming write against the document already in Lucene (by sequence number in current versions); if the incoming operation is not newer, it is stale and is not written to Lucene. The relevant code:

// https://github.com/jiankunking/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java#L993
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
    plan = IndexingStrategy.processAsStaleOp(index.version(), 0);
}
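A minimal, self-contained model of the sequence-number comparison behind OP_STALE_OR_EQUAL (a simplification of compareOpToLuceneDocBasedOnSeqNo; the real code also consults the in-memory version map and the primary term):

// Simplified model of the seq-no based staleness check: an incoming operation is applied
// only if its sequence number is higher than the one already indexed for that doc.
import java.util.HashMap;
import java.util.Map;

public class SeqNoStalenessSketch {

    enum OpVsLuceneDocStatus { OP_NEWER, OP_STALE_OR_EQUAL, LUCENE_DOC_NOT_FOUND }

    // docId -> seqNo of the operation that last modified the doc (stand-in for Lucene + version map)
    private final Map<String, Long> indexedSeqNos = new HashMap<>();

    OpVsLuceneDocStatus compare(String docId, long opSeqNo) {
        Long indexed = indexedSeqNos.get(docId);
        if (indexed == null) {
            return OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;
        }
        return opSeqNo > indexed ? OpVsLuceneDocStatus.OP_NEWER : OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;
    }

    void apply(String docId, long opSeqNo, String value) {
        if (compare(docId, opSeqNo) != OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
            indexedSeqNos.put(docId, opSeqNo); // newer (or first) operation: index it
            System.out.println("indexed " + docId + "=" + value + " at seqNo " + opSeqNo);
        } else {
            System.out.println("skipped stale op for " + docId + " at seqNo " + opSeqNo);
        }
    }

    public static void main(String[] args) {
        SeqNoStalenessSketch engine = new SeqNoStalenessSketch();
        engine.apply("docA", 7, "2"); // live replicated write that arrived during phase 2
        engine.apply("docA", 3, "1"); // replayed translog operation from phase 1: stale, skipped
    }
}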

Extension: how the replica shard is kept consistent with the primary shard

Adapted from the book 《Elasticsearch源码解析与优化实战》 (Elasticsearch Source Code Analysis and Optimization in Practice).

One of the hard parts of index recovery is maintaining consistency between the primary and replica shards. If writes keep arriving while the replica is recovering, how is consistency achieved?
Let's first look at the early approach: before version 2.0, replica recovery went through three phases.

  • phase1: take a snapshot of the primary shard's Lucene index and send it to the target. Indexing is not blocked during this phase; new data is written to the primary's translog.
  • phase2: take a snapshot of the primary's translog and send it to the target to be replayed, again without blocking indexing.
  • phase3: take a write lock on the primary and send the remaining translog to the target. The remaining data is small by then, so writes are blocked only briefly.

In theory, as long as the process is allowed to block writes for a while, keeping the primary and replica consistent is fairly easy.

Later, starting with version 2.0 and together with the introduction of the translog.view concept, phase 3 was removed.

Phase 3, which replayed the remaining operations while blocking new writes to the engine, was removed because it is unnecessary: from the moment recovery starts, the standard indexing path already forwards every operation to the recovering shard. Replaying all operations captured in the view taken at the start of recovery is enough to guarantee that no operation is lost.

With the write-blocking phase 3 gone, nothing blocks writes at any point during recovery. What remains is the ordering and conflict problem between the writes that arrive during phases 1 and 2 and the operations replayed in phase 2. On the replica node, once phase 1 is finished, new indexing operations and translog replay run concurrently, so new and old data arrive interleaved. How, then, are the primary and replica kept consistent?

Suppose that during phase 1 a client indexing operation sets docA to 1; the primary executes it, but the replica is not yet ready and does not. During phase 2 a client operation sets docA to 2; the replica is now ready and applies the write of docA = 2 first, and then receives the replayed translog entry from the primary's node that writes docA = 1 again. How is that handled? The flow is shown in the figure below.

(figure: docA is written as 1 during phase 1 and as 2 during phase 2)

The answer is to handle it in the write path by filtering out stale operations using version numbers. There are three kinds of write operations: indexing a new document, update, and delete. Indexing a new document has no conflict problem; updates and deletes share the same mechanism. Every operation carries a version number, which is the expected doc version; it must be greater than the doc's current version in Lucene, otherwise the operation is discarded. For an update, the expected version is the Lucene doc version + 1. After the primary shard writes successfully, the new version number is placed in the request forwarded to the replicas; the version in that request is the expected version.

In this way, operations that arrive out of order are ignored; for any given doc, only the most recent operation takes effect, which keeps the primary and replica consistent. A minimal worked example of this check is shown below.
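The sketch below walks through the docA scenario with a simplified version check (isConflict is my own helper mirroring the shape of VersionType#isVersionConflictForWrites, which is quoted further down; NOT_FOUND is a stand-in for Versions.NOT_FOUND):

// Worked example of the version rule for the docA scenario: the replayed "docA = 1"
// operation carries an older expected version than what Lucene already holds, so it is rejected.
public class VersionConflictSketch {

    static final long NOT_FOUND = -1L; // stand-in for Versions.NOT_FOUND

    // Returns true when the write must be discarded because its expected version is not newer.
    static boolean isConflict(long currentVersion, long expectedVersion) {
        if (currentVersion == NOT_FOUND) {
            return false; // the doc does not exist yet: no conflict
        }
        return currentVersion >= expectedVersion;
    }

    public static void main(String[] args) {
        // Phase 2: the live replicated write "docA = 2" arrives first with expected version 2 and is applied.
        long currentVersion = 2;
        // Then the replayed translog entry "docA = 1" arrives with expected version 1.
        System.out.println(isConflict(currentVersion, 1)); // true -> the stale write is ignored
    }
}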

Let's look at how each of the three write operation types is handled.

1. Indexing a new document

There is no conflict problem, so nothing needs to be done.

2. Update

Check whether the operation's version number is lower than the doc's version in Lucene; if it is, discard the operation.

Both Index and Delete extend Operation, and every Operation carries a version number, which is the doc version. In the replica write path, the normal case is that after the primary writes successfully, the version the doc was written with is placed into the request forwarded to the replica. For an update, the primary increments the original doc version by 1 and forwards the request to the replica. When comparing versions:

expectedVersion = the version in the replica write request = the version after the primary write succeeded

The following method decides whether the operation's expected version conflicts with the current version in Lucene (a conflict means the operation is not newer and must be discarded):

// VersionType#isVersionConflictForWrites
EXTERNAL((byte) 1) {
    @Override
    public boolean isVersionConflictForWrites(long currentVersion, long expectedVersion, boolean deleted) {
        if (currentVersion == Versions.NOT_FOUND) {
            return false;
        }
        if (expectedVersion == Versions.MATCH_ANY) {
            return true;
        }
        if (currentVersion >= expectedVersion) {
            return true;
        }
        return false;
    }
}

If the operation being replayed from the translog writes "old" data, compareOpToLuceneDocBasedOnSeqNo returns OpVsLuceneDocStatus.OP_STALE_OR_EQUAL.

private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";final OpVsLuceneDocStatus status;VersionValue versionValue = getVersionFromMap(op.uid().bytes());assert incrementVersionLookup();if (versionValue != null) {status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);} else {// load from indexassert incrementIndexVersionLookup();try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {final DocIdAndSeqNo docAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), op.uid());if (docAndSeqNo == null) {status = OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND;} else if (op.seqNo() > docAndSeqNo.seqNo) {status = OpVsLuceneDocStatus.OP_NEWER;} else if (op.seqNo() == docAndSeqNo.seqNo) {assert localCheckpointTracker.hasProcessed(op.seqNo()) :"local checkpoint tracker is not updated seq_no=" + op.seqNo() + " id=" + op.id();status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;} else {status = OpVsLuceneDocStatus.OP_STALE_OR_EQUAL;}}}return status;}

In InternalEngine#index, the replica uses the plan to decide whether to write to Lucene:

// non-primary mode (i.e., replica or recovery)
final IndexingStrategy plan = indexingStrategyForOperation(index);

In indexingStrategyForOperation, the plan then ends up as plan = IndexingStrategy.processButSkipLucene, so the subsequent logic skips writing to Lucene and the translog. A simplified sketch of this idea follows.
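The following self-contained sketch models the "process but skip Lucene" strategy; IndexingPlan and its fields are simplifications of InternalEngine.IndexingStrategy, not the real class. The point is that a stale operation is still acknowledged and its sequence number is still marked as processed, but no Lucene write happens.

// Simplified model of IndexingStrategy.processButSkipLucene: the engine records that the
// operation's seqNo has been processed, but performs no Lucene write for it.
public class ProcessButSkipLuceneSketch {

    // Hypothetical, simplified plan describing which side effects an operation should have.
    record IndexingPlan(boolean writeToLucene, boolean markSeqNoAsProcessed) {
        static IndexingPlan processNormally()      { return new IndexingPlan(true, true); }
        static IndexingPlan processButSkipLucene() { return new IndexingPlan(false, true); }
    }

    static void index(String docId, long seqNo, boolean stale) {
        IndexingPlan plan = stale ? IndexingPlan.processButSkipLucene() : IndexingPlan.processNormally();
        if (plan.writeToLucene()) {
            System.out.println("lucene.addDocument(" + docId + ")");
        }
        if (plan.markSeqNoAsProcessed()) {
            // the local checkpoint still advances, so recovery accounting stays correct
            System.out.println("localCheckpointTracker.markSeqNoAsProcessed(" + seqNo + ")");
        }
    }

    public static void main(String[] args) {
        index("docA", 7, false); // fresh operation: written to Lucene
        index("docA", 3, true);  // stale replayed operation: Lucene write skipped
    }
}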

3. Delete

Check whether the operation's version number is lower than the doc's version in Lucene; if it is, discard the operation.

Whether the operation is older than the doc in Lucene is determined by compareOpToLuceneDocBasedOnSeqNo, the same comparison function used for Index operations.

Similarly, InternalEngine#delete decides whether to apply the delete to Lucene:

final DeletionStrategy plan = deletionStrategyForOperation(delete);

If the replayed translog entry is an "old" delete operation, compareOpToLuceneDocBasedOnSeqNo returns OpVsLuceneDocStatus.OP_STALE_OR_EQUAL.

The plan then ends up as plan = DeletionStrategy.processButSkipLucene, and the Lucene delete is skipped.

/**
 * the status of the current doc version in lucene, compared to the version in an incoming
 * operation
 */
enum OpVsLuceneDocStatus {
    /** the op is more recent than the one that last modified the doc found in lucene */
    OP_NEWER,
    /** the op is older or the same as the one that last modified the doc found in lucene */
    OP_STALE_OR_EQUAL,
    /** no doc was found in lucene */
    LUCENE_DOC_NOT_FOUND
}
