diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5a6fd90e37..a647740aa1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4211,6 +4211,7 @@ typedef struct { char tb[TSDB_TABLE_NAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; uint64_t suid; + uint64_t destTbUid; uint64_t dbId; int32_t version; int64_t interval; @@ -4224,6 +4225,7 @@ typedef struct { int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp); int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pReq); +int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes); void tFreeTableTSMAInfo(void* p); void tFreeAndClearTableTSMAInfo(void* p); void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp* pRsp); diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 7d90484fbf..8520a2add9 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -273,9 +273,10 @@ bool fmIsInvertible(int32_t funcId); char* fmGetFuncName(int32_t funcId); -bool fmIsTSMASupportedFunc(func_id_t funcid); -int32_t rewriteFuncsForTSMA(SNodeList* pFuncs); -int32_t getFuncId(const char* name); +bool fmIsTSMASupportedFunc(func_id_t funcId); +int32_t fmCreateStateFuncs(SNodeList* pFuncs); +int32_t fmGetFuncId(const char* name); +bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId); #ifdef __cplusplus } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3d3a216588..915e001785 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9970,6 +9970,7 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT if (tEncodeCStr(pEncoder, pTsmaInfo->tb) < 0) return -1; if (tEncodeCStr(pEncoder, pTsmaInfo->dbFName) < 0) return -1; if (tEncodeU64(pEncoder, pTsmaInfo->suid) < 0) return -1; + if (tEncodeU64(pEncoder, pTsmaInfo->destTbUid) < 0) return -1; if (tEncodeU64(pEncoder, pTsmaInfo->dbId) < 0) return -1; if (tEncodeI32(pEncoder, pTsmaInfo->version) < 0) return -1; if (tEncodeCStr(pEncoder, pTsmaInfo->targetTb) < 0) return -1; @@ -9993,6 +9994,7 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf if (tDecodeCStrTo(pDecoder, pTsmaInfo->tb) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTsmaInfo->dbFName) < 0) return -1; if (tDecodeU64(pDecoder, &pTsmaInfo->suid) < 0) return -1; + if (tDecodeU64(pDecoder, &pTsmaInfo->destTbUid) < 0) return -1; if (tDecodeU64(pDecoder, &pTsmaInfo->dbId) < 0) return -1; if (tDecodeI32(pDecoder, &pTsmaInfo->version) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTsmaInfo->targetTb) < 0) return -1; @@ -10081,6 +10083,22 @@ void tFreeAndClearTableTSMAInfo(void* p) { } } +int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes) { + int32_t code = TSDB_CODE_SUCCESS; + if (NULL == pInfo) { + return TSDB_CODE_SUCCESS; + } + STableTSMAInfo* pRet = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); + if (!pRet) return TSDB_CODE_OUT_OF_MEMORY; + + *pRet = *pInfo; + if (pInfo->pFuncs) { + pRet->pFuncs = taosArrayDup(pInfo->pFuncs, NULL); + } + *pRes = pRet; + return code; +} + void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp *pRsp) { if (pRsp && pRsp->pTsmas) { taosArrayDestroyP(pRsp->pTsmas, tFreeAndClearTableTSMAInfo); diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index dab5410b81..40b1fdbdcb 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1394,7 +1394,7 @@ static void initSMAObj(SCreateTSMACxt* pCxt) { pCxt->pSma->uid = mndGenerateUid(pCxt->pCreateSmaReq->name, TSDB_TABLE_FNAME_LEN); memcpy(pCxt->pSma->dstTbName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); - pCxt->pSma->dstTbUid = mndGenerateUid(pCxt->pSma->dstTbName, TSDB_TABLE_FNAME_LEN); + pCxt->pSma->dstTbUid = 0; // not used pCxt->pSma->stbUid = pCxt->pSrcStb->uid; pCxt->pSma->dbUid = pCxt->pDb->uid; pCxt->pSma->interval = pCxt->pCreateSmaReq->interval; @@ -1561,8 +1561,8 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) { code = -1; goto _OVER; } else { - mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", - pCxt->pCreateSmaReq->name, sma.uid, sma.stbUid, sma.dstTbUid, sma.dstTbName, sma.dstVgId); + mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 " dstTb:%s dstVg:%d", pCxt->pCreateSmaReq->name, sma.uid, + sma.stbUid, sma.dstTbName, sma.dstVgId); code = 0; } @@ -1921,13 +1921,14 @@ static void mndCancelRetrieveTSMA(SMnode *pMnode, void *pIter) { taosMemoryFree(p); } -int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, STableTSMAInfo* pInfo) { +int32_t dumpTSMAInfoFromSmaObj(const SSmaObj* pSma, const SStbObj* pDestStb, STableTSMAInfo* pInfo) { int32_t code = 0; pInfo->interval = pSma->interval; pInfo->unit = pSma->intervalUnit; pInfo->tsmaId = pSma->uid; pInfo->version = pSma->version; pInfo->tsmaId = pSma->uid; + pInfo->destTbUid = pDestStb->uid; SName sName = {0}; tNameFromString(&sName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tstrncpy(pInfo->name, sName.tname, TSDB_TABLE_NAME_LEN); @@ -1987,13 +1988,21 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp continue; } + pStb = mndAcquireStb(pMnode, pSma->dstTbName); + if (!pStb) { + sdbRelease(pSdb, pSma); + continue; + } + STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); if (!pTsma) { terrno = TSDB_CODE_OUT_OF_MEMORY; + mndReleaseStb(pMnode, pStb); sdbRelease(pSdb, pSma); return code; } - terrno = dumpTSMAInfoFromSmaObj(pSma, pTsma); + terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma); + mndReleaseStb(pMnode, pStb); if (terrno) { sdbRelease(pSdb, pSma); return code; @@ -2120,16 +2129,25 @@ int32_t mndValidateTSMAInfo(SMnode *pMnode, STSMAVersion *pTsmaVersions, int32_t continue; } + SStbObj* pDestStb = mndAcquireStb(pMnode, pSma->dstTbName); + if (!pDestStb) { + mInfo("tsma: %s.%" PRIx64 " dest stb: %s not found, maybe dropped", tsmaFName, pTsmaVer->tsmaId, pSma->dstTbName); + mndReleaseSma(pMnode, pSma); + continue; + } + // dump smaObj into rsp STableTSMAInfo * pInfo = NULL; pInfo = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); - if (!pInfo || (terrno = dumpTSMAInfoFromSmaObj(pSma, pInfo))) { + if (!pInfo || (terrno = dumpTSMAInfoFromSmaObj(pSma, pDestStb, pInfo))) { mndReleaseSma(pMnode, pSma); + mndReleaseStb(pMnode, pDestStb); taosMemoryFreeClear(pInfo); goto _OVER; } taosArrayPush(hbRsp.pTsmas, pInfo); + mndReleaseStb(pMnode, pDestStb); mndReleaseSma(pMnode, pSma); } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 7af89e181b..f8bc1e8c5c 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -1120,7 +1120,6 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n int32_t ctgUpdateTbTSMAEnqueue(SCatalog* pCtg, STSMACache** pTsma, bool syncOp); int32_t ctgDropTSMAForTbEnqueue(SCatalog* pCtg, SName* pName, bool syncOp); int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp); -int32_t ctgCloneTbTSMA(STSMACache* pTsmas, STSMACache** pRes); int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation); int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation* operation); uint64_t ctgGetTbTSMACacheSize(STSMACache* pTsmaInfo); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 44d3cbc61d..3e38de466f 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2657,7 +2657,7 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) { for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) { STableTSMAInfo* pInfo = taosArrayGetP(pOut->pTsmas, i); - CTG_ERR_JRET(ctgCloneTbTSMA(pInfo, &pTsma)); + CTG_ERR_JRET(tCloneTbTSMAInfo(pInfo, &pTsma)); CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pTask->pJob->pCtg, &pTsma, false)); } } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 6bc5f6cdbd..189a6ff3ee 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -3287,10 +3287,11 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } SMetaRes res = {0}; + // TODO if pCache->pTsmas is empty, maybe we should get tsmas from mnode for (int32_t i = 0; i < pCache->pTsmas->size; ++i) { STSMACache *pTsmaOut = NULL; STSMACache *pTsmaCache = taosArrayGetP(pCache->pTsmas, i); - code = ctgCloneTbTSMA(pTsmaCache, &pTsmaOut); + code = tCloneTbTSMAInfo(pTsmaCache, &pTsmaOut); if (code) { ctgReleaseTSMAToCache(pCtg, dbCache, pCache); tFreeTableTSMAInfoRsp(pRsp); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 05518cae5c..f7f9a5032d 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -2382,21 +2382,6 @@ int32_t dupViewMetaFromRsp(SViewMetaRsp* pRsp, SViewMeta* pViewMeta) { } uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) { + //TODO return 0; } - -int32_t ctgCloneTbTSMA(STSMACache* pInfo, STSMACache** pRes) { - int32_t code = TSDB_CODE_SUCCESS; - if (NULL == pInfo) { - return TSDB_CODE_SUCCESS; - } - STSMACache* pCache = taosMemoryCalloc(1, sizeof(STSMACache)); - if (!pCache) return TSDB_CODE_OUT_OF_MEMORY; - - *pCache = *pInfo; - if (pInfo->pFuncs) { - pCache->pFuncs = taosArrayDup(pInfo->pFuncs, NULL); - } - *pRes = pCache; - return code; -} diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index c25c64bfa7..c71f770ba0 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -538,11 +538,11 @@ static int32_t fmCreateStateFunc(const SFunctionNode* pFunc, SFunctionNode** pSt return TSDB_CODE_SUCCESS; } -bool fmIsTSMASupportedFunc(func_id_t funcid) { - return fmIsAggFunc(funcid) && !fmIsForbidStreamFunc(funcid); +bool fmIsTSMASupportedFunc(func_id_t funcId) { + return fmIsAggFunc(funcId) && !fmIsForbidStreamFunc(funcId); } -int32_t rewriteFuncsForTSMA(SNodeList* pFuncs) { +int32_t fmCreateStateFuncs(SNodeList* pFuncs) { int32_t code; SNode* pNode; char buf[128] = {0}; @@ -568,7 +568,7 @@ int32_t rewriteFuncsForTSMA(SNodeList* pFuncs) { return code; } -int32_t getFuncId(const char* name) { +int32_t fmGetFuncId(const char* name) { if (NULL != gFunMgtService.pFuncNameHashTable) { void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, name, strlen(name)); if (NULL != pVal) { @@ -583,3 +583,12 @@ int32_t getFuncId(const char* name) { } return -1; } + +bool fmIsMyStateFunc(int32_t funcId, int32_t stateFuncId) { + const SBuiltinFuncDefinition* pFunc = &funcMgtBuiltins[funcId]; + const SBuiltinFuncDefinition* pStateFunc = &funcMgtBuiltins[stateFuncId]; + if (!pFunc->pStateFunc) { + return false; + } + return strcmp(pFunc->pStateFunc, pStateFunc->name) == 0; +} diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index da9a817fa1..9e9183dee9 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10448,7 +10448,7 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMC } if (code == TSDB_CODE_SUCCESS) - code = rewriteFuncsForTSMA(info.pFuncs); + code = fmCreateStateFuncs(info.pFuncs); if (code == TSDB_CODE_SUCCESS) { code = buildSampleAst(pCxt, &info, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen); @@ -10505,7 +10505,7 @@ translateTSMAFuncs(STranslateContext * pCxt, SCreateTSMAStmt* pStmt, STableMeta* SNode* pNode; FOREACH(pNode, pStmt->pOptions->pFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pNode; - int32_t funcId = getFuncId(pFunc->functionName); + int32_t funcId = fmGetFuncId(pFunc->functionName); if (funcId < 0) { return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index afc5e54dc4..b30ebdc142 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -5802,6 +5802,7 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) { assert(pFuncs); FOREACH(pTmpNode, pFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pTmpNode; + // TODO test other pseudo column funcs if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId)) { return false; } @@ -5813,31 +5814,38 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode) { } typedef struct STSMAOptUsefulTsma { - const STableTSMAInfo* pTsma; - STimeWindow timeRange; // scan time range for this tsma + const STableTSMAInfo* pTsma; // NULL if no tsma available, which will use original data for calculation + STimeWindow scanRange; // scan time range for this tsma + STimeWindow windowRange; // window range used for window filtering + SArray* pTsmaScanCols; // SArray index of tsmaFuncs array } STSMAOptUsefulTsma; typedef struct STSMAOptCtx { // input - const SLogicNode* pParent; // Agg or Interval - const SNodeList* pAggFuncs; - const STimeWindow* pTimeRange; - const SArray* pTsmas; - SInterval* queryInterval; + const SScanLogicNode* pScan; + const SLogicNode* pParent; // parent of Table Scan, Agg or Interval + const SNodeList* pAggFuncs; + const STimeWindow* pTimeRange; + const SArray* pTsmas; + SInterval* queryInterval; // not null with window logic node // output - SArray* usefulTsmas; // SArray, sorted by tsma interval from long to short + SArray* pUsefulTsmas; // SArray, sorted by tsma interval from long to short + SArray* pUsedTsmas; + SLogicSubplan* generatedSubPlans[2]; } STSMAOptCtx; static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) { int32_t code = 0; + pTsmaOptCtx->pScan = pScan; pTsmaOptCtx->pParent = pScan->node.pParent; pTsmaOptCtx->pTsmas = pScan->pTsmas; pTsmaOptCtx->pTimeRange = &pScan->scanRange; - pTsmaOptCtx->queryInterval = taosMemoryCalloc(1, sizeof(SInterval)); - if (!pTsmaOptCtx->queryInterval) return TSDB_CODE_OUT_OF_MEMORY; if (nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) { + pTsmaOptCtx->queryInterval = taosMemoryCalloc(1, sizeof(SInterval)); + if (!pTsmaOptCtx->queryInterval) return TSDB_CODE_OUT_OF_MEMORY; + SWindowLogicNode* pWindow = (SWindowLogicNode*)pTsmaOptCtx->pParent; pTsmaOptCtx->queryInterval->interval = pWindow->interval; pTsmaOptCtx->queryInterval->intervalUnit = pWindow->intervalUnit; @@ -5853,15 +5861,22 @@ static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) SAggLogicNode* pAgg = (SAggLogicNode*)pTsmaOptCtx->pParent; pTsmaOptCtx->pAggFuncs = pAgg->pAggFuncs; } - pTsmaOptCtx->usefulTsmas = taosArrayInit(pScan->pTsmas->size, sizeof(STSMAOptUsefulTsma)); - if (!pTsmaOptCtx->usefulTsmas) { + pTsmaOptCtx->pUsefulTsmas = taosArrayInit(pScan->pTsmas->size, sizeof(STSMAOptUsefulTsma)); + pTsmaOptCtx->pUsedTsmas = taosArrayInit(3, sizeof(STSMAOptUsefulTsma)); + if (!pTsmaOptCtx->pUsefulTsmas || !pTsmaOptCtx->pUsedTsmas) { code = TSDB_CODE_OUT_OF_MEMORY; } return code; } +static void tsmaOptFreeUsefulTsma(void* p) { + STSMAOptUsefulTsma* pTsma = p; + taosArrayDestroy(pTsma->pTsmaScanCols); +} + static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) { - taosArrayDestroy(pTsmaOptCtx->usefulTsmas); + taosArrayDestroyEx(pTsmaOptCtx->pUsefulTsmas, tsmaOptFreeUsefulTsma); + taosArrayDestroy(pTsmaOptCtx->pUsedTsmas); taosMemoryFreeClear(pTsmaOptCtx->queryInterval); } @@ -5869,7 +5884,10 @@ static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUn if (!pTsmaOptCtx->queryInterval) return true; // TODO save tsmaInterval in table precision to avoid convertions - int32_t code = getDuration(tsmaInterval, tsmaIntevalUnit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision); + // TODO save the right unit + int32_t code = + getDuration(convertTimeFromPrecisionToUnit(tsmaInterval, pTsmaOptCtx->queryInterval->precision, tsmaIntevalUnit), + tsmaIntevalUnit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision); ASSERT(code == TSDB_CODE_SUCCESS); bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0; bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0; @@ -5877,7 +5895,7 @@ static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUn return validInterval && validSliding && validOffset; } -static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs) { +static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs, SArray* pTsmaScanCols) { SNode* pNode; int32_t tsmaColNum = 1; bool failed = false, found = false; @@ -5892,8 +5910,11 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ } } + taosArrayClear(pTsmaScanCols); FOREACH(pNode, pQueryFuncs) { SFunctionNode* pQueryFunc = (SFunctionNode*)pNode; + // TODO handle _wstart + if (fmIsPseudoColumnFunc(pQueryFunc->funcId)) continue; if (1 != pQueryFunc->pParameterList->length || nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) { failed = true; @@ -5902,18 +5923,15 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ int32_t queryColId = ((SColumnNode*)pQueryFunc->pParameterList->pHead->pNode)->colId; found = false; // iterate funcs + // TODO if func is count, skip checking cols for (int32_t i = 0; i < pTsmaFuncs->size; i += tsmaColNum) { STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i); - if (pQueryFunc->funcId < pTsmaFuncInfo->funcId) { - failed = true; - break; - } - if (pQueryFunc->funcId > pTsmaFuncInfo->funcId) { + if (!fmIsMyStateFunc(pQueryFunc->funcId, pTsmaFuncInfo->funcId)) { continue; } // iterate cols within a func - for (int32_t j = i; j < tsmaColNum; ++j) { + for (int32_t j = i; j < tsmaColNum + i; ++j) { if (j > i) { pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, j); } @@ -5925,6 +5943,7 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ continue; } found= true; + taosArrayPush(pTsmaScanCols, &j); break; } break; @@ -5936,23 +5955,39 @@ static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQ return found; } -static void tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) { +static int32_t tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) { + STSMAOptUsefulTsma usefulTsma = {.pTsma = NULL, + .scanRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, + .windowRange = {.skey = TSKEY_MIN, .ekey = TSKEY_MAX}}; + SArray* pTsmaScanCols = NULL; + for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) { + if (!pTsmaScanCols) { + pTsmaScanCols = taosArrayInit(pTsmaOptCtx->pAggFuncs->length, sizeof(int32_t)); + if (!pTsmaScanCols) return TSDB_CODE_OUT_OF_MEMORY; + } + STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i); // filter with interval + // TODO unit not right if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) { continue; } // filter with funcs, note that tsma funcs has been sorted by funcId and ColId - if (!tsmaOptCheckValidFuncs(pTsma->pFuncs, pTsmaOptCtx->pAggFuncs)) { + if (!tsmaOptCheckValidFuncs(pTsma->pFuncs, pTsmaOptCtx->pAggFuncs, pTsmaScanCols)) { continue; } - STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsma, .timeRange.skey = TSKEY_MIN, .timeRange.ekey = TSKEY_MAX}; - taosArrayPush(pTsmaOptCtx->usefulTsmas, &usefulTsma); + usefulTsma.pTsma = pTsma; + usefulTsma.pTsmaScanCols = pTsmaScanCols; + pTsmaScanCols = NULL; + taosArrayPush(pTsmaOptCtx->pUsefulTsmas, &usefulTsma); } + if (pTsmaScanCols) taosArrayDestroy(pTsmaScanCols); + // TODO filter smaller tsmas that not aligned with the biggest tsma + return TSDB_CODE_SUCCESS; } -static int tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) { +static int32_t tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) { const STSMAOptUsefulTsma* p = pLeft, *q = pRight; int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval; int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI); @@ -5964,22 +5999,216 @@ static int tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) { return 0; } -static void tsmaOptSplitWindows(STSMAOptCtx *pTsmaOptCtx) { - // head windows - if (pTsmaOptCtx->pTimeRange->skey != TSKEY_MIN) { +static const STSMAOptUsefulTsma* tsmaOptFindUsefulTsma(const SArray* pUsefulTsmas, int32_t startIdx, int64_t alignInterval, int8_t precision) { + int64_t tsmaInterval; + for (int32_t i = startIdx; i < pUsefulTsmas->size; ++i) { + const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pUsefulTsmas, i); + getDuration(pUsefulTsma->pTsma->interval, pUsefulTsma->pTsma->unit, &tsmaInterval, precision); + if (alignInterval % tsmaInterval == 0) { + return pUsefulTsma; + } + } + return NULL; +} +static void tsmaOptInitIntervalFromTsma(SInterval* pInterval, const STableTSMAInfo* pTsma, int8_t precision) { + pInterval->interval = pTsma->interval; + pInterval->intervalUnit = pTsma->unit; + pInterval->sliding = pTsma->interval; + pInterval->slidingUnit = pTsma->unit; + pInterval->offset = 0; + pInterval->offsetUnit = pTsma->unit; + pInterval->precision = precision; +} + +// TODO refactor, remove some params +static void tsmaOptSplitWindows(STSMAOptCtx* pTsmaOptCtx, const STimeWindow* pScanRange, + const STimeWindow* pWindowRange, uint32_t tsmaStartIdx) { + bool needTailWindow = false; + bool isSkeyAlignedWithTsma = true, isEkeyAlignedWithTsma = true; + int64_t winSkey = TSKEY_MIN, winEkey = TSKEY_MAX; + int64_t startOfSkeyFirstWin = pScanRange->skey, endOfSkeyFirstWin; + int64_t startOfEkeyFirstWin = pScanRange->ekey, endOfEkeyFirstWin; + int64_t tsmaInterval; + SInterval interval; + STimeWindow scanRange = *pScanRange; + STimeWindow windowRange = *pWindowRange; + const SInterval* pInterval = pTsmaOptCtx->queryInterval; + const STSMAOptUsefulTsma* pUsefulTsma = taosArrayGet(pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx); + const STableTSMAInfo* pTsma = pUsefulTsma->pTsma; + + if (!pInterval) { + tsmaOptInitIntervalFromTsma(&interval, pTsma, 0); + pInterval = &interval; } - // normal biggest tsma windows + getDuration(pTsma->interval, pTsma->unit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision); + + // check for head windows + if (pScanRange->skey != TSKEY_MIN) { + startOfSkeyFirstWin = taosTimeTruncate(pScanRange->skey, pInterval); + endOfSkeyFirstWin = + taosTimeAdd(startOfSkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + isSkeyAlignedWithTsma = ((pScanRange->skey - startOfSkeyFirstWin) % tsmaInterval == 0); + } else { + endOfSkeyFirstWin = TSKEY_MIN; + } + + // check for tail windows + if (pScanRange->ekey != TSKEY_MAX) { + startOfEkeyFirstWin = taosTimeTruncate(pScanRange->ekey, pInterval); + endOfEkeyFirstWin = taosTimeAdd(startOfEkeyFirstWin, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (startOfEkeyFirstWin > startOfSkeyFirstWin) { + needTailWindow = true; + // TODO add some notes + isEkeyAlignedWithTsma = ((pScanRange->ekey + 1 - startOfEkeyFirstWin) % tsmaInterval == 0); + } + } + + // add head tsma if possible + if (!isSkeyAlignedWithTsma) { + windowRange.ekey = + taosTimeAdd(startOfSkeyFirstWin, pInterval->interval * 2, pInterval->intervalUnit, pInterval->precision); + scanRange.ekey = TMIN(scanRange.ekey, windowRange.ekey); + const STSMAOptUsefulTsma* pTsmaFound = tsmaOptFindUsefulTsma( + pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, pScanRange->skey - startOfSkeyFirstWin, pInterval->precision); + STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL, + .scanRange = scanRange, + .windowRange = windowRange, + .pTsmaScanCols = pTsmaFound ? pTsmaFound->pTsmaScanCols : NULL}; + taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma); + } + + // the main tsma + if (endOfSkeyFirstWin < startOfEkeyFirstWin) { + scanRange.ekey = TMIN(pScanRange->ekey, endOfEkeyFirstWin); + if (!isSkeyAlignedWithTsma) { + scanRange.skey = endOfSkeyFirstWin; + windowRange.skey = scanRange.skey; + } + windowRange.ekey = pWindowRange->ekey; + if (!isEkeyAlignedWithTsma) { + windowRange.ekey = endOfEkeyFirstWin; + } + STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsma, + .scanRange = scanRange, + .windowRange = windowRange, + .pTsmaScanCols = pUsefulTsma->pTsmaScanCols}; + taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma); + } + + // add tail tsma if possible + if (!isEkeyAlignedWithTsma && needTailWindow) { + scanRange.skey = startOfEkeyFirstWin; + scanRange.ekey = pScanRange->ekey; + windowRange.skey = startOfEkeyFirstWin; + windowRange.ekey = pWindowRange->ekey; + const STSMAOptUsefulTsma* pTsmaFound = tsmaOptFindUsefulTsma( + pTsmaOptCtx->pUsefulTsmas, tsmaStartIdx + 1, pScanRange->ekey + 1 - startOfEkeyFirstWin, pInterval->precision); + STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsmaFound ? pTsmaFound->pTsma : NULL, + .scanRange = scanRange, + .windowRange = windowRange, + .pTsmaScanCols = pTsmaFound ? pTsmaFound->pTsmaScanCols : NULL}; + taosArrayPush(pTsmaOptCtx->pUsedTsmas, &usefulTsma); + } +} + +static void tsmaOptRewriteAggFuncs(STSMAOptCtx* pTsmaOptCtx) { +} + +SNodeList* tsmaOptCreateTsmaScanCols(const STSMAOptUsefulTsma* pTsma, const SNodeList* pAggFuncs) { + ASSERT(pTsma->pTsma); + ASSERT(pTsma->pTsmaScanCols); + int32_t code; + SNode* pNode; + SNodeList* pScanCols = NULL; + + int32_t i = 0; + + FOREACH(pNode, pAggFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + if (fmIsPseudoColumnFunc(pFunc->funcId)) { + continue; + } + const int32_t* idx = taosArrayGet(pTsma->pTsmaScanCols, i); + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (pCol) { + // TODO why 2? + pCol->colId = *idx + 2; + pCol->tableType = TSDB_SUPER_TABLE; + pCol->tableId = pTsma->pTsma->destTbUid; + pCol->colType = COLUMN_TYPE_COLUMN; + strcpy(pCol->tableName, pTsma->pTsma->targetTb); + strcpy(pCol->dbName, pTsma->pTsma->targetDbFName); + strcpy(pCol->colName, pFunc->node.aliasName); + strcpy(pCol->node.aliasName, pFunc->node.aliasName); + pCol->node.resType.type = TSDB_DATA_TYPE_BINARY; + code = nodesListMakeStrictAppend(&pScanCols, (SNode*)pCol); + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } + if (code) break; + ++i; + } + + if (code) { + nodesDestroyList(pScanCols); + pScanCols = NULL; + } + return pScanCols; +} + +static int32_t tsmaOptRewriteScan(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma) { + SNode* pNode; + int32_t code = 0; + SScanLogicNode* pScan = (SScanLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pScan); + if (!pScan) code = TSDB_CODE_OUT_OF_MEMORY; + if (code == TSDB_CODE_SUCCESS) { + nodesDestroyList(pScan->pScanCols); + + pScan->scanRange.skey = pTsma->scanRange.skey; + pScan->scanRange.ekey = pTsma->scanRange.ekey; + + if (pTsma->pTsma) { + ASSERT(pTsma->pTsmaScanCols); + pScan->pScanCols = tsmaOptCreateTsmaScanCols(pTsma, pTsmaOptCtx->pAggFuncs); + if (!pScan->pScanCols) code = TSDB_CODE_OUT_OF_MEMORY; + if (code == TSDB_CODE_SUCCESS) { + pScan->tableId = pTsma->pTsma->destTbUid; + pScan->tableType = TSDB_SUPER_TABLE; + strcpy(pScan->tableName.tname, pTsma->pTsma->targetTb); //TODO set dbName + } + } + } + if (code) { + nodesDestroyNode((SNode*)pScan); + } + return code; +} + +static void tsmaOptRewriteWindow(STSMAOptCtx* pTsmaOptCtx, const STSMAOptUsefulTsma* pTsma) { + int32_t code = 0; + SWindowLogicNode* pWindow = (SWindowLogicNode*)nodesCloneNode((SNode*)pTsmaOptCtx->pParent); + if (!pWindow) code = TSDB_CODE_OUT_OF_MEMORY; + + if (code == TSDB_CODE_SUCCESS) { - // tail windows - if (pTsmaOptCtx->pTimeRange->ekey != TSKEY_MAX) { - } } static void tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) { - + for (int32_t i = 0; i < pTsmaOptCtx->pUsedTsmas->size; ++i) { + STSMAOptUsefulTsma* pTsma = taosArrayGet(pTsmaOptCtx->pUsedTsmas, i); + tsmaOptRewriteScan(pTsmaOptCtx, pTsma); + ENodeType parentType = nodeType(pTsmaOptCtx->pParent); + if (QUERY_NODE_LOGIC_PLAN_WINDOW == parentType) { + tsmaOptRewriteWindow(pTsmaOptCtx, pTsma); + } else if (parentType == QUERY_NODE_LOGIC_PLAN_AGG) { + } else { + ASSERT(0); + } + //tsmaOptRewriteAggFuncs(pTsmaOptCtx, pTsma); + } } static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { @@ -5988,18 +6217,23 @@ static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tsmaOptMayBeOptimized); if (!pScan) return code; - fillTSMAOptCtx(&tsmaOptCtx, pScan); - // 1. extract useful tsmas - tsmaOptFilterTsmas(&tsmaOptCtx); - // 2. sort useful tsmas with interval - taosArraySort(tsmaOptCtx.usefulTsmas, tsmaInfoCompWithIntervalDesc); - // 3. generate and replace logic plans - // a. split windows - tsmaOptSplitWindows(&tsmaOptCtx); - // b. create logic plan - tsmaOptGeneratePlan(&tsmaOptCtx); - // c. rewrite agg funcs - // + code = fillTSMAOptCtx(&tsmaOptCtx, pScan); + if (code == TSDB_CODE_SUCCESS) { + // 1. extract useful tsmas + code = tsmaOptFilterTsmas(&tsmaOptCtx); + + if (code == TSDB_CODE_SUCCESS && tsmaOptCtx.pUsefulTsmas->size > 0) { + // 2. sort useful tsmas with interval + taosArraySort(tsmaOptCtx.pUsefulTsmas, tsmaInfoCompWithIntervalDesc); + // 3. generate and replace logic plans + // a. split windows + tsmaOptSplitWindows(&tsmaOptCtx, tsmaOptCtx.pTimeRange, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, 0); + // b. create logic plan + tsmaOptGeneratePlan(&tsmaOptCtx); + // c. rewrite agg funcs + tsmaOptRewriteAggFuncs(&tsmaOptCtx); + } + } clearTSMAOptCtx(&tsmaOptCtx); return code; }