Background
This article is based on StarRocks 3.1.7.
Conclusion
StarRocks starts a thread that performs compaction periodically, with an interval of 200 ms. Compaction takes a table's partitions as its entry point and creates tasks at tablet (i.e., bucket) granularity.
Analysis
The `start` method of `CompactionMgr` launches a `CompactionScheduler`, which runs compaction as a periodic task. The period is controlled by `LOOP_INTERVAL_MS` (default 200 ms), and each cycle invokes `runOneCycle`:

```java
protected void runOneCycle() {
    cleanPartition();

    // Schedule compaction tasks only when this is a leader FE and all edit logs have finished replay.
    // In order to ensure that the input rowsets of compaction still exists when doing publishing version, it is
    // necessary to ensure that the compaction task of the same partition is executed serially, that is, the next
    // compaction task can be executed only after the status of the previous compaction task changes to visible or
    // canceled.
    if (stateMgr.isLeader() && stateMgr.isReady() && allCommittedCompactionsBeforeRestartHaveFinished()) {
        schedule();
        history.changeMaxSize(Config.lake_compaction_history_size);
        failHistory.changeMaxSize(Config.lake_compaction_fail_history_size);
    }
}
```
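The driving loop itself is just a fixed-interval daemon that keeps calling `runOneCycle`. Below is a minimal stand-alone sketch of that pattern, assuming nothing about StarRocks internals beyond the 200 ms interval — the class and constructor here are illustrative, not the actual `CompactionScheduler` implementation (which extends an FE daemon base class):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative stand-in for a periodic FE daemon: runs one cycle of work
// every LOOP_INTERVAL_MS on a single background thread.
public class PeriodicCompactionLoop {
    static final long LOOP_INTERVAL_MS = 200; // default interval from the text

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Runnable oneCycle;

    PeriodicCompactionLoop(Runnable oneCycle) {
        this.oneCycle = oneCycle;
    }

    void start() {
        // Fixed delay (not fixed rate): the next cycle starts LOOP_INTERVAL_MS
        // after the previous one finishes, so cycles never overlap.
        executor.scheduleWithFixedDelay(oneCycle, 0, LOOP_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }

    void stop() {
        executor.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch threeCycles = new CountDownLatch(3);
        PeriodicCompactionLoop loop = new PeriodicCompactionLoop(threeCycles::countDown);
        loop.start();
        threeCycles.await(); // returns after roughly three 200 ms cycles
        loop.stop();
        System.out.println("ran 3 cycles");
    }
}
```

Using a fixed *delay* rather than a fixed rate matters here: a slow cycle simply pushes the next one back instead of letting cycles pile up.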
`cleanPartition` removes invalid partitions before compaction. Note the FE leader check: the `GlobalStateMgr` involved here only holds a single FE's state, and only the leader node may schedule compaction. The main logic lives in the `schedule` method:

```java
for (Iterator<Map.Entry<PartitionIdentifier, CompactionJob>> iterator = runningCompactions.entrySet().iterator();
        ...
    if (job.isCompleted()) {
        job.getPartition().setMinRetainVersion(0);
        try {
            commitCompaction(partition, job);
            assert job.transactionHasCommitted();
        } catch (Exception e) {
            ...
        }
    } else if (job.isFailed()) {
        job.getPartition().setMinRetainVersion(0);
        errorMsg = Objects.requireNonNull(job.getFailMessage(), "getFailMessage() is null");
        job.abort(); // Abort any executing task, if present.
    }

    if (errorMsg != null) {
        iterator.remove();
        job.finish();
        failHistory.offer(CompactionRecord.build(job, errorMsg));
        compactionManager.enableCompactionAfter(partition, MIN_COMPACTION_INTERVAL_MS_ON_FAILURE);
        abortTransactionIgnoreException(partition.getDbId(), job.getTxnId(), errorMsg);
        continue;
    }
    ...

int index = 0;
int compactionLimit = compactionTaskLimit();
int numRunningTasks = runningCompactions.values().stream()
        .mapToInt(CompactionJob::getNumTabletCompactionTasks)
        .sum();
if (numRunningTasks >= compactionLimit) {
    return;
}

List<PartitionIdentifier> partitions = compactionManager.choosePartitionsToCompact(runningCompactions.keySet());
while (numRunningTasks < compactionLimit && index < partitions.size()) {
    PartitionIdentifier partition = partitions.get(index++);
    CompactionJob job = startCompaction(partition);
    if (job == null) {
        continue;
    }
    numRunningTasks += job.getNumTabletCompactionTasks();
    runningCompactions.put(partition, job);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Created new compaction job. partition={} txnId={}", partition, job.getTxnId());
    }
}
```

`schedule` first walks the running compaction jobs. If a job has completed compaction (every tablet is done) but its transaction has not been committed yet, the compaction transaction is committed; if the job failed, the job is aborted. In either case the job is removed from the running queue. A failed job is additionally recorded in `failHistory`, and compaction of that partition is re-enabled only after a delay of `LOOP_INTERVAL_MS * 10` (where `LOOP_INTERVAL_MS` is 200 ms). If the compaction transaction committed successfully, the job is recorded in `history` and the partition's next compaction is delayed by `LOOP_INTERVAL_MS * 2`.

After the running jobs are handled, the current round's compaction tasks are built:

- `compactionTaskLimit` computes the limit on the number of compaction tasks for this round: if `lake_compaction_max_tasks` is greater than or equal to 0, that config value is used; otherwise the limit is the number of BE and CN nodes in the system multiplied by 16. If the number of running tasks (counted at tablet granularity) already exceeds this limit, the round ends here; otherwise scheduling continues.
- `compactionManager.choosePartitionsToCompact` picks candidate partitions from the existing ones, excluding any partition already involved in a job in `runningCompactions`. It uses a Sorter (default `ScoreSorter`) and a Selector (default `ScoreSelector`): `ScoreSelector` keeps partitions whose compaction score is at least `lake_compaction_score_selector_min_score` (default 10) and whose compaction delay has expired, and `ScoreSorter` sorts them by `compactionScore` from high to low.
- For each selected partition, `startCompaction` builds the compaction job. It calls `collectPartitionTablets` to select the tablets and the backend each tablet lives on, then `createCompactionTasks` to create the `CompactionTask` objects, one per backend involved. Each task sends a compact request to its backend via the brpc `LakeService`, and everything is assembled into a `CompactionJob`:

```java
List<CompactionTask> tasks = new ArrayList<>();
for (Map.Entry<Long, List<Long>> entry : beToTablets.entrySet()) {
    ComputeNode node = systemInfoService.getBackendOrComputeNode(entry.getKey());
    if (node == null) {
        throw new UserException("Node " + entry.getKey() + " has been dropped");
    }
    LakeService service = BrpcProxy.getLakeService(node.getHost(), node.getBrpcPort());
    CompactRequest request = new CompactRequest();
    request.tabletIds = entry.getValue();
    request.txnId = txnId;
    request.version = currentVersion;
    request.timeoutMs = LakeService.TIMEOUT_COMPACT;
    CompactionTask task = new CompactionTask(node.getId(), service, request);
    tasks.add(task);
}
return tasks;
```

Finally, `numRunningTasks` is accumulated so that compaction concurrency stays bounded, and the new job is put into `runningCompactions`.
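The task-limit arithmetic and the score-based selection above can be sketched as follows. This is a simplified model under stated assumptions: `Partition`, `choosePartitions`, and the 16-tasks-per-node constant are illustrative stand-ins taken from the text, not StarRocks' actual `ScoreSelector`/`ScoreSorter` classes:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of two decisions described above: the per-round
// task limit and the score-based partition selection.
public class CompactionPlanSketch {
    static final int TASKS_PER_NODE = 16; // from the text: 16 per BE/CN node

    // lake_compaction_max_tasks >= 0 wins; otherwise derive from node count.
    static int compactionTaskLimit(int lakeCompactionMaxTasks, int numBeAndCnNodes) {
        return lakeCompactionMaxTasks >= 0 ? lakeCompactionMaxTasks
                                           : numBeAndCnNodes * TASKS_PER_NODE;
    }

    // Minimal stand-in for a partition with a compaction score and a
    // "do not compact before" timestamp (set via enableCompactionAfter).
    static class Partition {
        final String name;
        final double score;
        final long notBeforeMs;
        Partition(String name, double score, long notBeforeMs) {
            this.name = name;
            this.score = score;
            this.notBeforeMs = notBeforeMs;
        }
    }

    // ScoreSelector + ScoreSorter in one step: keep partitions whose score
    // reaches the threshold and whose delay has expired, highest score first.
    static List<String> choosePartitions(List<Partition> all, double minScore, long nowMs) {
        return all.stream()
                .filter(p -> p.score >= minScore && p.notBeforeMs <= nowMs)
                .sorted(Comparator.comparingDouble((Partition p) -> p.score).reversed())
                .map(p -> p.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(compactionTaskLimit(-1, 5)); // 5 nodes -> 80
        List<Partition> parts = Arrays.asList(
                new Partition("p1", 12.0, 0),
                new Partition("p2", 9.0, 0),                 // below min score 10
                new Partition("p3", 50.0, 0),
                new Partition("p4", 30.0, 9_999_999_999L));  // still delayed
        System.out.println(choosePartitions(parts, 10.0, 1_000L)); // [p3, p1]
    }
}
```

Note how the failure-delay mechanism composes with selection: a partition that just failed gets a `notBeforeMs` in the future, so it simply drops out of the candidate list until the delay expires.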
Miscellaneous
The FE configuration items mentioned above, such as `lake_compaction_max_tasks`, can be changed at runtime with a command like `admin set frontend config ("lake_compaction_max_tasks" = "0");` (see ADMIN_SET_CONFIG for details). Note that this command only changes the value in the current FE's memory; to make the change permanent, add it to `fe.conf`.