support maxTsmaCalcDelay, maxTsmaNum, skip_tsma

This commit is contained in:
wangjiaming0909 2024-03-19 08:41:37 +00:00
parent 3566cac206
commit 7de0a34ae6
10 changed files with 47 additions and 14 deletions

View File

@ -202,6 +202,7 @@ extern char tsSmlTsDefaultName[];
extern int32_t tmqMaxTopicNum;
extern int32_t tmqRowSize;
extern int32_t tsMaxTsmaNum;
extern int32_t tsMaxTsmaCalcDelay;
// wal
extern int64_t tsWalFsyncDataSizeLimit;

View File

@ -399,6 +399,7 @@
#define TK_PARA_TABLES_SORT 610
#define TK_SMALLDATA_TS_SORT 611
#define TK_HASH_JOIN 612
#define TK_SKIP_TSMA 613
#define TK_NK_NIL 65535

View File

@ -142,6 +142,7 @@ typedef enum EHintOption {
HINT_PARA_TABLES_SORT,
HINT_SMALLDATA_TS_SORT,
HINT_HASH_JOIN,
HINT_SKIP_TSMA,
} EHintOption;
typedef struct SHintNode {

View File

@ -1134,6 +1134,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
if (pIter == NULL) break;
if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
sdbRelease(pSdb, pSma);
continue;
}
@ -1149,6 +1150,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
if (pVg == NULL) {
code = -1;
sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pIter);
return code;
}
info.epSet = mndGetVgroupEpset(pMnode, pVg);
@ -1158,6 +1160,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pIter);
return code;
}
@ -1169,6 +1172,7 @@ int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool
code = -1;
taosMemoryFree(info.expr);
sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pIter);
return code;
}
@ -2137,6 +2141,7 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
if (pIter == NULL) break;
if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
sdbRelease(pSdb, pSma);
continue;
}
@ -2164,6 +2169,7 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
terrno = TSDB_CODE_OUT_OF_MEMORY;
mndReleaseStb(pMnode, pStb);
sdbRelease(pSdb, pSma);
sdbCancelFetch(pSdb, pIter);
return code;
}
pTsma->streamUid = streamId;
@ -2177,11 +2183,13 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
if (pBaseTsma) mndReleaseSma(pMnode, pBaseTsma);
if (terrno) {
tFreeTableTSMAInfo(pTsma);
sdbCancelFetch(pSdb, pIter);
return code;
}
if (NULL == taosArrayPush(rsp->pTsmas, &pTsma)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeTableTSMAInfo(pTsma);
sdbCancelFetch(pSdb, pIter);
return code;
}
*exist = true;
@ -2296,6 +2304,7 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t
if (pSma->uid != pTsmaVer->tsmaId) {
mDebug("tsma: %s.%" PRIx64 " tsmaId mismatch with current %" PRIx64, tsmaFName, pTsmaVer->tsmaId, pSma->uid);
terrno = mkNonExistTSMAInfo(pTsmaVer, &pTsmaInfo);
mndReleaseSma(pMnode, pSma);
if (terrno) goto _OVER;
taosArrayPush(hbRsp.pTsmas, &pTsmaInfo);
continue;

View File

@ -511,6 +511,14 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type) {
return pSdb->tableVer[type];
}
int32_t sdbGetValidSize(SSdb* pSdb, ESdbType type) {
bool countValid(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
int32_t* pInt = p1;
(*pInt) += 1;
return true;
}
int32_t sdbGetValidSize(SSdb* pSdb, ESdbType type) {
int32_t num = 0;
sdbTraverse(pSdb, type, countValid, &num, 0, 0);
return num;
}

View File

@ -2413,18 +2413,13 @@ bool hasOutOfDateTSMACache(SArray* pTsmas) {
bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) {
int64_t now = taosGetTimestampMs();
bool ret = !pTsmaCache->fillHistoryFinished || (30 * 1000 - pTsmaCache->delayDuration) < (now - pTsmaCache->reqTs);
bool ret = !pTsmaCache->fillHistoryFinished ||
(tsMaxTsmaCalcDelay * 1000 - pTsmaCache->delayDuration) < (now - pTsmaCache->reqTs);
if (ret) {
qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64
" passed: %" PRId64,
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs);
} else {
// TODO tsma remove log
qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64
" passed: %" PRId64,
pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished,
30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs);
tsMaxTsmaCalcDelay * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs);
}
return ret;
}

View File

@ -459,6 +459,7 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
int32_t paramNum) {
void* value = NULL;
switch (opt) {
case HINT_SKIP_TSMA:
case HINT_BATCH_SCAN:
case HINT_NO_BATCH_SCAN: {
if (paramNum > 0) {
@ -584,6 +585,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
}
opt = HINT_HASH_JOIN;
break;
case TK_SKIP_TSMA:
lastComma = false;
if (0 != opt || inParamList) {
quit = true;
break;
}
opt = HINT_SKIP_TSMA;
break;
case TK_NK_LP:
lastComma = false;
if (0 == opt || inParamList) {

View File

@ -223,6 +223,7 @@ static SKeyword keywordTable[] = {
{"SET", TK_SET},
{"SHOW", TK_SHOW},
{"SINGLE_STABLE", TK_SINGLE_STABLE},
{"SKIP_TSMA", TK_SKIP_TSMA},
{"SLIDING", TK_SLIDING},
{"SLIMIT", TK_SLIMIT},
{"SMA", TK_SMA},

View File

@ -5938,7 +5938,7 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) {
}
STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i);
if (!pTsma->fillHistoryFinished || 30 * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) {
if (!pTsma->fillHistoryFinished || tsMaxTsmaCalcDelay * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) {
qInfo("tsma %s filtered out history: %d rspTs: %ld reqTs: %ld delay: %ld, rspTs - reqTs: %ld", pTsma->name,
pTsma->fillHistoryFinished, pTsma->rspTs, pTsma->reqTs, pTsma->delayDuration, pTsma->rspTs - pTsma->reqTs);
continue;
@ -6601,6 +6601,9 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan
SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tsmaOptMayBeOptimized);
if (!pScan) return code;
SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pScan);
if (getOptHint(pRootNode->pHint, HINT_SKIP_TSMA)) return code;
code = fillTSMAOptCtx(&tsmaOptCtx, pScan);
if (code == TSDB_CODE_SUCCESS) {
// 1. extract useful tsmas

View File

@ -757,9 +757,15 @@ class TDTestCase:
# self.test_query_with_drop_tsma()
# self.test_query_with_add_tag()
# self.test_union()
self.test_query_sub_table()
self.test_query_child_table()
self.test_skip_tsma_hint()
def test_query_sub_table(self):
def test_skip_tsma_hint(self):
ctxs = []
sql = 'select /*+ skip_tsma()*/avg(c1), avg(c2) from meters interval(5m)'
ctxs.append(TSMAQCBuilder().with_sql(sql).should_query_with_table('meters').get_qc())
def test_query_child_table(self):
sql = 'select avg(c1) from t1'
ctx = TSMAQCBuilder().with_sql(sql).should_query_with_tsma(
'e8945e7385834f8c22705546d4016539_t1', UsedTsma.TS_MIN, UsedTsma.TS_MAX, child_tb=True).get_qc()
@ -1143,7 +1149,6 @@ class TDTestCase:
tdSql.execute('drop tsma tsma1', queryTimes=1)
tdSql.execute('use test', queryTimes=1)
time.sleep(999999)
tdSql.execute(
'create tsma tsma1 on nsdb.meters function(avg(c1), avg(c2)) interval(10m)', queryTimes=1)
self.wait_for_tsma_calculation(