From 0fd66d7e8aaeb0f8e2c60e0dc6a2c28310870730 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 15 Dec 2023 18:11:22 +0800 Subject: [PATCH] tsma optimization --- include/common/ttime.h | 1 + include/libs/catalog/catalog.h | 4 +- include/libs/function/functionMgt.h | 1 + include/libs/nodes/nodes.h | 1 + include/libs/nodes/plannodes.h | 1 + include/libs/nodes/querynodes.h | 1 + source/common/src/ttime.c | 2 +- source/dnode/mnode/impl/src/mndSma.c | 1 + source/libs/catalog/src/catalog.c | 40 ++++ source/libs/catalog/src/ctgCache.c | 19 +- source/libs/catalog/src/ctgRemote.c | 2 +- source/libs/function/src/functionMgt.c | 16 ++ source/libs/nodes/src/nodesCodeFuncs.c | 7 + source/libs/nodes/src/nodesUtilFuncs.c | 67 ++++++ source/libs/nodes/test/nodesTestMain.cpp | 58 +++++ source/libs/parser/inc/parUtil.h | 3 +- source/libs/parser/src/parTranslater.c | 71 +++++- source/libs/parser/src/parUtil.c | 12 ++ source/libs/planner/src/planLogicCreater.c | 1 + source/libs/planner/src/planOptimizer.c | 238 +++++++++++++++++++++ 20 files changed, 533 insertions(+), 13 deletions(-) diff --git a/include/common/ttime.h b/include/common/ttime.h index bd123c860e..d890b729d4 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -88,6 +88,7 @@ char getPrecisionUnit(int32_t precision); int64_t convertTimePrecision(int64_t ts, int32_t fromPrecision, int32_t toPrecision); int64_t convertTimeFromPrecisionToUnit(int64_t ts, int32_t fromPrecision, char toUnit); int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal); +int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision); void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision); diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index c004709728..46db0d19bd 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -123,7 +123,7 @@ typedef struct SMetaData 
{ SArray* pTableTag; // pRes = SArray* SArray* pDnodeList; // pRes = SArray* SArray* pView; // pRes = SViewMeta* - SArray* pTableTsmas; // pRes = SArray* + SArray* pTableTsmas; // pRes = SArray SMetaRes* pSvrVer; // pRes = char* } SMetaData; @@ -410,6 +410,8 @@ int32_t catalogUpdateTSMA(SCatalog* pCtg, STableTSMAInfo** ppTsma); int32_t catalogRemoveTSMA(SCatalog* pCtg, const STableTSMAInfo* pTsma); +int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes); + /** * Destroy catalog and relase all resources */ diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 8fd2fdd71c..7d90484fbf 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -275,6 +275,7 @@ char* fmGetFuncName(int32_t funcId); bool fmIsTSMASupportedFunc(func_id_t funcid); int32_t rewriteFuncsForTSMA(SNodeList* pFuncs); +int32_t getFuncId(const char* name); #ifdef __cplusplus } diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 891084419b..2a80a7ae23 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -169,6 +169,7 @@ int32_t nodesMsgToNode(const char* pStr, int32_t len, SNode** pNode); int32_t nodesNodeToSQL(SNode* pNode, char* buf, int32_t bufSize, int32_t* len); char* nodesGetNameFromColumnNode(SNode* pNode); int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots); +void nodesSortList(SNodeList** pList, bool (*)(SNode* pNode1, SNode* pNode2)); #ifdef __cplusplus } diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 4c98749fce..fe4e465e17 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -109,6 +109,7 @@ typedef struct SScanLogicNode { int8_t igExpired; int8_t igCheckUpdate; SArray* pSmaIndexes; + SArray* pTsmas; SNodeList* pGroupTags; bool groupSort; SNodeList* pTags; // for create stream diff --git a/include/libs/nodes/querynodes.h 
b/include/libs/nodes/querynodes.h index 3ef8d34969..99fff8cdea 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -198,6 +198,7 @@ typedef struct SRealTableNode { double ratio; SArray* pSmaIndexes; int8_t cacheLastMode; + SArray* pTsmas; } SRealTableNode; typedef struct STempTableNode { diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index e77d08574e..52379f10d7 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -581,7 +581,7 @@ int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec return TSDB_CODE_SUCCESS; } -static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) { +int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) { switch (unit) { case 's': if (val > INT64_MAX / MILLISECOND_PER_SECOND) { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index aab3c4a1e8..dab5410b81 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -1603,6 +1603,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { if (mndCheckCreateSmaReq(&createReq)) goto _OVER; + // TODO handle normal table pStb = mndAcquireStb(pMnode, createReq.stb); if (!pStb) { mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 5d35037a3b..8abcce95ce 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1830,6 +1830,46 @@ _return: CTG_API_LEAVE(code); } +int32_t ctgGetTbTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** ppRes) { /* fetches the table's TSMAs from the mnode, hands the array to *ppRes and passes every element to ctgUpdateTbTSMAEnqueue */ + STableTSMAInfoRsp tsmasRsp = {0}; + int32_t code = ctgGetTbTSMAFromMnode(pCtg, pConn, pTableName, &tsmasRsp, NULL); /* last argument (the task request) is NULL here */ + if (code == TSDB_CODE_MND_SMA_NOT_EXIST) { + code = 0; /* a table without TSMA is not an error; *ppRes is not written on this path */ + goto _return; + } + CTG_ERR_JRET(code); + assert(tsmasRsp.pTsmas); +
assert(tsmasRsp.pTsmas->size > 0); + *ppRes = tsmasRsp.pTsmas; + tsmasRsp.pTsmas = NULL; /* the array is now reachable only through *ppRes, so _return frees nothing */ + + for (int32_t i = 0; i < (*ppRes)->size; ++i) { + CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, taosArrayGet((*ppRes), i), false)); /* NOTE(review): on failure the error is returned while *ppRes still holds the array - confirm the caller releases it */ + } + return TSDB_CODE_SUCCESS; + +_return: + if (tsmasRsp.pTsmas) { + tFreeTableTSMAInfoRsp(&tsmasRsp); + } + CTG_RET(code); +} + +int32_t catalogGetTableTsmas(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes) { /* public entry point: rejects NULL arguments, then delegates to ctgGetTbTsmas */ + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pConn || NULL == pTableName || NULL == pRes) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgGetTbTsmas(pCtg, pConn, (SName*)pTableName, pRes)); + +_return: + + CTG_API_LEAVE(code); +} + int32_t catalogClearCache(void) { CTG_API_ENTER_NOLOCK(); diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 4f8a8adbf8..6bc5f6cdbd 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -3274,12 +3274,18 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx CTG_CACHE_HIT_INC(CTG_CI_TBL_TSMA, 1); - STableTSMAInfoRsp rsp; - rsp.pTsmas = taosArrayInit(pCache->pTsmas->size, POINTER_BYTES); - if (!rsp.pTsmas) { + // TODO use construct and destructor pattern + STableTSMAInfoRsp *pRsp = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp)); + if (!pRsp) { ctgReleaseTSMAToCache(pCtg, dbCache, pCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + pRsp->pTsmas = taosArrayInit(pCache->pTsmas->size, POINTER_BYTES); + if (!pRsp->pTsmas) { + ctgReleaseTSMAToCache(pCtg, dbCache, pCache); + taosMemoryFreeClear(pRsp); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } SMetaRes res = {0}; for (int32_t i = 0; i < pCache->pTsmas->size; ++i) { STSMACache *pTsmaOut = NULL; @@ -3287,12 +3293,13 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx code = ctgCloneTbTSMA(pTsmaCache, &pTsmaOut); if (code) { ctgReleaseTSMAToCache(pCtg,
dbCache, pCache); - tFreeTableTSMAInfoRsp(&rsp); + tFreeTableTSMAInfoRsp(pRsp); + taosMemoryFreeClear(pRsp); /* pRsp itself is heap-allocated now, so it is freed after its contents */ CTG_ERR_RET(code); } - taosArrayPush(rsp.pTsmas, &pTsmaOut); + taosArrayPush(pRsp->pTsmas, &pTsmaOut); } - res.pRes = rsp.pTsmas; + res.pRes = pRsp; /* the result entry now carries the whole STableTSMAInfoRsp instead of its inner array */ taosArrayPush(pCtx->pResList, &res); taosHashRelease(dbCache->tsmaCache, pCache); } diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 430fb31354..5166e1c58a 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -1485,7 +1485,7 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* na char* msg = NULL; int32_t msgLen = 0; int32_t reqType = TDMT_MND_GET_TABLE_TSMA; - SCtgTask* pTask = tReq->pTask; + SCtgTask* pTask = tReq ? tReq->pTask : NULL; /* tolerate a NULL tReq; pTask then stays NULL and rpcMallocCont is selected below */ void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(name, tbFName); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 686ed4b7a8..c25c64bfa7 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -567,3 +567,19 @@ int32_t rewriteFuncsForTSMA(SNodeList* pFuncs) { } return code; } + +int32_t getFuncId(const char* name) { /* resolves a function name to its id; returns -1 when nothing matches */ + if (NULL != gFunMgtService.pFuncNameHashTable) { + void* pVal = taosHashGet(gFunMgtService.pFuncNameHashTable, name, strlen(name)); + if (NULL != pVal) { + return *(int32_t*)pVal; /* the hash value is read as an int32_t id */ + } + return -1; /* the hash table exists but has no such name: the builtin array is not scanned */ + } + for (int32_t i = 0; i < funcMgtBuiltinsNum; ++i) { + if (0 == strcmp(funcMgtBuiltins[i].name, name)) { + return i; /* the index into funcMgtBuiltins serves as the id */ + } + } + return -1; +} diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index cb0517bbc2..4091b2ea23 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -4432,6 +4432,10 @@ static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) { if
(TSDB_CODE_SUCCESS == code) { code = tjsonAddTArray(pJson, jkRealTableSmaIndexes, tableIndexInfoToJson, pNode->pSmaIndexes); } + if (TSDB_CODE_SUCCESS == code) { + //TODO + //code = tjsonAddTArray(SJson *pJson, const char *pName, FToJson func, const SArray *pArray); + } return code; } @@ -4457,6 +4461,9 @@ static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) { code = tjsonToTArray(pJson, jkRealTableSmaIndexes, jsonToTableIndexInfo, &pNode->pSmaIndexes, sizeof(STableIndexInfo)); } + if (TSDB_CODE_SUCCESS == code) { + //TODO + } return code; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 3da5ba9370..8f0f4a47fa 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -2630,3 +2630,70 @@ bool nodesIsTableStar(SNode* pNode) { return (QUERY_NODE_COLUMN == nodeType(pNode)) && ('\0' != ((SColumnNode*)pNode)->tableAlias[0]) && (0 == strcmp(((SColumnNode*)pNode)->colName, "*")); } + +void nodesSortList(SNodeList** pList, bool (*comp)(SNode* pNode1, SNode* pNode2)) { /* sorts the list cells in place by merging runs of doubling length; comp(a, b) == true places a before b, ties keep their order */ + if (!pList || !*pList || (*pList)->length <= 1) return; /* NULL, empty or single-cell list: nothing to sort (an empty list would otherwise reach pTail->pNext with pTail == NULL) */ + + uint32_t inSize = 1; + SListCell* pHead = (*pList)->pHead; + while (1) { + SListCell* p = pHead; + pHead = NULL; + SListCell* pTail = NULL; + + uint32_t nMerges = 0; + while (p) { + ++nMerges; + SListCell* q = p; + uint32_t pSize = 0; + for (uint32_t i = 0; i < inSize; ++i) { /* advance q by up to inSize cells; pSize counts the cells in the p run */ + ++pSize; + q = q->pNext; + if (!q) { + break; + } + } + + uint32_t qSize = inSize; + + while (pSize > 0 || (qSize > 0 && q)) { + SListCell* pCell; + if (pSize == 0) { + pCell = q; + q = q->pNext; + --qSize; + } else if (qSize == 0 || !q) { + pCell = p; + p = p->pNext; + --pSize; + } else if (!comp(q->pNode, p->pNode)) { /* q is not strictly before p: take p first */ + pCell = p; + p = p->pNext; + --pSize; + } else { + pCell = q; + q = q->pNext; + --qSize; + } + + if (pTail) { + pTail->pNext = pCell; + pCell->pPrev = pTail; + } else { + pHead = pCell; + pHead->pPrev = NULL; + } + pTail = pCell; + } + p = q; + } + pTail->pNext = NULL; + +
if (nMerges <= 1) { /* a single merge covered the whole list: publish the new head and tail */ + (*pList)->pHead = pHead; + (*pList)->pTail = pTail; + return; + } + inSize *= 2; /* the next pass merges runs twice as long */ + } +} diff --git a/source/libs/nodes/test/nodesTestMain.cpp b/source/libs/nodes/test/nodesTestMain.cpp index 356b13f4a7..4bfe807d06 100644 --- a/source/libs/nodes/test/nodesTestMain.cpp +++ b/source/libs/nodes/test/nodesTestMain.cpp @@ -56,6 +56,64 @@ TEST(NodesTest, traverseTest) { nodesDestroyNode(pRoot); } +bool compareValueNode(SNode* pNode1, SNode* pNode2) { + SValueNode* p1 = (SValueNode*)pNode1; + SValueNode* p2 = (SValueNode*)pNode2; + + return p1->datum.i < p2->datum.i; +} + +void assert_sort_result(SNodeList* pList) { + SNode* pNode; + int32_t i = 0; + FOREACH(pNode, pList) { + SValueNode* p = (SValueNode*)pNode; + ASSERT_EQ(p->datum.i, i++); + } + SListCell* pCell = pList->pHead; + ASSERT_TRUE(pCell->pPrev == NULL); + ASSERT_TRUE(pList->pTail->pNext == NULL); + int32_t len = 1; + while (pCell) { + if (pCell->pNext) { + ASSERT_TRUE(pCell->pNext->pPrev == pCell); + } + pCell = pCell->pNext; + if (pCell) len++; + } + ASSERT_EQ(len, pList->length); +} + +TEST(NodesTest, sort) { + SValueNode *vn1 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + vn1->datum.i = 4; + + SValueNode *vn2 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + vn2->datum.i = 3; + + SValueNode *vn3 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + vn3->datum.i = 2; + + SValueNode *vn4 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + vn4->datum.i = 1; + + SValueNode *vn5 = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + vn5->datum.i = 0; + + SNodeList* l = NULL; + nodesListMakeAppend(&l, (SNode*)vn1); + nodesListMakeAppend(&l, (SNode*)vn2); + nodesListMakeAppend(&l, (SNode*)vn3); + nodesListMakeAppend(&l, (SNode*)vn4); + nodesListMakeAppend(&l, (SNode*)vn5); + + nodesSortList(&l, compareValueNode); + + assert_sort_result(l); + + nodesDestroyList(l); +} + int main(int argc, char* argv[]) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git
a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 4bd8b80b92..7009960819 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -110,7 +110,7 @@ typedef struct SParseMetaCache { SHashObj* pTableIndex; // key is tbFName, element is SArray* SHashObj* pTableCfg; // key is tbFName, element is STableCfg* SHashObj* pViews; // key is viewFName, element is SViewMeta* - SHashObj* pTableTSMAs; // key is tbFName, elements are SArray* + SHashObj* pTableTSMAs; // key is tbFName, elements are STableTSMAInfoRsp* SArray* pDnodes; // element is SEpSet bool dnodeRequired; } SParseMetaCache; @@ -170,6 +170,7 @@ int32_t getTableCfgFromCache(SParseMetaCache* pMetaCache, const SName* pName, ST int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes); void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request); SNode* createSelectStmtImpl(bool isDistinct, SNodeList* pProjectionList, SNode* pTable, SNodeList* pHint); +int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas); /** * @brief return a - b with overflow check diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index cdd7f02511..da9a817fa1 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -707,6 +707,22 @@ static int32_t getDnodeList(STranslateContext* pCxt, SArray** pDnodes) { return code; } +static int32_t getTableTsmas(STranslateContext* pCxt, const SName* pName, SArray** ppTsmas) { /* loads the TSMAs of pName: from the parse meta cache in async mode, otherwise through the catalog */ + SParseContext* pParCxt = pCxt->pParseCxt; + int32_t code = 0; /* NOTE(review): the cache path returns the array held by the cached response, the catalog path a newly fetched array - confirm who frees *ppTsmas */ + if (pParCxt->async) { + code = getTableTsmasFromCache(pCxt->pMetaCache, pName, ppTsmas); + } else { + SRequestConnInfo conn = {.pTrans = pParCxt->pTransporter, + .requestId = pParCxt->requestId, + .requestObjRefId = pParCxt->requestRid, + .mgmtEps = pParCxt->mgmtEpSet}; + code = catalogGetTableTsmas(pParCxt->pCatalog, &conn, pName, ppTsmas); + } + if (code) parserError("0x%"
PRIx64 " getTableTsmas error, code:%s", pCxt->pParseCxt->requestId, tstrerror(code)); + return code; +} + static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) { pCxt->pParseCxt = pParseCxt; pCxt->errCode = TSDB_CODE_SUCCESS; @@ -3612,13 +3628,23 @@ static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNo if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) { return TSDB_CODE_SUCCESS; } - if (isSelectStmt(pCxt->pCurrStmt) && NULL != ((SSelectStmt*)pCxt->pCurrStmt)->pWindow && + if (0 && isSelectStmt(pCxt->pCurrStmt) && NULL != ((SSelectStmt*)pCxt->pCurrStmt)->pWindow && /* NOTE(review): the leading '0 &&' makes this branch unreachable, so getTableIndex is never called from here */ QUERY_NODE_INTERVAL_WINDOW == nodeType(((SSelectStmt*)pCxt->pCurrStmt)->pWindow)) { return getTableIndex(pCxt, pName, &pRealTable->pSmaIndexes); } return TSDB_CODE_SUCCESS; } +static int32_t setTableTsmas(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) { /* fills pRealTable->pTsmas for SELECT statements unless a stream is being created or SMA optimization is disabled */ + if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) { + return TSDB_CODE_SUCCESS; + } + if (isSelectStmt(pCxt->pCurrStmt)) { + return getTableTsmas(pCxt, pName, &pRealTable->pTsmas); + } + return TSDB_CODE_SUCCESS; +} + static int32_t setTableCacheLastMode(STranslateContext* pCxt, SSelectStmt* pSelect) { if ((!pSelect->hasLastRowFunc && !pSelect->hasLastFunc) || QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) || TSDB_SYSTEM_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType) { @@ -4211,6 +4237,9 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare if (TSDB_CODE_SUCCESS == code) { code = setTableIndex(pCxt, &name, pRealTable); } + if (TSDB_CODE_SUCCESS == code) { + code = setTableTsmas(pCxt, &name, pRealTable); + } } if (TSDB_CODE_SUCCESS == code) { pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision; @@ -4934,8 +4963,8 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindo if
(IS_CALENDAR_TIME_DURATION(pSliding->unit)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_UNIT); } - if ((pSliding->datum.i < - convertTimeFromPrecisionToUnit(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, pSliding->unit)) || + if ((pSliding->datum.i < convertTimePrecision(tsMinSlidingTime, TSDB_TIME_PRECISION_MILLI, + precision)) || (pInter->datum.i / pSliding->datum.i > INTERVAL_SLIDING_FACTOR)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL); } @@ -10371,6 +10400,18 @@ SNode* createColumnNodeWithName(const char* name) { return (SNode*)pCol; } +static bool sortFuncWithFuncId(SNode* pNode1, SNode* pNode2) { /* nodesSortList comparator: true when pNode1's funcId is strictly smaller */ + SFunctionNode* pFunc1 = (SFunctionNode*)pNode1; + SFunctionNode* pFunc2 = (SFunctionNode*)pNode2; + return pFunc1->funcId < pFunc2->funcId; +} + +static bool sortColWithColId(SNode* pNode1, SNode* pNode2) { /* nodesSortList comparator: true when pNode1's colId is strictly smaller */ + SColumnNode* pCol1 = (SColumnNode*)pNode1; + SColumnNode* pCol2 = (SColumnNode*)pNode2; + return pCol1->colId < pCol2->colId; +} + static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, STableMeta* pTableMeta) { int32_t code = TSDB_CODE_SUCCESS; @@ -10444,9 +10485,33 @@ translateTSMAFuncs(STranslateContext * pCxt, SCreateTSMAStmt* pStmt, STableMeta* break; } strcpy(pCol->colName, schema->name); + pCol->colId = schema->colId; + } + } + } else { + SNode* pNode; + FOREACH(pNode, pStmt->pOptions->pCols) { + SColumnNode* pCol = (SColumnNode*)pNode; + for (int32_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i) { + const SSchema* pSchema = &pTableMeta->schema[i]; + if (strcmp(pSchema->name, pCol->colName) == 0) { /* resolve the column id by name; colId is left as it was when no schema entry matches */ + pCol->colId = pSchema->colId; + break; + } } } } + nodesSortList(&pStmt->pOptions->pCols, sortColWithColId); /* order the columns by ascending colId */ + SNode* pNode; + FOREACH(pNode, pStmt->pOptions->pFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + int32_t funcId = getFuncId(pFunc->functionName); + if (funcId < 0) { + return TSDB_CODE_FUNC_NOT_BUILTIN_FUNTION; /* getFuncId found no function with this name */ + } +
pFunc->funcId = funcId; + } + nodesSortList(&pStmt->pOptions->pFuncs, sortFuncWithFuncId); /* order the functions by ascending funcId */ if (TSDB_CODE_SUCCESS == code) { // assemble funcs with columns SNode *pNode1, *pNode2; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 725989a432..e5a6d0dead 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -1158,6 +1158,18 @@ int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, return code; } +int32_t getTableTsmasFromCache(SParseMetaCache* pMetaCache, const SName* pTbName, SArray** pTsmas) { /* looks up the cached TSMA response for pTbName and exposes its pTsmas array through *pTsmas */ + char tbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTbName, tbFName); + STableTSMAInfoRsp *pTsmasRsp = NULL; + STableTSMAInfo* pTsma = NULL; /* NOTE(review): never used in this function */ + int32_t code = getMetaDataFromHash(tbFName, strlen(tbFName), pMetaCache->pTableTSMAs, (void**)&pTsmasRsp); + if (TSDB_CODE_SUCCESS == code && pTsmasRsp) { + *pTsmas = pTsmasRsp->pTsmas; /* aliases the array inside the cached response; nothing is copied */ + } + return TSDB_CODE_SUCCESS; /* the lookup code is dropped: a miss leaves *pTsmas untouched and still reports success */ +} + STableCfg* tableCfgDup(STableCfg* pCfg) { STableCfg* pNew = taosMemoryMalloc(sizeof(*pNew)); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index f618bece13..81316089b7 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -364,6 +364,7 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT TSWAP(pScan->pVgroupList, pRealTable->pVgroupList); TSWAP(pScan->pSmaIndexes, pRealTable->pSmaIndexes); + TSWAP(pScan->pTsmas, pRealTable->pTsmas); /* swap the TSMA list of the table node into the scan node, like the two lists above */ pScan->tableId = pRealTable->pMeta->uid; pScan->stableId = pRealTable->pMeta->suid; pScan->tableType = pRealTable->pMeta->tableType; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 71bf89531a..afc5e54dc4 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -5767,6 +5767,243 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan*
pLogicSub return code; } +static bool tsmaOptMayBeOptimized(SLogicNode* pNode) { /* node predicate handed to optFindPossibleNode: accepts a table scan that has TSMAs, no scan conditions, and an interval-window or group-key-free agg parent whose functions are all TSMA-supported or pseudo-column functions */ + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + SNode* pTmpNode; + SNodeList* pFuncs = NULL; + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + SLogicNode* pParent = pScan->node.pParent; + SNode* pConds = pScan->node.pConditions; + + if (pScan->scanType != SCAN_TYPE_TABLE || !pParent || pConds) return false; + + if (!pScan->pTsmas || pScan->pTsmas->size <= 0) { + return false; + } + + switch (nodeType(pParent)) { + case QUERY_NODE_LOGIC_PLAN_WINDOW: { + SWindowLogicNode* pWindow = (SWindowLogicNode*)pParent; + // only time window interval supported + if (pWindow->winType != WINDOW_TYPE_INTERVAL) return false; + pFuncs = pWindow->pFuncs; + } break; + case QUERY_NODE_LOGIC_PLAN_AGG: { + SAggLogicNode* pAgg = (SAggLogicNode*)pParent; + // group/partition by normal cols not supported + if (pAgg->pGroupKeys) return false; + pFuncs = pAgg->pAggFuncs; + } break; + default: + return false; + } + // TODO may need to replace func conds in having + + assert(pFuncs); + FOREACH(pTmpNode, pFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pTmpNode; + if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId)) { /* one unsupported function rules the scan out */ + return false; + } + } + + return true; + } + return false; +} + +typedef struct STSMAOptUsefulTsma { + const STableTSMAInfo* pTsma; + STimeWindow timeRange; // scan time range for this tsma +} STSMAOptUsefulTsma; + +typedef struct STSMAOptCtx { + // input + const SLogicNode* pParent; // Agg or Interval + const SNodeList* pAggFuncs; + const STimeWindow* pTimeRange; + const SArray* pTsmas; + SInterval* queryInterval; + + // output + SArray* usefulTsmas; // SArray of STSMAOptUsefulTsma, sorted by tsma interval from long to short +} STSMAOptCtx; + +static int32_t fillTSMAOptCtx (STSMAOptCtx* pTsmaOptCtx, SScanLogicNode* pScan) { /* copies the scan's parent, TSMA list and scan range into the context before the allocations that follow */ + int32_t code = 0; + pTsmaOptCtx->pParent = pScan->node.pParent; + pTsmaOptCtx->pTsmas = pScan->pTsmas; + pTsmaOptCtx->pTimeRange = &pScan->scanRange; +
pTsmaOptCtx->queryInterval = taosMemoryCalloc(1, sizeof(SInterval)); + if (!pTsmaOptCtx->queryInterval) return TSDB_CODE_OUT_OF_MEMORY; + + if (nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) { + SWindowLogicNode* pWindow = (SWindowLogicNode*)pTsmaOptCtx->pParent; + pTsmaOptCtx->queryInterval->interval = pWindow->interval; + pTsmaOptCtx->queryInterval->intervalUnit = pWindow->intervalUnit; + pTsmaOptCtx->queryInterval->offset = pWindow->offset; + pTsmaOptCtx->queryInterval->offsetUnit = pWindow->intervalUnit; /* NOTE(review): offsetUnit is filled from intervalUnit - confirm this is intended */ + pTsmaOptCtx->queryInterval->sliding = pWindow->sliding; + pTsmaOptCtx->queryInterval->slidingUnit = pWindow->slidingUnit; + pTsmaOptCtx->queryInterval->precision = pWindow->node.precision; + pTsmaOptCtx->queryInterval->tz = tsTimezone; + pTsmaOptCtx->pAggFuncs = pWindow->pFuncs; + } else { + ASSERT(nodeType(pTsmaOptCtx->pParent) == QUERY_NODE_LOGIC_PLAN_AGG); + SAggLogicNode* pAgg = (SAggLogicNode*)pTsmaOptCtx->pParent; + pTsmaOptCtx->pAggFuncs = pAgg->pAggFuncs; /* queryInterval keeps its calloc zeros for an agg parent */ + } + pTsmaOptCtx->usefulTsmas = taosArrayInit(pScan->pTsmas->size, sizeof(STSMAOptUsefulTsma)); + if (!pTsmaOptCtx->usefulTsmas) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + return code; +} + +static void clearTSMAOptCtx(STSMAOptCtx* pTsmaOptCtx) { /* releases the two members allocated by fillTSMAOptCtx */ + taosArrayDestroy(pTsmaOptCtx->usefulTsmas); + taosMemoryFreeClear(pTsmaOptCtx->queryInterval); +} + +static bool tsmaOptCheckValidInterval(int64_t tsmaInterval, int8_t tsmaIntevalUnit, const STSMAOptCtx* pTsmaOptCtx) { /* true when the query's interval, sliding and offset are all whole multiples of the tsma interval */ + if (!pTsmaOptCtx->queryInterval) return true; + + // TODO save tsmaInterval in table precision to avoid conversions + int32_t code = getDuration(tsmaInterval, tsmaIntevalUnit, &tsmaInterval, pTsmaOptCtx->queryInterval->precision); + if (TSDB_CODE_SUCCESS != code || tsmaInterval <= 0) return false; /* conversion failed or non-positive interval: treat this tsma as unusable instead of asserting (also avoids % 0 below) */ + bool validInterval = pTsmaOptCtx->queryInterval->interval % tsmaInterval == 0; + bool validSliding = pTsmaOptCtx->queryInterval->sliding % tsmaInterval == 0; + bool validOffset = pTsmaOptCtx->queryInterval->offset % tsmaInterval == 0; + return validInterval
&& validSliding && validOffset; +} + +static bool tsmaOptCheckValidFuncs(const SArray* pTsmaFuncs, const SNodeList* pQueryFuncs) { /* true when every query function is a single-column call whose (funcId, colId) pair exists in pTsmaFuncs, which is expected sorted by funcId then colId */ + SNode* pNode; + int32_t tsmaColNum = 1; + bool failed = false, found = false; + int32_t firstFuncId = ((STableTSMAFuncInfo*)taosArrayGet(pTsmaFuncs, 0))->funcId; + // find col num + for (int32_t i = 1; i < pTsmaFuncs->size; ++i) { /* count the leading entries that share the first funcId: that is the number of columns per function */ + STableTSMAFuncInfo* pTsmaFunc = taosArrayGet(pTsmaFuncs, i); + if (firstFuncId == pTsmaFunc->funcId) { + tsmaColNum++; + } else { + break; + } + } + + FOREACH(pNode, pQueryFuncs) { + SFunctionNode* pQueryFunc = (SFunctionNode*)pNode; + if (NULL == pQueryFunc->pParameterList || 1 != pQueryFunc->pParameterList->length || + nodeType(pQueryFunc->pParameterList->pHead->pNode) != QUERY_NODE_COLUMN) { + failed = true; + break; + } + int32_t queryColId = ((SColumnNode*)pQueryFunc->pParameterList->pHead->pNode)->colId; + found = false; + // iterate funcs + for (int32_t i = 0; i < pTsmaFuncs->size; i += tsmaColNum) { + STableTSMAFuncInfo* pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, i); + if (pQueryFunc->funcId < pTsmaFuncInfo->funcId) { + failed = true; + break; + } + if (pQueryFunc->funcId > pTsmaFuncInfo->funcId) { + continue; + } + + // iterate cols within a func + for (int32_t j = i; j < i + tsmaColNum && j < pTsmaFuncs->size; ++j) { /* this function's columns occupy [i, i + tsmaColNum); the old bound 'j < tsmaColNum' only worked for i == 0 */ + if (j > i) { + pTsmaFuncInfo = taosArrayGet(pTsmaFuncs, j); + } + if (queryColId < pTsmaFuncInfo->colId) { + failed = true; + break; + } + if (queryColId > pTsmaFuncInfo->colId) { + continue; + } + found= true; + break; + } + break; + } + if (failed || !found) { + break; + } + } + return found && !failed; /* 'found' may still be true from an earlier function when a later one fails the parameter check */ +} + +static void tsmaOptFilterTsmas(STSMAOptCtx* pTsmaOptCtx) { /* collects the tsmas that pass both the interval and the function check into usefulTsmas */ + for (int32_t i = 0; i < pTsmaOptCtx->pTsmas->size; ++i) { + STableTSMAInfo* pTsma = taosArrayGetP(pTsmaOptCtx->pTsmas, i); + // filter with interval + if (!tsmaOptCheckValidInterval(pTsma->interval, pTsma->unit, pTsmaOptCtx)) { + continue; + } + // filter with funcs, note that tsma funcs has been sorted by funcId and ColId + if (!tsmaOptCheckValidFuncs(pTsma->pFuncs, pTsmaOptCtx->pAggFuncs))
{ + continue; + } + STSMAOptUsefulTsma usefulTsma = {.pTsma = pTsma, .timeRange.skey = TSKEY_MIN, .timeRange.ekey = TSKEY_MAX}; + taosArrayPush(pTsmaOptCtx->usefulTsmas, &usefulTsma); + } +} + +static int tsmaInfoCompWithIntervalDesc(const void* pLeft, const void* pRight) { /* taosArraySort comparator: the longer tsma interval sorts first; both intervals are converted with millisecond precision */ + const STSMAOptUsefulTsma* p = pLeft, *q = pRight; + int64_t pInterval = p->pTsma->interval, qInterval = q->pTsma->interval; + int32_t code = getDuration(pInterval, p->pTsma->unit, &pInterval, TSDB_TIME_PRECISION_MILLI); + ASSERT(code == TSDB_CODE_SUCCESS); + code = getDuration(qInterval, q->pTsma->unit, &qInterval, TSDB_TIME_PRECISION_MILLI); + ASSERT(code == TSDB_CODE_SUCCESS); + if (pInterval > qInterval) return -1; + if (pInterval < qInterval) return 1; + return 0; +} + +static void tsmaOptSplitWindows(STSMAOptCtx *pTsmaOptCtx) { /* skeleton only: both branches are still empty */ + // head windows + if (pTsmaOptCtx->pTimeRange->skey != TSKEY_MIN) { + + } + + // normal biggest tsma windows + + // tail windows + if (pTsmaOptCtx->pTimeRange->ekey != TSKEY_MAX) { + + } +} + +static void tsmaOptGeneratePlan(STSMAOptCtx* pTsmaOptCtx) { /* empty body: no plan is generated yet */ + +} + +static int32_t tsmaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { /* optimizer rule: finds a scan accepted by tsmaOptMayBeOptimized, then filters and sorts its usable tsmas; the plan rewrite steps are still stubs */ + int32_t code = 0; + STSMAOptCtx tsmaOptCtx = {0}; + SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, tsmaOptMayBeOptimized); + if (!pScan) return code; + code = fillTSMAOptCtx(&tsmaOptCtx, pScan); + if (TSDB_CODE_SUCCESS != code) { clearTSMAOptCtx(&tsmaOptCtx); return code; } /* do not continue with a half-built context when an allocation failed */ + // 1. extract useful tsmas + tsmaOptFilterTsmas(&tsmaOptCtx); + // 2. sort useful tsmas with interval + taosArraySort(tsmaOptCtx.usefulTsmas, tsmaInfoCompWithIntervalDesc); + // 3. generate and replace logic plans + // a. split windows + tsmaOptSplitWindows(&tsmaOptCtx); + // b. create logic plan + tsmaOptGeneratePlan(&tsmaOptCtx); + // c.
rewrite agg funcs + // + clearTSMAOptCtx(&tsmaOptCtx); + return code; +} + // clang-format off static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, @@ -5791,6 +6028,7 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, {.pName = "PartitionCols", .optimizeFunc = partitionColsOpt}, + {.pName = "Tsma", .optimizeFunc = tsmaOptimize}, }; // clang-format on