From 8e56d9a359a5e79bbbc5a45115ebbe61870f3162 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 19 Feb 2024 16:04:55 +0800 Subject: [PATCH] fix tsmas --- include/common/tmsg.h | 2 +- include/util/taoserror.h | 2 + source/dnode/mnode/impl/src/mndSma.c | 4 +- source/dnode/vnode/src/tq/tqUtil.c | 15 +++- source/libs/catalog/src/ctgAsync.c | 19 ++++- source/libs/catalog/src/ctgCache.c | 22 +++-- source/libs/catalog/src/ctgUtil.c | 20 ++++- source/libs/function/src/functionMgt.c | 5 +- source/libs/parser/src/parTranslater.c | 52 ++++++++---- source/libs/planner/src/planOptimizer.c | 40 +++------ source/util/src/terror.c | 2 + tests/system-test/2-query/tsma.py | 106 ++++++++++++++++++++++-- 12 files changed, 215 insertions(+), 74 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70d16d9e1c..b0f227c950 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4226,7 +4226,7 @@ typedef struct { int64_t streamUid; int64_t reqTs; int64_t rspTs; - int64_t delayDuration; + int64_t delayDuration; // ms bool fillHistoryFinished; } STableTSMAInfo; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index d5a6933a36..8330974344 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -823,6 +823,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103) #define TSDB_CODE_TSMA_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x3104) #define TSDB_CODE_TSMA_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x3105) +#define TSDB_CODE_TSMA_INVALID_TB TAOS_DEF_ERROR_CODE(0, 0x3106) +#define TSDB_CODE_TSMA_INVALID_INTERVAL TAOS_DEF_ERROR_CODE(0, 0x3107) //rsma #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 51d2b8a607..de51c62de9 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1449,7 +1449,7 @@ static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { pCxt->pCreateStreamReq->fillNullCols = NULL; pCxt->pCreateStreamReq->igUpdate = 0; // TODO what's this tiemstamp? - pCxt->pCreateStreamReq->lastTs = 1704442278000; + pCxt->pCreateStreamReq->lastTs = 1755442278000; pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast); pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); @@ -2163,7 +2163,7 @@ static int32_t mkNonExistTSMAInfo(const STSMAVersion *pTsmaVer, STableTSMAInfo * tstrncpy(pInfo->tb, pTsmaVer->tbName, TSDB_TABLE_NAME_LEN); tstrncpy(pInfo->name, pTsmaVer->name, TSDB_TABLE_NAME_LEN); pInfo->dbId = pTsmaVer->dbId; - pInfo->ast = "dummy";// TODO could be freed + pInfo->ast = taosMemoryCalloc(1, 1); *ppTsma = pInfo; return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 3e6a3d34fa..8099d6a6f2 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -531,6 +531,11 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b *fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask); int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); + if (ver == -1) { + ver = pTask->chkInfo.processedVer; + } else { + ver--; + } SVersionRange verRange = {0}; walReaderValidVersionRange(pTask->exec.pWalReader, &verRange.minVer, &verRange.maxVer); @@ -549,9 +554,13 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b cur = pReader->pHead->head.ingestTs; } - code = walFetchHead(pReader, verRange.maxVer); - if (code == TSDB_CODE_SUCCESS) { - latest = pReader->pHead->head.ingestTs; + if (ver == verRange.maxVer) { + latest = cur; + } else { + code = walFetchHead(pReader, verRange.maxVer); + if (code == TSDB_CODE_SUCCESS) { + latest = pReader->pHead->head.ingestTs; + } } if (pDelay != NULL) { // delay in ms diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 110f5ce308..e6f5826a41 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -783,6 +783,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const } if (tbTsmaNum > 0) { + // TODO when create recursive tsma, avoid get tb tsma task CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TSMA, pReq->pTableTSMAs, NULL)); } if (tsmaNum > 0) { @@ -2705,6 +2706,10 @@ int32_t ctgLaunchGetTSMATask(SCtgTask* pTask) { SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = 0}; taosArrayPush(pCtx->pResList, &(SMetaRes){0}); CTG_ERR_RET(ctgGetTbTSMAFromMnode(pCtg, pConn, pTsmaName, NULL, &tReq, TDMT_MND_GET_TSMA)); + } else { + TSWAP(pTask->res, pCtx->pResList); + CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0)); + return TSDB_CODE_SUCCESS; } return 0; @@ -2785,6 +2790,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); SArray* pTsmas = NULL; SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx); + SHashObj* pVgHash = NULL; SCtgDBCache* pDbCache = NULL; STableTSMAInfo* pTsma = NULL; SRequestConnInfo* pConn = &pTask->pJob->conn; @@ -2849,8 +2855,9 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas, tsmaIdx); if (pTsmaInfo->rspTs == 0) pTsmaInfo->fillHistoryFinished = true; pTsmaInfo->rspTs = taosGetTimestampMs(); - pTsmaInfo->delayDuration = MAX(pRsp->progressDelay, pTsmaInfo->delayDuration); + pTsmaInfo->delayDuration = TMAX(pRsp->progressDelay, pTsmaInfo->delayDuration); pTsmaInfo->fillHistoryFinished = pTsmaInfo->fillHistoryFinished && pRsp->fillHisFinished; + qDebug("received stream progress for tsma %s rsp history: %d vnode: %d", pTsmaInfo->name, pRsp->fillHisFinished, pRsp->subFetchIdx); if (atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) == pFetch->subFetchNum) { // subfetch all finished @@ -2873,19 +2880,20 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf STableTSMAInfoRsp* pTsmas = pRes->pRes; int32_t subFetchIdx = 0; pFetch->vgNum = taosHashGetSize(pOut->dbVgroup->vgHash); + TSWAP(pOut->dbVgroup->vgHash, pVgHash); for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) { STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i); - SVgroupInfo* pVgInfo = taosHashIterate(pOut->dbVgroup->vgHash, NULL); + SVgroupInfo* pVgInfo = taosHashIterate(pVgHash, NULL); while (pVgInfo) { // make StreamProgressReq, send it SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx, - .streamId = pTsma->streamUid, + .streamId = pTsmaInfo->streamUid, .subFetchIdx = subFetchIdx++, .vgId = pVgInfo->vgId}; CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); pFetch->subFetchNum++; hasSubFetch = true; - pVgInfo = taosHashIterate(pOut->dbVgroup->vgHash, pVgInfo); + pVgInfo = taosHashIterate(pVgHash, pVgInfo); } } } break; @@ -2901,6 +2909,9 @@ _return: tFreeTableTSMAInfo(pTsma); pTsma = NULL; } + if (pVgHash) { + taosHashCleanup(pVgHash); + } if (code) { SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx); pRes->code = code; diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index eb1ce8d074..6c06dc8001 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -3330,7 +3330,13 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam CTG_RET(code); } - void * pIter = taosHashIterate(pDbCache->tsmaCache, NULL); + void *pIter = taosHashIterate(pDbCache->tsmaCache, NULL); + res.pRes = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp)); + if (!res.pRes) CTG_RET(TSDB_CODE_OUT_OF_MEMORY); + STableTSMAInfoRsp* pRsp = res.pRes; + pRsp->pTsmas = taosArrayInit(1, POINTER_BYTES); + if (!pRsp->pTsmas) CTG_RET(TSDB_CODE_OUT_OF_MEMORY); + while (pIter && !found) { SCtgTSMACache* pCtgCache = pIter; CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock); @@ -3348,8 +3354,8 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam pIter = taosHashIterate(pDbCache->tsmaCache, pIter); } taosHashCancelIterate(pDbCache->tsmaCache, pIter); - if (found) { - res.pRes = pTsmaOut; + if (found && code == TSDB_CODE_SUCCESS) { + taosArrayPush(pRsp->pTsmas, &pTsmaOut); taosArrayPush(pCtx->pResList, &res); } @@ -3510,6 +3516,10 @@ int32_t ctgWriteTbTSMAToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam for (int32_t i = 0; i < pCache->pTsmas->size; ++i) { STableTSMAInfo* pInfo = taosArrayGetP(pCache->pTsmas, i); if (pInfo->tsmaId == pTsmaCache->tsmaId) { + ctgDebug("tsma: %s removed from cache, history from %d to %d, reqTs from %" PRId64 " to %" PRId64 + "rspTs from %" PRId64 " to %" PRId64 " delay from %" PRId64 " to %" PRId64, + pInfo->name, pInfo->fillHistoryFinished, pTsmaCache->fillHistoryFinished, pInfo->reqTs, + pTsmaCache->reqTs, pInfo->rspTs, pTsmaCache->rspTs, pInfo->delayDuration, pTsmaCache->delayDuration); cacheSize = ctgGetTbTSMACacheSize(pInfo); taosArrayRemove(pCache->pTsmas, i); atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); @@ -3564,8 +3574,7 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) { for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) { pCache = taosArrayGetP(pCtgCache->pTsmas, i); cacheSize += ctgGetTbTSMACacheSize(pCache); - CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, - ctgTSMAVersionSearchCompare)); + ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare); CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA); } taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo); @@ -3586,8 +3595,7 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) { continue; } cacheSize = ctgGetTbTSMACacheSize(pCache); - CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, - ctgTSMAVersionSearchCompare)); + ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare); taosArrayRemove(pCtgCache->pTsmas, i); tFreeAndClearTableTSMAInfo(pCache); CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index ee85880d38..87d0ca090f 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -636,6 +636,12 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) { } break; } + case TDMT_VND_GET_STREAM_PROGRESS: { + if (pCtx->out) { + taosMemoryFreeClear(pCtx->out); + } + break; + } default: qError("invalid reqType %d", pCtx->reqType); break; @@ -2406,7 +2412,19 @@ bool hasOutOfDateTSMACache(SArray* pTsmas) { bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) { int64_t now = taosGetTimestampMs(); - return !pTsmaCache->fillHistoryFinished || (30 * 1000 - pTsmaCache->delayDuration) < (now - pTsmaCache->reqTs); + bool ret = !pTsmaCache->fillHistoryFinished || (30 * 1000 - pTsmaCache->delayDuration) < (now - pTsmaCache->reqTs); + if (ret) { + qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64 + " passed: %" PRId64, + pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished, + 30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs); + } else { + qDebug("tsma %s.%s in cache has been out of date, history finished: %d, remain valid after: %" PRId64 + " passed: %" PRId64, + pTsmaCache->dbFName, pTsmaCache->name, pTsmaCache->fillHistoryFinished, + 30 * 1000 - pTsmaCache->delayDuration, now - pTsmaCache->reqTs); + } + return ret; } int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 242ddd16dd..b14aada9d1 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -629,5 +629,8 @@ bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) { if (!pFunc->pStateFunc) { return false; } - return strcmp(pFunc->pStateFunc, pStateFunc->name) == 0; + if (strcmp(pFunc->pStateFunc, pStateFunc->name) == 0) return true; + int32_t stateMergeFuncId = fmGetFuncId(pFunc->pStateFunc); + const SBuiltinFuncDefinition* pStateMergeFunc = &funcMgtBuiltins[stateMergeFuncId]; + return strcmp(pStateFunc->name, pStateMergeFunc->pMergeFunc) == 0; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4a8ab3786e..f4b60c38b7 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10597,24 +10597,39 @@ static int32_t rewriteTSMAFuncs(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, return code; } -static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq) { +static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, SName* useTbName) { SName name; tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pReq->name); memset(&name, 0, sizeof(SName)); - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name), pReq->stb); + toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, useTbName); + tNameExtractFullName(useTbName, pReq->stb); pReq->igExists = pStmt->ignoreExists; pReq->interval = ((SValueNode*)pStmt->pOptions->pInterval)->datum.i; pReq->intervalUnit = ((SValueNode*)pStmt->pOptions->pInterval)->unit; +#define TSMA_MIN_INTERVAL_MS 1 // 1ms +#define TSMA_MAX_INTERVAL_MS (60 * 60 * 1000) // 1h + if (pReq->interval > TSMA_MAX_INTERVAL_MS || pReq->interval < TSMA_MIN_INTERVAL_MS) { + return TSDB_CODE_TSMA_INVALID_INTERVAL; + } + int32_t code = TSDB_CODE_SUCCESS; STableMeta* pTableMeta = NULL; - //TODO 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取. - STableTSMAInfo *pRecursiveTsma = NULL; - int32_t numOfCols = 0, numOfTags = 0; - SSchema* pCols = NULL, *pTags = NULL; + // TODO 在使用该tableName时, 如果确定其其实是tsma name, 那么避免将此作为tbname进行catalog 获取. + STableTSMAInfo* pRecursiveTsma = NULL; + int32_t numOfCols = 0, numOfTags = 0; + SSchema * pCols = NULL, *pTags = NULL; if (pStmt->pOptions->recursiveTsma) { - code = getTsma(pCxt, &name, &pRecursiveTsma); + // useTbName is base tsma name + code = getTsma(pCxt, useTbName, &pRecursiveTsma); + if (code == TSDB_CODE_SUCCESS) { + SValueNode* pInterval = (SValueNode*)pStmt->pOptions->pInterval; + if (pRecursiveTsma->interval < pInterval->datum.i && pInterval->datum.i % pRecursiveTsma->interval == 0) { + } else { + code = TSDB_CODE_TSMA_INVALID_PARA; + } + } if (code == TSDB_CODE_SUCCESS) { SNode* pNode; if (TSDB_CODE_SUCCESS != nodesStringToNode(pRecursiveTsma->ast, &pNode)) { @@ -10627,13 +10642,13 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm nodesListMakeStrictAppend(&pStmt->pOptions->pFuncs, nodesCloneNode(pNode)); } nodesDestroyNode((SNode*)pSelect); + memset(useTbName, 0, sizeof(SName)); + tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->tb, useTbName), pReq->stb); + numOfCols = pRecursiveTsma->pUsedCols->size; // TODO merge pUsedCols and pTags with one SSchema array + numOfTags = pRecursiveTsma->pTags->size; + pCols = pRecursiveTsma->pUsedCols->pData; + pTags = pRecursiveTsma->pTags->pData; } - memset(&name, 0, sizeof(SName)); - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pRecursiveTsma->targetTb, &name), pReq->stb); - numOfCols = pRecursiveTsma->pUsedCols->size; // TODO merge pUsedCols and pTags with one SSchema array - numOfTags = pRecursiveTsma->pTags->size; - pCols = pRecursiveTsma->pUsedCols->pData; - pTags = pRecursiveTsma->pTags->pData; } else { code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); if (TSDB_CODE_SUCCESS == code) { @@ -10643,6 +10658,8 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm pTags = pTableMeta->schema + numOfCols; if (pTableMeta->tableType == TSDB_NORMAL_TABLE) { pReq->normSourceTbUid = pTableMeta->uid; + } else if (pTableMeta->tableType == TSDB_CHILD_TABLE) { + code = TSDB_CODE_TSMA_INVALID_TB; } } } @@ -10682,13 +10699,12 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt int32_t code = doTranslateValue(pCxt, (SValueNode*)pStmt->pOptions->pInterval); SMCreateSmaReq smaReq = {0}; + SName useTbName = {0}; if (code == TSDB_CODE_SUCCESS) { - code = buildCreateTSMAReq(pCxt, pStmt, &smaReq); + code = buildCreateTSMAReq(pCxt, pStmt, &smaReq, &useTbName); } if ( TSDB_CODE_SUCCESS == code) { - SName name; - toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, &name); - code = collectUseTable(&name, pCxt->pTargetTables); + code = collectUseTable(&useTbName, pCxt->pTargetTables); } if (TSDB_CODE_SUCCESS == code) { // TODO replace with tsma serialization func @@ -10897,7 +10913,7 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { case QUERY_NODE_SHOW_CREATE_TSMA_STMT: break; case QUERY_NODE_DROP_TSMA_STMT: - code =translateDropTSMA(pCxt, (SDropTSMAStmt*)pNode); + code = translateDropTSMA(pCxt, (SDropTSMAStmt*)pNode); break; default: break; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 657b3038ab..068ec94268 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -5886,18 +5886,7 @@ static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUn static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs, SArray* pTsmaScanCols) { SNode* pNode; - int32_t tsmaColNum = 1; bool failed = false, found = false; - int32_t firstFuncId = ((STableTSMAFuncInfo*)taosArrayGet(pTsmaFuncs, 0))->funcId; - // find col num - for (int32_t i = 1; i < pTsmaFuncs->size; ++i) { - STableTSMAFuncInfo* pTsmaFunc = taosArrayGet(pTsmaFuncs, i); - if (firstFuncId == pTsmaFunc->funcId) { - tsmaColNum++; - } else { - break; - } - } taosArrayClear(pTsmaScanCols); FOREACH(pNode, pQueryFuncs) { @@ -5911,30 +5900,23 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ } int32_t queryColId = ((SColumnNode*)pQueryFunc->pParameterList->pHead->pNode)->colId; found = false; + int32_t notMyStateFuncId = 0; // iterate funcs - // TODO if func is count, skip checking cols - for (int32_t i = 0; i < pTsmaFuncs->size; i += tsmaColNum) { + // TODO if func is count, skip checking cols, test count(*) + for (int32_t i = 0; i < pTsmaFuncs->size; i++) { STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i); + if (pTsmaFuncInfo->funcId == notMyStateFuncId) continue; + if (!fmIsMyStateFunc(pQueryFunc->funcId, pTsmaFuncInfo->funcId)) { + notMyStateFuncId = pTsmaFuncInfo->funcId; continue; } - // iterate cols within a func - for (int32_t j = i; j < tsmaColNum + i; ++j) { - if (j > i) { - pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, j); - } - if (queryColId < pTsmaFuncInfo->colId) { - failed = true; - break; - } - if (queryColId > pTsmaFuncInfo->colId) { - continue; - } - found = true; - taosArrayPush(pTsmaScanCols, &j); - break; + if (queryColId != pTsmaFuncInfo->colId) { + continue; } + found = true; + taosArrayPush(pTsmaScanCols, &i); break; } if (failed || !found) { @@ -5955,6 +5937,7 @@ static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) { } STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i); + if (!pTsma->fillHistoryFinished || 30 * 1000 < (pTsma->rspTs - pTsma->reqTs) + pTsma->delayDuration) continue; // filter with interval // TODO unit not right if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) { @@ -6576,6 +6559,7 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan } } clearTSMAOptCtx(&tsmaOptCtx); + // TODO if any error occured, we should eat the error, skip the optimization, query with original table return code; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 95b1f40ab4..16238d335a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -686,6 +686,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_ENV, "Invalid tsma env") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_STAT, "Invalid tsma state") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PTR, "Invalid tsma pointer") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PARA, "Invalid tsma parameters") +TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_TB, "Invalid table to create tsma, only stable or normal table allowed") +TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_INTERVAL, "Invalid tsma interval, 1ms ~ 1h is allowed") //rsma TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index a44d728c38..d33b04b3cb 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -1,5 +1,6 @@ from os import name from random import randrange +from socket import TIPC_ADDR_NAMESEQ import taos import time import threading @@ -190,7 +191,7 @@ class TSMATester: tdLog.exit("comparing tsma res for: %s got differnt rows of result: without tsma: %d, with tsma: %d" % (sql, len(no_tsma_res), len(tsma_res))) for row_no_tsma, row_tsma in zip(no_tsma_res, tsma_res): if row_no_tsma != row_tsma: - tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s" % (sql, str(row_no_tsma), str(row_tsma))) + tdLog.exit("comparing tsma res for: %s got different row data: no tsma row: %s, tsma row: %s \nno tsma res: %s \n tsma res: %s" % (sql, str(row_no_tsma), str(row_tsma), str(no_tsma_res), str(tsma_res))) tdLog.info('result check succeed for sql: %s. \n tsma-res: %s. \nno_tsma-res: %s' % (sql, str(tsma_res), str(no_tsma_res))) def check_sql(self, sql: str, expect: TSMAQueryContext): @@ -210,7 +211,7 @@ class TSMATestSQLGenerator: def __init__(self, opts: TSMATesterSQLGeneratorOptions): self.db_name_: str = '' self.tb_name_: str = '' - self.ts_scan_range_: List[float] = [UsedTsma.TS_MIN, UsedTsma.TS_MAX] + self.ts_scan_range_: List[float] = [float(UsedTsma.TS_MIN), float(UsedTsma.TS_MAX)] self.agg_funcs_: List[str] = [] self.tsmas_: List[TSMA] = [] ## currently created tsmas self.opts_: TSMATesterSQLGeneratorOptions = opts @@ -407,17 +408,32 @@ class TDTestCase: rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\ startTs=paraDict["startTs"],tsStep=paraDict["tsStep"]) self.init_normal_tb(tdSql, paraDict['dbName'], 'norm_tb', paraDict['rowsPerTbl'], paraDict['startTs'], paraDict['tsStep']) - + + def wait_for_tsma_calculation(self, func_list: list, db: str, tb: str, interval: str, tsma_name: str): + while True: + sql = 'select %s from %s.%s interval(%s)' % (', '.join(func_list), db, tb, interval) + tdLog.debug('waiting for tsma %s to be useful with sql %s' % (tsma_name, sql)) + ctx: TSMAQueryContext = self.tsma_tester.get_tsma_query_ctx(sql) + if ctx.has_tsma(): + if ctx.used_tsmas[0].name == tsma_name + UsedTsma.TSMA_RES_STB_POSTFIX: + break + else: + time.sleep(1) + else: + time.sleep(1) + def create_tsma(self, tsma_name: str, db: str, tb: str, func_list: list, interval: str): tdSql.execute('use %s' % db) sql = "CREATE TSMA %s ON %s.%s FUNCTION(%s) INTERVAL(%s)" % (tsma_name, db, tb, ','.join(func_list), interval) tdSql.execute(sql, queryTimes=1) - - def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str): + self.wait_for_tsma_calculation(func_list, db, tb, interval, tsma_name) + + def create_recursive_tsma(self, base_tsma_name: str, new_tsma_name: str, db: str, interval: str, tb_name: str): tdSql.execute('use %s' % db, queryTimes=1) sql = 'CREATE RECURSIVE TSMA %s ON %s.%s INTERVAL(%s)' % (new_tsma_name, db, base_tsma_name, interval) tdSql.execute(sql, queryTimes=1) + self.wait_for_tsma_calculation(['avg(c1)'], db, tb_name, interval, new_tsma_name) def drop_tsma(self, tsma_name: str, db: str): sql = 'DROP TSMA %s.%s' % (db, tsma_name) @@ -438,17 +454,20 @@ class TDTestCase: self.tsma_tester.check_sql(ctx.sql, ctx) def test_query_with_tsma(self): - self.init_data() self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m') - self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m') - self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h') + #self.create_recursive_tsma('tsma1', 'tsma3', 'test', '20m', 'meters') + #self.create_recursive_tsma('tsma2', 'tsma4', 'test', '1h', 'meters') self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') - ## why need 5s, calculation not finished yet. + ## why need 10s, filling history not finished yet + #ctx = TSMAQCBuilder().with_sql('select avg(c1) from meters').should_query_with_table('meters', UsedTsma.TS_MIN, UsedTsma.TS_MAX).get_qc() + #self.tsma_tester.check_sql(ctx.sql, ctx) time.sleep(5) #time.sleep(9999999) self.test_query_with_tsma_interval() self.test_query_with_tsma_agg() + ## self.test_query_with_drop_tsma() + ## self.test_query_with_add_tag() def test_query_with_tsma_interval(self): self.check(self.test_query_with_tsma_interval_no_partition) @@ -550,8 +569,77 @@ class TDTestCase: return [] def run(self): + self.init_data() + #time.sleep(999999) + self.test_create_tsma() + #self.test_drop_tsma() + self.test_tb_ddl_with_created_tsma() self.test_query_with_tsma() #time.sleep(999999) + + def test_create_tsma(self): + self.test_create_tsma_on_stable() + self.test_create_tsma_on_norm_table() + self.test_create_tsma_on_child_table() + self.test_create_recursive_tsma() + ## self.test_drop_stable() + ## self.test_drop_ctable() + ## self.test_drop_db() + + def test_tb_ddl_with_created_tsma(self): + tdSql.execute('create database nsdb precision "ns"', queryTimes=1) + tdSql.execute('use nsdb', queryTimes=1) + tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1) + self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m') + ## drop column, drop tag + tdSql.error('alter table meters drop column c1', -2147482637) + tdSql.error('alter table meters drop tag t1', -2147482637) + tdSql.error('alter table meters drop tag t2', -2147482637) # Stream must be dropped first + tdSql.execute('drop tsma tsma1', queryTimes=1) + + ## add tag + tdSql.execute('alter table meters add tag t3 int', queryTimes=1) + tdSql.execute('alter table meters drop tag t3', queryTimes=1) + tdSql.execute('drop database nsdb') + + ## test_drop stream + + def test_create_tsma_on_stable(self): + tdSql.execute('create database nsdb precision "ns"', queryTimes=1) + tdSql.execute('use nsdb', queryTimes=1) + tdSql.execute('create table meters(ts timestamp, c1 int, c2 int) tags(t1 int, t2 int)', queryTimes=1) + self.create_tsma('tsma1', 'nsdb', 'meters', ['avg(c1)', 'avg(c2)'], '5m') + tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) ## Invalid tsma interval, 1ms ~ 1h is allowed + tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(3601s)', -2147471097) + tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(3600001a)', -2147471097) + tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(3600001000u)', -2147471097) + tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999999b)', -2147471097) + tdSql.error('create tsma tsma2 on meters function(avg(c1), avg(c2)) interval(999u)', -2147471097) + + tdSql.execute('drop tsma tsma1') + + tdSql.error('create tsma tsma1 on test.meters function(avg(c1), avg(c2)) interval(2h)', -2147471097) + tdSql.execute('drop database nsdb') + + def test_create_tsma_on_norm_table(self): + pass + + def test_create_tsma_on_child_table(self): + tdSql.error('create tsma tsma1 on test.t1 function(avg(c1), avg(c2)) interval(1m)', -2147471098) ## Invalid table to create tsma, only stable or normal table allowed + + def test_create_recursive_tsma(self): + tdSql.execute('use test') + self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') + sql = 'create recursive tsma tsma2 on tsma1 interval(1m)' + tdSql.error(sql, -2147471099) ## invalid tsma parameter + sql = 'create recursive tsma tsma2 on tsma1 interval(7m)' + tdSql.error(sql, -2147471099) ## invalid tsma parameter + sql = 'create recursive tsma tsma2 on tsma1 interval(11m)' + tdSql.error(sql, -2147471099) ## invalid tsma parameter + self.create_recursive_tsma('tsma1', 'tsma2', 'test', '20m', 'meters') + + tdSql.execute('drop tsma tsma2', queryTimes=1) + tdSql.execute('drop tsma tsma1', queryTimes=1) def stop(self): tdSql.close()