Background
This article is based on StarRocks 3.1.7.
Conclusion
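StarRocks (SR) consistency checking consists of two parts:

1. Tablet metadata consistency check. Every two hours, the checker verifies that the tablets recorded in TabletInvertedIndex are consistent with LocalMetastore. If a tablet is in neither the current catalog nor the recycle bin, it is removed directly from TabletInvertedIndex.
2. Tablet data consistency check. Between 23:00 and 04:00 the checker runs one round after another. If the previous batch is still running, it waits until every job in that batch has finished before starting the next one. Walking the db/table/partition/index/tablet hierarchy, with each level ordered by lastCheckTime from oldest to newest, it selects at most 100 tablets to check. A tablet is skipped if:
   - its table is not an OLAP table or materialized view in the NORMAL state;
   - its partition's replication number is 1;
   - it has already been checked.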
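The skip conditions and the ordering can be made concrete with a minimal Java sketch. It is not the StarRocks implementation: the real chooseTablets walks the db/table/partition/index/tablet hierarchy level by level, while this sketch flattens everything into one list and sorts it globally, which is a simplification. The Candidate record and its boolean fields are hypothetical stand-ins for the checks listed above; only the limit of 100 and the oldest-lastCheckTime-first order come from the text.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, flattened model of the tablet selection described above.
// The real chooseTablets() orders each hierarchy level separately; this sketch
// sorts all candidates in one list.
final class TabletSelectionSketch {
    // Cap per batch, matching the "at most 100 tablets" in the text.
    static final int MAX_JOB_NUM = 100;

    // Hypothetical view of one tablet and the properties the filters inspect.
    record Candidate(long tabletId,
                     boolean olapOrMaterializedView,
                     boolean tableStateNormal,
                     int replicationNum,
                     boolean alreadyChecked,
                     long lastCheckTime) {}

    static List<Long> choose(List<Candidate> candidates) {
        List<Candidate> eligible = new ArrayList<>();
        for (Candidate c : candidates) {
            // Condition 1: only OLAP tables / materialized views in NORMAL state.
            if (!c.olapOrMaterializedView() || !c.tableStateNormal()) continue;
            // Condition 2: a single replica has nothing to compare against.
            if (c.replicationNum() == 1) continue;
            // Condition 3: skip tablets that were already checked.
            if (c.alreadyChecked()) continue;
            eligible.add(c);
        }
        // Least recently checked first.
        eligible.sort(Comparator.comparingLong(Candidate::lastCheckTime));
        List<Long> chosen = new ArrayList<>();
        for (Candidate c : eligible) {
            if (chosen.size() >= MAX_JOB_NUM) break;
            chosen.add(c.tabletId());
        }
        return chosen;
    }
}
```

Sorting by the oldest lastCheckTime means tablets that have waited longest are checked first, so over several nightly rounds every eligible tablet eventually gets its turn.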
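The settings involved are consistency_check_start_time, consistency_check_end_time, and MAX_JOB_NUM (the cap of 100 tablets per batch); the two-hour interval of the metadata check comes from consistency_tablet_meta_check_interval_ms.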
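Because the 23:00 to 04:00 window wraps past midnight, a plain start <= hour < end test would never be true for it. The sketch below shows one way to test such a window together with the "previous batch has finished" condition. The class and method names are hypothetical, and treating the end hour as exclusive is an assumption of this sketch; StarRocks' own itsTime() may handle the boundaries differently.

```java
import java.time.LocalTime;

// Hypothetical sketch of a check window that may wrap past midnight (e.g. 23 -> 4).
// Not StarRocks' itsTime(); the end hour is treated as exclusive here by assumption.
final class CheckWindowSketch {
    static boolean inWindow(int startHour, int endHour, int currentHour) {
        if (startHour == endHour) {
            // Empty window: never run (an assumption of this sketch).
            return false;
        }
        if (startHour < endHour) {
            // Same-day window, e.g. 1 -> 5.
            return currentHour >= startHour && currentHour < endHour;
        }
        // Window crosses midnight, e.g. 23 -> 4.
        return currentHour >= startHour || currentHour < endHour;
    }

    static boolean inWindowNow(int startHour, int endHour) {
        return inWindow(startHour, endHour, LocalTime.now().getHour());
    }

    // A new batch is scheduled only inside the window and when no job is left,
    // mirroring the "itsTime() && getJobNum() == 0" condition.
    static boolean shouldScheduleNewBatch(boolean inWindow, int remainingJobNum) {
        return inWindow && remainingJobNum == 0;
    }
}
```

With the defaults, CheckWindowSketch.inWindowNow(23, 4) is true from 23:00 through 03:59 under this sketch's boundary convention.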
Analysis
The call chain is as follows:
getConsistencyChecker().start()
↓
Daemon.runOneCycle
↓
ConsistencyChecker.runAfterCatalogReady
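This chain follows a common daemon pattern: start() launches a thread whose loop calls runOneCycle at a fixed interval, and, as the method name suggests, the cycle only delegates to runAfterCatalogReady once the catalog is ready. A simplified Java model of that pattern follows. DaemonSketch, requestStop, and the catalogReady supplier are hypothetical names, not StarRocks classes, and the real Daemon and ConsistencyChecker code differs in detail.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified model of start() -> runOneCycle -> runAfterCatalogReady.
// Names are illustrative only; this is not StarRocks' Daemon class.
abstract class DaemonSketch extends Thread {
    private final long intervalMs;
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final BooleanSupplier catalogReady;

    DaemonSketch(String name, long intervalMs, BooleanSupplier catalogReady) {
        super(name);
        this.intervalMs = intervalMs;
        this.catalogReady = catalogReady;
        setDaemon(true);
    }

    void requestStop() {
        stopped.set(true);
    }

    // Thread body started by start(): one cycle, then sleep, until stopped.
    @Override
    public void run() {
        while (!stopped.get()) {
            runOneCycle();
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // One cycle: do nothing until the catalog is ready, then delegate.
    protected void runOneCycle() {
        if (catalogReady.getAsBoolean()) {
            runAfterCatalogReady();
        }
    }

    // Subclasses (like a consistency checker) put their per-cycle work here.
    protected abstract void runAfterCatalogReady();
}
```

A checker built on this model would override runAfterCatalogReady with the logic shown next and be launched with start(), mirroring getConsistencyChecker().start().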
The main logic lives in runAfterCatalogReady:
```java
if (System.currentTimeMillis() - lastTabletMetaCheckTime > Config.consistency_tablet_meta_check_interval_ms) {
    checkTabletMetaConsistency();
    lastTabletMetaCheckTime = System.currentTimeMillis();
}

// for each round. try chose enough new tablets to check
// only add new job when it's work time
if (itsTime() && getJobNum() == 0) {
    List<Long> chosenTabletIds = chooseTablets();
    for (Long tabletId : chosenTabletIds) {
        CheckConsistencyJob job = new CheckConsistencyJob(tabletId);
        addJob(job);
    }
}

jobsLock.writeLock().lock();
try {
    // handle all jobs
    Iterator<Map.Entry<Long, CheckConsistencyJob>> iterator = jobs.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<Long, CheckConsistencyJob> entry = iterator.next();
        CheckConsistencyJob oneJob = entry.getValue();

        JobState state = oneJob.getState();
        switch (state) {
            case PENDING:
                if (!oneJob.sendTasks()) {
                    clearJob(oneJob);
                    iterator.remove();
                }
                break;
            case RUNNING:
                int res = oneJob.tryFinishJob();
                if (res == -1 || res == 1) {
                    // cancelled or finished
                    clearJob(oneJob);
                    iterator.remove();
                }
                break;
            default:
                break;
        }
    } // end while
} finally {
    jobsLock.writeLock().unlock();
}
```
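The first branch of this loop calls checkTabletMetaConsistency. Its core rule can be modelled with plain collections: an inverted-index entry survives only if its tablet is in the catalog or in the recycle bin. This is a hypothetical sketch; the map, the two sets, and removeOrphans are stand-ins, and the real method works against TabletInvertedIndex, LocalMetastore, and the recycle bin rather than plain collections.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the tablet metadata check: invertedIndex maps
// tabletId -> some metadata; orphaned entries are removed in place.
final class TabletMetaCheckSketch {
    // Removes every tablet found in neither the catalog nor the recycle bin,
    // and returns the ids that were removed.
    static Set<Long> removeOrphans(Map<Long, String> invertedIndex,
                                   Set<Long> tabletsInCatalog,
                                   Set<Long> tabletsInRecycleBin) {
        Set<Long> removed = new HashSet<>();
        invertedIndex.keySet().removeIf(tabletId -> {
            boolean orphan = !tabletsInCatalog.contains(tabletId)
                    && !tabletsInRecycleBin.contains(tabletId);
            if (orphan) {
                removed.add(tabletId);
            }
            return orphan;
        });
        return removed;
    }

    // Convenience factory used in the usage example.
    static Map<Long, String> newIndex() {
        return new HashMap<>();
    }
}
```

Keeping recycle-bin tablets in the index matters because a dropped table can still be recovered; only tablets that belong nowhere are cleaned up.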
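- Tablet metadata check. It runs every two hours (Config.consistency_tablet_meta_check_interval_ms) and is implemented in checkTabletMetaConsistency. The logic is simple: if a tablet is in neither the current catalog nor the recycle bin, it is removed directly from TabletInvertedIndex.
- Tablet data consistency check:
  - First, the checker decides whether the current time falls inside the check window defined by consistency_check_start_time (default 23) and consistency_check_end_time (default 4), i.e. between 23:00 and 04:00 every day. A new round starts only after all jobs from the previous batch have finished (getJobNum() == 0).
  - Next, it picks the tablets to check in chooseTablets. This logic is also straightforward: walking the db/table/partition/index/tablet hierarchy, with each level ordered by lastCheckTime from oldest to newest, it selects at most 100 tablets.
  - Each chosen tablet is wrapped in a CheckConsistencyJob and added to jobs.
  - For each PENDING job, sendTasks builds an AgentBatchTask and submits it to the backends, which perform the consistency check: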
```java
AgentBatchTask batchTask = new AgentBatchTask();
...
for (Replica replica : tablet.getImmutableReplicas()) {
    // 1. if state is CLONE, do not send task at this time
    if (replica.getState() == ReplicaState.CLONE
            || replica.getState() == ReplicaState.DECOMMISSION) {
        continue;
    }

    if (replica.getDataSize() > maxDataSize) {
        maxDataSize = replica.getDataSize();
    }

    CheckConsistencyTask task = new CheckConsistencyTask(null, replica.getBackendId(),
            tabletMeta.getDbId(),
            tabletMeta.getTableId(),
            tabletMeta.getPartitionId(),
            tabletMeta.getIndexId(),
            tabletId, checkedSchemaHash,
            checkedVersion);

    // add task to send
    batchTask.addTask(task);

    // init checksum as '-1'
    checksumMap.put(replica.getBackendId(), -1L);

    ++sentTaskReplicaNum;
}
...
for (AgentTask task : batchTask.getAllTasks()) {
    AgentTaskQueue.addTask(task);
}
AgentTaskExecutor.submit(batchTask);
```
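- It builds an AgentBatchTask instance.
- For every replica of the tablet (replicas in CLONE or DECOMMISSION state are skipped for now), it creates a CheckConsistencyTask, adds it to the AgentBatchTask, and initializes that backend's checksum to -1. Finally, every task is also added to AgentTaskQueue so that it can be tracked.
- It calls AgentTaskExecutor.submit(batchTask) to send the tasks to the backends for execution; the core of that work is in AgentBatchTask.run: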
```java
public void run() {
    for (Long backendId : this.backendIdToTasks.keySet()) {
        BackendService.Client client = null;
        TNetworkAddress address = null;
        boolean ok = false;
        try {
            ComputeNode computeNode = GlobalStateMgr.getCurrentSystemInfo().getBackend(backendId);
            if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA && computeNode == null) {
                computeNode = GlobalStateMgr.getCurrentSystemInfo().getComputeNode(backendId);
            }

            if (computeNode == null || !computeNode.isAlive()) {
                continue;
            }

            String host = computeNode.getHost();
            int port = computeNode.getBePort();
            List<AgentTask> tasks = this.backendIdToTasks.get(backendId);
            // create AgentClient
            address = new TNetworkAddress(host, port);
            client = ClientPool.backendPool.borrowObject(address);
            List<TAgentTaskRequest> agentTaskRequests = new LinkedList<TAgentTaskRequest>();
            for (AgentTask task : tasks) {
                agentTaskRequests.add(toAgentTaskRequest(task));
            }
            client.submit_tasks(agentTaskRequests);
            // ... (rest of run() omitted in this excerpt)
```
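- For each backend in backendIdToTasks, run looks up the node's host and BE port (skipping nodes that are missing or not alive), borrows a Thrift client from ClientPool.backendPool, and converts every AgentTask into a Thrift request with toAgentTaskRequest.
- It then calls client.submit_tasks, and the backend executes the CHECK_CONSISTENCY tasks.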
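Since AgentBatchTask keeps its tasks keyed by backend ID in backendIdToTasks, one consistency batch turns into one submit_tasks call per alive backend. The grouping and the skipping of dead backends can be modelled as below. This is a hypothetical sketch: Task, addTask, and dispatch are illustrative names, and the Thrift conversion and RPC are replaced by simply recording what would be sent.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of per-backend batching, loosely following backendIdToTasks.
final class BatchDispatchSketch {
    record Task(long backendId, long tabletId) {}

    private final Map<Long, List<Task>> backendIdToTasks = new LinkedHashMap<>();

    // Group each per-replica task under the backend that owns the replica.
    void addTask(Task task) {
        backendIdToTasks.computeIfAbsent(task.backendId(), id -> new ArrayList<>()).add(task);
    }

    // Returns backendId -> tasks that would be sent. Unknown or dead backends
    // are skipped, like the `continue` in AgentBatchTask.run().
    Map<Long, List<Task>> dispatch(Set<Long> aliveBackends) {
        Map<Long, List<Task>> sent = new LinkedHashMap<>();
        for (Map.Entry<Long, List<Task>> e : backendIdToTasks.entrySet()) {
            if (!aliveBackends.contains(e.getKey())) {
                continue;
            }
            // The real code converts tasks with toAgentTaskRequest and calls
            // client.submit_tasks(...) here; this sketch only records them.
            sent.put(e.getKey(), List.copyOf(e.getValue()));
        }
        return sent;
    }
}
```

For a tablet with three replicas on backends 10, 11, and 12, where backend 11 is down, this model sends one request list each to backends 10 and 12, and backend 11 receives nothing in this round.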