背景
在前文Starrocks 写入报错 primary key memory usage exceeds the limit中,可以通过ALTER TABLE xxxx DISTRIBUTED BY HASH(xx) BUCKETS 50;
来改变数据的分布状态,具体的执行过程是怎么样的呢?
分析
首先对应的g4文件中为 alterTableStatement ,这里最终的调用是 AlterJobExecutor.visitAlterTableStatement:
if (statement.hasSchemaChangeOp()) {Locker locker = new Locker();locker.lockTableWithIntensiveDbLock(db, table.getId(), LockType.WRITE);try {SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();assert table instanceof OlapTable;schemaChangeHandler.process(statement.getAlterClauseList(), db, (OlapTable) table);} catch (UserException e) {throw new AlterJobException(e.getMessage());} finally {locker.unLockTableWithIntensiveDbLock(db, table, LockType.WRITE);}isSynchronous = false;
schemaChangeHandler.process
会创建OptimizeJobV2
实例去优化对象,数据链路如下:
SchemaChangeHandler.process||\/
analyzeAndCreateJob||\/
createOptimizeTableJob||\/
OptimizeJobV2Builder.build()||\/
new OptimizeJobV2()
SchemaChangeHandler.process 会把当前的OptimizeJobV2
job 放入要执行的队列中,之后SchemaChangeHandler 以 alter_scheduler_interval_millisecond (10000ms)的轮询间隔从队列中取出要执行的任务,并调用run
方法.run
方法如下:
public synchronized void run() {if (isTimeout()) {cancelImpl("Timeout");return;}// create connectcontextcreateConnectContextIfNeeded();try {while (true) {JobState prevState = jobState;switch (prevState) {case PENDING:runPendingJob();break;case WAITING_TXN:runWaitingTxnJob();break;case RUNNING:runRunningJob();break;case FINISHED_REWRITING:runFinishedRewritingJob();break;default:break;}if (jobState == prevState) {break;} // else: handle the new state}} catch (AlterCancelException e) {cancelImpl(e.getMessage());}}
- PENDING
创建完任务初始状态就是PENDING
,所以调用runPendingJob()
方法,这里有几个关键点是
- 创建该Alter语句涉及到的所有的分区
- 检查改任务所涉及到表的状态,必须该表的tablet都为健康状态才可以进行下一步,否则设置该表的状态为
WAITING_STABLE
,并直接跳过该任务 - 会获取到在一个事务的ID
- 改变该作业的状态为
WAITING_TXN
- WAITING_TXN
如果任务所涉到的表为正常状态,则会进入runWaitingTxnJob()
方法,这里的几个关键点是
- 会等待在该任务对应的事务之前的事务都运行完才会执行该任务
- 每个分区建立一个任务,并把分区写入一个临时分区中
- 改变该作业的状态为
RUNNING
- RUNNING
如果任务正常运行的话,则会进入runRunningJob()
方法,这里的几个关键点是
- 等待所有的写入临时分区的任务完成
- 锁住该表所在库以及该表,并且是排他锁,所以读取该库的操作也是不可行的
- 替换临时分区到对应的分区上去
- 改变该作业的状态为
FINISHED