Merge pull request #12210 from taosdata/feature/TD-14481-3.0
feat: rollup sma optimization
This commit is contained in:
commit
61098d3ea7
|
@ -249,8 +249,10 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
||||||
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
|
pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey);
|
||||||
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
|
pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey);
|
||||||
lastKey = rowKey;
|
lastKey = rowKey;
|
||||||
++pCols->numOfRows;
|
if (pCols) {
|
||||||
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
|
++pCols->numOfRows;
|
||||||
|
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, false);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
|
tsdbAppendTableRowToCols(pTable, pCols, &pSchema, row, true);
|
||||||
}
|
}
|
||||||
|
@ -279,7 +281,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
if (lastKey != TSKEY_INITIAL_VAL) {
|
if (pCols && (lastKey != TSKEY_INITIAL_VAL)) {
|
||||||
++pCols->numOfRows;
|
++pCols->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1638,7 +1638,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
|
||||||
tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
|
tsdbWarn("vgId:%d tsma create msg received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb),
|
tsdbDebug("vgId:%d tsma create msg %s:%" PRIi64 " for table %" PRIi64 " received", REPO_ID(pTsdb),
|
||||||
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid);
|
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid);
|
||||||
|
|
||||||
|
@ -2006,6 +2006,12 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
|
||||||
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
|
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
|
||||||
int8_t level) {
|
int8_t level) {
|
||||||
SArray *pResult = NULL;
|
SArray *pResult = NULL;
|
||||||
|
|
||||||
|
if (!taskInfo) {
|
||||||
|
tsdbDebug("vgId:%d no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, REPO_ID(pTsdb), level, suid);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
|
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
|
||||||
suid);
|
suid);
|
||||||
|
|
||||||
|
@ -2071,10 +2077,18 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType
|
||||||
tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid);
|
tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), suid);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
if (!pRSmaInfo->taskInfo[0]) {
|
||||||
|
tsdbDebug("vgId:%d no rsma qTaskInfo for suid:%" PRIu64, REPO_ID(pTsdb), suid);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||||
// TODO: use the proper schema instead of 0, and cache STSchema in cache
|
// TODO: use the proper schema instead of 0, and cache STSchema in cache
|
||||||
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
|
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
|
||||||
|
if (!pTSchema) {
|
||||||
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1);
|
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1);
|
||||||
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2);
|
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2);
|
||||||
taosMemoryFree(pTSchema);
|
taosMemoryFree(pTSchema);
|
||||||
|
|
|
@ -25,7 +25,7 @@ const SVnodeCfg vnodeCfgDefault = {
|
||||||
.isHeap = false,
|
.isHeap = false,
|
||||||
.isWeak = 0,
|
.isWeak = 0,
|
||||||
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
|
.tsdbCfg = {.precision = TSDB_TIME_PRECISION_MILLI,
|
||||||
.update = 0,
|
.update = 1,
|
||||||
.compression = 2,
|
.compression = 2,
|
||||||
.slLevel = 5,
|
.slLevel = 5,
|
||||||
.days = 10,
|
.days = 10,
|
||||||
|
|
Loading…
Reference in New Issue