From 31dc5533900b1008e44d342fef6231b953ab2b46 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 9 Jun 2022 13:24:04 +0800 Subject: [PATCH 1/4] feat: sma index optimize --- include/common/tmsg.h | 21 +- include/libs/catalog/catalog.h | 2 +- source/libs/catalog/src/catalog.c | 6 +- source/libs/parser/inc/parUtil.h | 4 +- source/libs/parser/src/parTranslater.c | 319 ++++++++++++++++++- source/libs/parser/src/parUtil.c | 4 + source/libs/parser/test/mockCatalogService.h | 1 + source/libs/planner/inc/planInt.h | 6 +- source/libs/planner/src/planLogicCreater.c | 43 ++- source/libs/planner/src/planOptimizer.c | 4 +- source/libs/planner/src/planSpliter.c | 47 +-- source/libs/planner/src/planner.c | 8 +- source/libs/planner/test/planOtherTest.cpp | 2 +- source/libs/planner/test/planTestUtil.cpp | 40 ++- 14 files changed, 404 insertions(+), 103 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4a3c4b0c3f..87b94d139c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2493,14 +2493,14 @@ int32_t tSerializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq int32_t tDeserializeSTableIndexReq(void* buf, int32_t bufLen, STableIndexReq* pReq); typedef struct { - int8_t intervalUnit; - int8_t slidingUnit; - int64_t interval; - int64_t offset; - int64_t sliding; - int64_t dstTbUid; - int32_t dstVgId; // for stream - char* expr; + int8_t intervalUnit; + int8_t slidingUnit; + int64_t interval; + int64_t offset; + int64_t sliding; + int64_t dstTbUid; + int32_t dstVgId; + char* expr; } STableIndexInfo; typedef struct { @@ -2510,7 +2510,6 @@ typedef struct { int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp); int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp); - typedef struct { int8_t mqMsgType; int32_t code; @@ -2751,8 +2750,8 @@ typedef struct { char* msg; } SVDeleteReq; -int32_t tSerializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); -int32_t tDeserializeSVDeleteReq(void *buf, int32_t bufLen, SVDeleteReq *pReq); +int32_t tSerializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); +int32_t tDeserializeSVDeleteReq(void* buf, int32_t bufLen, SVDeleteReq* pReq); typedef struct { int64_t affectedRows; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 21fee3cc3e..11241c3ae3 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -272,7 +272,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); -int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes); +int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pName, SArray** pRes); int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 0f6a79c14c..d9f927abf0 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1136,13 +1136,15 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL)); } -int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) { +int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pName, SArray** pRes) { CTG_API_ENTER(); - if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) { + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pName || NULL == pRes) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(pName, tbFName); CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL)); } diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 0351023f5b..42719d95bd 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -46,6 +46,7 @@ typedef struct SParseMetaCache { SHashObj* pDbInfo; // key is tbFName, element is SDbInfo* SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass SHashObj* pUdf; // key is funcName, element is SFuncInfo* + SHashObj* pSmaIndex; // key is tbFName, element is SArray* } SParseMetaCache; int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...); @@ -58,7 +59,7 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta); int32_t getNumOfTags(const STableMeta* pTableMeta); STableComInfo getTableInfo(const STableMeta* pTableMeta); STableMeta* tableMetaDup(const STableMeta* pTableMeta); -int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag **ppTag, SMsgBuf* pMsgBuf); +int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, SMsgBuf* pMsgBuf); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); @@ -84,6 +85,7 @@ int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDb int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type, bool* pPass); int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo); +int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f77bbb34f3..9777712134 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -25,6 +25,9 @@ #include "tglobal.h" #include "ttime.h" +#define SMA_TABLE_NAME "#sma_table" +#define SMA_COL_NAME_PREFIX "#sma_col_" + #define generateDealNodeErrMsg(pCxt, code, ...) \ (pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, code, ##__VA_ARGS__), DEAL_RES_ERROR) @@ -260,6 +263,28 @@ static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) { return code; } +static int32_t getTableIndex(STranslateContext* pCxt, const char* pDbName, const char* pTableName, SArray** pIndexes) { + SParseContext* pParCxt = pCxt->pParseCxt; + SName name; + toName(pParCxt->acctId, pDbName, pTableName, &name); + int32_t code = TSDB_CODE_SUCCESS; + if (pParCxt->async) { + code = getTableIndexFromCache(pCxt->pMetaCache, &name, pIndexes); + } else { + code = collectUseDatabase(&name, pCxt->pDbs); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(&name, pCxt->pTables); + } + if (TSDB_CODE_SUCCESS == code) { + code = catalogGetTableIndex(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pIndexes); + } + } + if (TSDB_CODE_SUCCESS != code) { + parserError("getTableIndex error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName, pTableName); + } + return code; +} + static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) { pCxt->pParseCxt = pParseCxt; pCxt->errCode = TSDB_CODE_SUCCESS; @@ -334,6 +359,10 @@ static bool isIndefiniteRowsFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsIndefiniteRowsFunc(((SFunctionNode*)pNode)->funcId)); } +static bool isVectorFunc(const SNode* pNode) { + return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsVectorFunc(((SFunctionNode*)pNode)->funcId)); +} + static bool isDistinctOrderBy(STranslateContext* pCxt) { return (SQL_CLAUSE_ORDER_BY == pCxt->currClause && pCxt->pCurrSelectStmt->isDistinct); } @@ -1787,7 +1816,7 @@ static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni return -1; } -static int32_t checkIntervalWindow(STranslateContext* pCxt, SNode* pWhere, SIntervalWindowNode* pInterval) { +static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision; SValueNode* pInter = (SValueNode*)pInterval->pInterval; @@ -1829,7 +1858,15 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SNode* pWhere, SInte } } - return translateFill(pCxt, pWhere, pInterval); + return TSDB_CODE_SUCCESS; +} + +static int32_t translateIntervalWindow(STranslateContext* pCxt, SSelectStmt* pSelect, SIntervalWindowNode* pInterval) { + int32_t code = checkIntervalWindow(pCxt, pInterval); + if (TSDB_CODE_SUCCESS == code) { + code = translateFill(pCxt, pSelect->pWhere, pInterval); + } + return code; } static EDealRes checkStateExpr(SNode* pNode, void* pContext) { @@ -1851,13 +1888,13 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static int32_t checkStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) { +static int32_t translateStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) { nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt); // todo check for "function not support for state_window" return pCxt->errCode; } -static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) { +static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) { if ('y' == pSession->pGap->unit || 'n' == pSession->pGap->unit || 0 == pSession->pGap->datum.i) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_GAP); } @@ -1868,14 +1905,14 @@ static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* p return TSDB_CODE_SUCCESS; } -static int32_t checkWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { +static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { switch (nodeType(pSelect->pWindow)) { case QUERY_NODE_STATE_WINDOW: - return checkStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow); + return translateStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow); case QUERY_NODE_SESSION_WINDOW: - return checkSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow); + return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow); case QUERY_NODE_INTERVAL_WINDOW: - return checkIntervalWindow(pCxt, pSelect->pWhere, (SIntervalWindowNode*)pSelect->pWindow); + return translateIntervalWindow(pCxt, pSelect, (SIntervalWindowNode*)pSelect->pWindow); default: break; } @@ -1889,7 +1926,7 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->currClause = SQL_CLAUSE_WINDOW; int32_t code = translateExpr(pCxt, &pSelect->pWindow); if (TSDB_CODE_SUCCESS == code) { - code = checkWindow(pCxt, pSelect); + code = translateSpecificWindow(pCxt, pSelect); } return code; } @@ -1965,6 +2002,270 @@ static int32_t rewriteTimelineFunc(STranslateContext* pCxt, SSelectStmt* pSelect nodesWalkSelectStmt(pSelect, SQL_CLAUSE_FROM, rewriteTimelineFuncImpl, pCxt); return pCxt->errCode; } +#if 0 +static bool mayBeApplySmaIndex(SSelectStmt* pSelect) { + if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow) || + NULL != ((SIntervalWindowNode*)pSelect->pWindow)->pFill || + QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) || NULL != pSelect->pWhere || + NULL != pSelect->pPartitionByList || NULL != pSelect->pGroupByList || NULL != pSelect->pHaving) { + return false; + } + return true; +} + +static bool equalIntervalWindow(SIntervalWindowNode* pInterval, SNode* pWhere, STableIndexInfo* pIndex) { + int64_t interval = ((SValueNode*)pInterval->pInterval)->datum.i; + int8_t intervalUnit = ((SValueNode*)pInterval->pInterval)->unit; + int64_t offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0); + int64_t sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : interval); + int8_t slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : intervalUnit); + if (interval != pIndex->interval || intervalUnit != pIndex->intervalUnit || offset != pIndex->offset || + sliding != pIndex->sliding || slidingUnit != pIndex->slidingUnit) { + return false; + } + // todo + if (NULL != pWhere) { + return false; + } + return true; +} + +typedef struct SSmaIndexMatchFuncsCxt { + int32_t errCode; + uint64_t tableId; + SNodeList* pSmaFuncs; + SNodeList* pUseFuncs; + SNodeList* pUseCols; + SArray* pUseMap; + bool match; +} SSmaIndexMatchFuncsCxt; + +static SColumnNode* createColumnFromSmaFunc(uint64_t tableId, int32_t index, SExprNode* pSmaFunc) { + SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return NULL; + } + pCol->tableId = tableId; + pCol->tableType = TSDB_SUPER_TABLE; + pCol->colId = index + 2; // skip timestamp primary col + pCol->colType = COLUMN_TYPE_COLUMN; + snprintf(pCol->colName, sizeof(pCol->colName), SMA_COL_NAME_PREFIX "%d", pCol->colId); + strcpy(pCol->tableName, SMA_TABLE_NAME); + strcpy(pCol->tableAlias, SMA_TABLE_NAME); + pCol->node.resType = pSmaFunc->resType; + strcpy(pCol->node.aliasName, pSmaFunc->aliasName); + return pCol; +} + +static int32_t collectSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, int32_t index, SNode* pSmaFunc) { + if (NULL == pCxt->pUseMap) { + int32_t nfuncs = LIST_LENGTH(pCxt->pSmaFuncs); + pCxt->pUseMap = taosArrayInit(nfuncs, sizeof(int32_t)); + if (NULL == pCxt->pUseMap) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t initPos = -1; + for (int32_t i = 0; i < nfuncs; ++i) { + taosArrayPush(pCxt->pUseMap, &initPos); + } + } + int32_t pos = *(int32_t*)taosArrayGet(pCxt->pUseMap, index); + if (pos < 0) { + pos = LIST_LENGTH(pCxt->pUseFuncs); + taosArraySet(pCxt->pUseMap, index, &pos); + int32_t code = + nodesListMakeStrictAppend(&pCxt->pUseCols, createColumnFromSmaFunc(pCxt->tableId, index, (SExprNode*)pSmaFunc)); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pCxt->pUseFuncs, nodesCloneNode(pSmaFunc)); + } + return code; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t findSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, SNode* pNode, bool* pFound) { + if (!isAggFunc(pNode)) { + return TSDB_CODE_SUCCESS; + } + + int32_t index = 0; + SNode* pSmaFunc = NULL; + FOREACH(pSmaFunc, pCxt->pSmaFuncs) { + if (nodesEqualNode(pSmaFunc, pNode)) { + *pFound = true; + return collectSmaFunc(pCxt, index, pSmaFunc); + } + ++index; + } + return TSDB_CODE_SUCCESS; +} + +static EDealRes matchSmaFuncsImpl(SNode* pNode, void* pContext) { + SSmaIndexMatchFuncsCxt* pCxt = pContext; + + bool found = false; + pCxt->errCode = findSmaFunc(pCxt, pNode, &found); + if (TSDB_CODE_SUCCESS != pCxt->errCode) { + return DEAL_RES_ERROR; + } + + if (found) { + pCxt->match = true; + return DEAL_RES_IGNORE_CHILD; + } + + if (isVectorFunc(pNode)) { + pCxt->match = false; + return DEAL_RES_END; + } + + return DEAL_RES_CONTINUE; +} + +static int32_t matchSmaFuncs(SSelectStmt* pSelect, STableIndexInfo* pIndex, SNodeList* pSmaFuncs, SNodeList** pFuncs, + SNodeList** pCols) { + SSmaIndexMatchFuncsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, + .tableId = pIndex->dstTbUid, + .pSmaFuncs = pSmaFuncs, + .pUseFuncs = NULL, + .pUseCols = NULL, + .pUseMap = NULL, + .match = false}; + nodesWalkExprs(pSelect->pProjectionList, matchSmaFuncsImpl, &cxt); + if (TSDB_CODE_SUCCESS == cxt.errCode && cxt.match) { + *pFuncs = cxt.pUseFuncs; + *pCols = cxt.pUseCols; + } else { + nodesDestroyList(cxt.pUseFuncs); + nodesDestroyList(cxt.pUseCols); + } + taosArrayDestroy(cxt.pUseMap); + return cxt.errCode; +} + +static int32_t couldApplySmaIndex(STranslateContext* pCxt, SSelectStmt* pSelect, STableIndexInfo* pIndex, + SNodeList** pFuncs, SNodeList** pCols) { + if (!equalIntervalWindow((SIntervalWindowNode*)pSelect->pWindow, pSelect->pWhere, pIndex)) { + return TSDB_CODE_SUCCESS; + } + SNodeList* pSmaFuncs = NULL; + int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs); + if (TSDB_CODE_SUCCESS == code) { + pCxt->currClause = SQL_CLAUSE_SELECT; + code = translateExprList(pCxt, pSmaFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = matchSmaFuncs(pSelect, pIndex, pSmaFuncs, pFuncs, pCols); + } + nodesDestroyList(pSmaFuncs); + return code; +} + +typedef struct SSmaIndexRewriteFuncsCxt { + int32_t errCode; + SNodeList* pFuncs; + SNodeList* pCols; +} SSmaIndexRewriteFuncsCxt; + +static EDealRes rewriteFuncBySmaIndex(SNode** pNode, void* pContext) { + if (isAggFunc(*pNode)) { + SSmaIndexRewriteFuncsCxt* pCxt = pContext; + SNode* pFunc; + int32_t index = 0; + FOREACH(pFunc, pCxt->pFuncs) { + if (nodesEqualNode(pFunc, *pNode)) { + SNode* pNew = nodesCloneNode(nodesListGetNode(pCxt->pCols, index)); + if (NULL == pNew) { + pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + nodesDestroyNode(*pNode); + *pNode = pNew; + return DEAL_RES_IGNORE_CHILD; + } + ++index; + } + } + return DEAL_RES_CONTINUE; +} + +static int32_t rewriteTableBySmaIndex(SSelectStmt* pSelect, STableIndexInfo* pMatchIndex) { + nodesDestroyNode(pSelect->pFromTable); + SRealTableNode* pRealTable = nodesMakeNode(QUERY_NODE_REAL_TABLE); + if (NULL == pRealTable) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pRealTable->table.singleTable = true; + strcpy(pRealTable->table.tableName, SMA_TABLE_NAME); + pRealTable->pMeta = taosMemoryCalloc(1, sizeof(STableMeta)); + if (NULL == pRealTable->pMeta) { + nodesDestroyNode(pRealTable); + return TSDB_CODE_OUT_OF_MEMORY; + } + pRealTable->pMeta->vgId = pMatchIndex->dstVgId; + pRealTable->pMeta->uid = pMatchIndex->dstTbUid; + pRealTable->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo)); + if (NULL == pRealTable->pVgroupList) { + nodesDestroyNode(pRealTable); + return TSDB_CODE_OUT_OF_MEMORY; + } + // todo + pSelect->pFromTable = (SNode*)pRealTable; + return TSDB_CODE_SUCCESS; +} + +static int32_t rewriteSelectBySmaIndex(SSelectStmt* pSelect, STableIndexInfo* pMatchIndex, SNodeList* pFuncs, + SNodeList* pCols) { + SSmaIndexRewriteFuncsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pFuncs = pFuncs, .pCols = pCols}; + nodesRewriteExprs(pSelect->pProjectionList, rewriteFuncBySmaIndex, &cxt); + if (TSDB_CODE_SUCCESS == cxt.errCode) { + cxt.errCode = rewriteTableBySmaIndex(pSelect, pMatchIndex); + } + return cxt.errCode; +} + +static int32_t attemptApplySmaIndexImpl(STranslateContext* pCxt, SSelectStmt* pSelect, SArray* pIndexes) { + if (NULL == pIndexes) { + return TSDB_CODE_SUCCESS; + } + int32_t nindexes = taosArrayGetSize(pIndexes); + for (int32_t i = 0; i < nindexes; ++i) { + STableIndexInfo* pIndex = taosArrayGet(pIndexes, i); + SNodeList* pFuncs = NULL; + SNodeList* pCols = NULL; + if (TSDB_CODE_SUCCESS != couldApplySmaIndex(pCxt, pSelect, pIndex, &pFuncs, &pCols)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL != pFuncs) { + int32_t code = rewriteSelectBySmaIndex(pSelect, pIndex, pFuncs, pCols); + nodesDestroyList(pFuncs); + nodesDestroyList(pCols); + return code; + } + } + return TSDB_CODE_SUCCESS; +} + +static void destroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); } + +static int32_t attemptApplySmaIndex(STranslateContext* pCxt, SSelectStmt* pSelect) { + SRealTableNode* pRealTable = (SRealTableNode*)pSelect->pFromTable; + SArray* pIndexes = NULL; + int32_t code = getTableIndex(pCxt, pRealTable->table.dbName, pRealTable->table.tableName, &pIndexes); + if (TSDB_CODE_SUCCESS == code) { + code = attemptApplySmaIndexImpl(pCxt, pSelect, pIndexes); + } + taosArrayDestroyEx(pIndexes, destroySmaIndex); + return code; +} + +static int32_t attemptApplyIndex(STranslateContext* pCxt, SSelectStmt* pSelect) { + // if (mayBeApplySmaIndex(pSelect)) { + // return attemptApplySmaIndex(pCxt, pSelect); + // } + return TSDB_CODE_SUCCESS; +} +#endif static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->pCurrSelectStmt = pSelect; diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 147ff933fb..b4564e967d 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -836,3 +836,7 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun memcpy(pInfo, *pRes, sizeof(SFuncInfo)); return TSDB_CODE_SUCCESS; } + +int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes) { + return TSDB_CODE_PAR_INTERNAL_ERROR; +} diff --git a/source/libs/parser/test/mockCatalogService.h b/source/libs/parser/test/mockCatalogService.h index 133a355c59..fafad81e95 100644 --- a/source/libs/parser/test/mockCatalogService.h +++ b/source/libs/parser/test/mockCatalogService.h @@ -57,6 +57,7 @@ class MockCatalogService { void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid); void showTables() const; void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize); + void createSmaIndex(const SMCreateSmaReq* pReq); int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const; int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const; diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 1a8c7657df..6cfef2a7fb 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -38,9 +38,9 @@ extern "C" { int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...); int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList); -int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode); -int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode); -int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan); +int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan); +int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); +int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan); int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a9f3909af6..5ae4687686 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1138,11 +1138,40 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi return TSDB_CODE_FAILED; } -int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) { - SLogicPlanContext cxt = {.pPlanCxt = pCxt}; - int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, pLogicNode); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - return TSDB_CODE_SUCCESS; +static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) { + pNode->pParent = pParent; + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { doSetLogicNodeParent((SLogicNode*)pChild, pNode); } +} + +static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); } + +int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { + SLogicPlanContext cxt = {.pPlanCxt = pCxt}; + + SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); + if (NULL == pSubplan) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pSubplan->id.queryId = pCxt->queryId; + pSubplan->id.groupId = 1; + pSubplan->id.subplanId = 1; + + int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode); + if (TSDB_CODE_SUCCESS == code) { + setLogicNodeParent(pSubplan->pNode); + if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pSubplan->pNode)) { + pSubplan->subplanType = SUBPLAN_TYPE_MODIFY; + } else { + pSubplan->subplanType = SUBPLAN_TYPE_SCAN; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pLogicSubplan = pSubplan; + } else { + nodesDestroyNode(pSubplan); + } + + return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8346aca76f..f252fe7265 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -773,4 +773,6 @@ static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) { return TSDB_CODE_SUCCESS; } -int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { return applyOptimizeRule(pCxt, pLogicNode); } +int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { + return applyOptimizeRule(pCxt, pLogicSubplan->pNode); +} diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e5c81eda4b..97bb9389f2 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -915,14 +915,6 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) { return TSDB_CODE_SUCCESS; } -static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) { - pNode->pParent = pParent; - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { doSetLogicNodeParent((SLogicNode*)pChild, pNode); } -} - -static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); } - static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList); @@ -933,37 +925,10 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); } } -int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) { - SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); - if (NULL == pSubplan) { - return TSDB_CODE_OUT_OF_MEMORY; +int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { + if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) { + setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan); + return TSDB_CODE_SUCCESS; } - - pSubplan->pNode = nodesCloneNode(pLogicNode); - if (NULL == pSubplan->pNode) { - nodesDestroyNode(pSubplan); - return TSDB_CODE_OUT_OF_MEMORY; - } - - pSubplan->id.queryId = pCxt->queryId; - pSubplan->id.groupId = 1; - setLogicNodeParent(pSubplan->pNode); - - int32_t code = TSDB_CODE_SUCCESS; - if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicNode)) { - pSubplan->subplanType = SUBPLAN_TYPE_MODIFY; - TSWAP(((SVnodeModifyLogicNode*)pLogicNode)->pDataBlocks, ((SVnodeModifyLogicNode*)pSubplan->pNode)->pDataBlocks); - setVgroupsInfo(pSubplan->pNode, pSubplan); - } else { - pSubplan->subplanType = SUBPLAN_TYPE_SCAN; - code = applySplitRule(pCxt, pSubplan); - } - - if (TSDB_CODE_SUCCESS == code) { - *pLogicSubplan = pSubplan; - } else { - nodesDestroyNode(pSubplan); - } - - return code; -} \ No newline at end of file + return applySplitRule(pCxt, pLogicSubplan); +} diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 1921b16388..83657d27d0 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -26,16 +26,15 @@ static void dumpQueryPlan(SQueryPlan* pPlan) { } int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) { - SLogicNode* pLogicNode = NULL; SLogicSubplan* pLogicSubplan = NULL; SQueryLogicPlan* pLogicPlan = NULL; - int32_t code = createLogicPlan(pCxt, &pLogicNode); + int32_t code = createLogicPlan(pCxt, &pLogicSubplan); if (TSDB_CODE_SUCCESS == code) { - code = optimizeLogicPlan(pCxt, pLogicNode); + code = optimizeLogicPlan(pCxt, pLogicSubplan); } if (TSDB_CODE_SUCCESS == code) { - code = splitLogicPlan(pCxt, pLogicNode, &pLogicSubplan); + code = splitLogicPlan(pCxt, pLogicSubplan); } if (TSDB_CODE_SUCCESS == code) { code = scaleOutLogicPlan(pCxt, pLogicSubplan, &pLogicPlan); @@ -47,7 +46,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo dumpQueryPlan(*pPlan); } - nodesDestroyNode(pLogicNode); nodesDestroyNode(pLogicSubplan); nodesDestroyNode(pLogicPlan); terrno = code; diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index d5d37fda64..04d76a8b91 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -42,7 +42,7 @@ TEST_F(PlanOtherTest, createStreamUseSTable) { TEST_F(PlanOtherTest, createSmaIndex) { useDb("root", "test"); - run("create sma index index1 on t1 function(max(c1), min(c3 + 10), sum(c4)) interval(10s)"); + run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)"); } TEST_F(PlanOtherTest, explain) { diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index f5c8b58e43..4e89f9d008 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -104,13 +104,12 @@ class PlannerTestBaseImpl { SPlanContext cxt = {0}; setPlanContext(pQuery, &cxt); - SLogicNode* pLogicNode = nullptr; - doCreateLogicPlan(&cxt, &pLogicNode); - - doOptimizeLogicPlan(&cxt, pLogicNode); - SLogicSubplan* pLogicSubplan = nullptr; - doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan); + doCreateLogicPlan(&cxt, &pLogicSubplan); + + doOptimizeLogicPlan(&cxt, pLogicSubplan); + + doSplitLogicPlan(&cxt, pLogicSubplan); SQueryLogicPlan* pLogicPlan = nullptr; doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan); @@ -164,13 +163,12 @@ class PlannerTestBaseImpl { SPlanContext cxt = {0}; setPlanContext(stmtEnv_.pQuery_, &cxt); - SLogicNode* pLogicNode = nullptr; - doCreateLogicPlan(&cxt, &pLogicNode); - - doOptimizeLogicPlan(&cxt, pLogicNode); - SLogicSubplan* pLogicSubplan = nullptr; - doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan); + doCreateLogicPlan(&cxt, &pLogicSubplan); + + doOptimizeLogicPlan(&cxt, pLogicSubplan); + + doSplitLogicPlan(&cxt, pLogicSubplan); SQueryLogicPlan* pLogicPlan = nullptr; doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan); @@ -324,19 +322,19 @@ class PlannerTestBaseImpl { res_.ast_ = toString(pQuery->pRoot); } - void doCreateLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) { - DO_WITH_THROW(createLogicPlan, pCxt, pLogicNode); - res_.rawLogicPlan_ = toString((SNode*)(*pLogicNode)); + void doCreateLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { + DO_WITH_THROW(createLogicPlan, pCxt, pLogicSubplan); + res_.rawLogicPlan_ = toString((SNode*)(*pLogicSubplan)); } - void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { - DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicNode); - res_.optimizedLogicPlan_ = toString((SNode*)pLogicNode); + void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { + DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicSubplan); + res_.optimizedLogicPlan_ = toString((SNode*)pLogicSubplan); } - void doSplitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) { - DO_WITH_THROW(splitLogicPlan, pCxt, pLogicNode, pLogicSubplan); - res_.splitLogicPlan_ = toString((SNode*)(*pLogicSubplan)); + void doSplitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { + DO_WITH_THROW(splitLogicPlan, pCxt, pLogicSubplan); + res_.splitLogicPlan_ = toString((SNode*)(pLogicSubplan)); } void doScaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) { From 317cc8fc13fcbd6e82ab4714e883f21bfff30840 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 10 Jun 2022 10:16:18 +0800 Subject: [PATCH 2/4] feat: sma index optimize --- include/libs/nodes/plannodes.h | 1 + include/libs/nodes/querynodes.h | 1 + include/util/tjson.h | 15 +- source/libs/nodes/src/nodesCodeFuncs.c | 82 ++++++++++ source/libs/parser/inc/parUtil.h | 3 +- source/libs/parser/src/parAstParser.c | 4 + source/libs/parser/src/parTranslater.c | 82 +++------- source/libs/parser/src/parUtil.c | 40 ++++- source/libs/parser/test/mockCatalog.cpp | 6 + .../libs/parser/test/mockCatalogService.cpp | 59 ++++++- source/libs/parser/test/mockCatalogService.h | 1 + source/libs/planner/src/planOptimizer.c | 144 ++++++++++++++++-- source/libs/planner/test/planOtherTest.cpp | 2 + source/libs/planner/test/planTestUtil.cpp | 3 + source/util/src/tjson.c | 53 ++++++- 15 files changed, 394 insertions(+), 102 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 00380724b3..b62854db43 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -62,6 +62,7 @@ typedef struct SScanLogicNode { int64_t watermark; int16_t tsColId; double filesFactor; + SArray* pSmaIndexes; } SScanLogicNode; typedef struct SJoinLogicNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index c2aa86e89f..33cc4a9c86 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -144,6 +144,7 @@ typedef struct SRealTableNode { SVgroupsInfo* pVgroupList; char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES double ratio; + SArray* pSmaIndexes; } SRealTableNode; typedef struct STempTableNode { diff --git a/include/util/tjson.h b/include/util/tjson.h index 84f7b81726..df433227ca 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -17,16 +17,17 @@ #define _TD_UTIL_JSON_H_ #include "os.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { #endif -#define tjsonGetNumberValue(pJson, pName, val, code) \ - do { \ - uint64_t _tmp = 0; \ +#define tjsonGetNumberValue(pJson, pName, val, code) \ + do { \ + uint64_t _tmp = 0; \ code = tjsonGetBigIntValue(pJson, pName, &_tmp); \ - val = _tmp; \ + val = _tmp; \ } while (0) typedef void SJson; @@ -66,18 +67,20 @@ typedef int32_t (*FToJson)(const void* pObj, SJson* pJson); int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj); int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj); int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num); +int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArray* pArray); typedef int32_t (*FToObject)(const SJson* pJson, void* pObj); int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj); int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize); int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize); +int32_t tjsonToTArray(const SJson* pJson, const char* pName, FToObject func, SArray** pArray, int32_t itemSize); char* tjsonToString(const SJson* pJson); char* tjsonToUnformattedString(const SJson* pJson); -SJson* tjsonParse(const char* pStr); -bool tjsonValidateJson(const char* pJson); +SJson* tjsonParse(const char* pStr); +bool tjsonValidateJson(const char* pJson); const char* tjsonGetError(); #ifdef __cplusplus diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0735092c1a..48840c4a8b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2809,10 +2809,85 @@ static int32_t jsonToTableNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkTableIndexInfoIntervalUnit = "IntervalUnit"; +static const char* jkTableIndexInfoSlidingUnit = "SlidingUnit"; +static const char* jkTableIndexInfoInterval = "Interval"; +static const char* jkTableIndexInfoOffset = "Offset"; +static const char* jkTableIndexInfoSliding = "Sliding"; +static const char* jkTableIndexInfoDstTbUid = "DstTbUid"; +static const char* jkTableIndexInfoDstVgId = "DstVgId"; +static const char* jkTableIndexInfoEpSet = "EpSet"; +static const char* jkTableIndexInfoExpr = "Expr"; + +static int32_t tableIndexInfoToJson(const void* pObj, SJson* pJson) { + const STableIndexInfo* pNode = (const STableIndexInfo*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoIntervalUnit, pNode->intervalUnit); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoSlidingUnit, pNode->slidingUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoInterval, pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoOffset, pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoSliding, pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoDstTbUid, pNode->dstTbUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoDstVgId, pNode->dstVgId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkTableIndexInfoEpSet, epSetToJson, &pNode->epSet); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkTableIndexInfoExpr, pNode->expr); + } + + return code; +} + +static int32_t jsonToTableIndexInfo(const SJson* pJson, void* pObj) { + STableIndexInfo* pNode = (STableIndexInfo*)pObj; + + int32_t code = tjsonGetTinyIntValue(pJson, jkTableIndexInfoIntervalUnit, &pNode->intervalUnit); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkTableIndexInfoSlidingUnit, &pNode->slidingUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableIndexInfoInterval, &pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableIndexInfoOffset, &pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableIndexInfoSliding, &pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableIndexInfoDstTbUid, &pNode->dstTbUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkTableIndexInfoDstVgId, &pNode->dstVgId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkTableIndexInfoEpSet, jsonToEpSet, &pNode->epSet); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonDupStringValue(pJson, jkTableIndexInfoExpr, &pNode->expr); + } + + return code; +} + static const char* jkRealTableMetaSize = "MetaSize"; static const char* jkRealTableMeta = "Meta"; static const char* jkRealTableVgroupsInfoSize = "VgroupsInfoSize"; static const char* jkRealTableVgroupsInfo = "VgroupsInfo"; +static const char* jkRealTableSmaIndexes = "SmaIndexes"; static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) { const SRealTableNode* pNode = (const SRealTableNode*)pObj; @@ -2830,6 +2905,9 @@ static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkRealTableVgroupsInfo, vgroupsInfoToJson, pNode->pVgroupList); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddTArray(pJson, jkRealTableSmaIndexes, tableIndexInfoToJson, pNode->pSmaIndexes); + } return code; } @@ -2851,6 +2929,10 @@ static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonMakeObject(pJson, jkRealTableVgroupsInfo, jsonToVgroupsInfo, (void**)&pNode->pVgroupList, objSize); } + if (TSDB_CODE_SUCCESS == code) { + code = + tjsonToTArray(pJson, jkRealTableSmaIndexes, jsonToTableIndexInfo, &pNode->pSmaIndexes, sizeof(STableIndexInfo)); + } return code; } diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 42719d95bd..fb67a35368 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -46,7 +46,7 @@ typedef struct SParseMetaCache { SHashObj* pDbInfo; // key is tbFName, element is SDbInfo* SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass SHashObj* pUdf; // key is funcName, element is SFuncInfo* - SHashObj* pSmaIndex; // key is tbFName, element is SArray* + SHashObj* pTableIndex; // key is tbFName, element is SArray* } SParseMetaCache; int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...); @@ -76,6 +76,7 @@ int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pD SParseMetaCache* pMetaCache); int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache); int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache); +int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta); int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo); int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup); diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 6555ec3a7d..6acc4575e7 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -129,6 +129,10 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTa if (TSDB_CODE_SUCCESS == code) { code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pCxt->pMetaCache); } + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableIndexInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, + pCxt->pMetaCache); + } return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 90da79266f..c2fa1ffd22 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -258,24 +258,19 @@ static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) { return code; } -static int32_t getTableIndex(STranslateContext* pCxt, const char* pDbName, const char* pTableName, SArray** pIndexes) { +static int32_t getTableIndex(STranslateContext* pCxt, const SName* pName, SArray** pIndexes) { SParseContext* pParCxt = pCxt->pParseCxt; - SName name; - toName(pParCxt->acctId, pDbName, pTableName, &name); - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = collectUseDatabase(pName, pCxt->pDbs); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(pName, pCxt->pTables); + } if (pParCxt->async) { - code = getTableIndexFromCache(pCxt->pMetaCache, &name, pIndexes); + code = getTableIndexFromCache(pCxt->pMetaCache, pName, pIndexes); } else { - code = collectUseDatabase(&name, pCxt->pDbs); - if (TSDB_CODE_SUCCESS == code) { - code = collectUseTable(&name, pCxt->pTables); - } - if (TSDB_CODE_SUCCESS == code) { - code = catalogGetTableIndex(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, &name, pIndexes); - } + code = catalogGetTableIndex(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pIndexes); } if (TSDB_CODE_SUCCESS != code) { - parserError("getTableIndex error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pDbName, pTableName); + parserError("getTableIndex error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); } return code; } @@ -853,9 +848,9 @@ static EDealRes translateComparisonOperator(STranslateContext* pCxt, SOperatorNo } if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { SNodeListNode* pRight = (SNodeListNode*)pOp->pRight; - bool first = true; - SDataType targetDt = {0}; - SNode* pNode = NULL; + bool first = true; + SDataType targetDt = {0}; + SNode* pNode = NULL; FOREACH(pNode, pRight->pNodeList) { SDataType dt = ((SExprNode*)pNode)->resType; if (first) { @@ -1409,6 +1404,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName); } code = setTableVgroupList(pCxt, &name, pRealTable); + if (TSDB_CODE_SUCCESS == code) { + code = getTableIndex(pCxt, &name, &pRealTable->pSmaIndexes); + } } pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision; pRealTable->table.singleTable = isSingleTable(pRealTable); @@ -2018,33 +2016,8 @@ static int32_t rewriteTimelineFunc(STranslateContext* pCxt, SSelectStmt* pSelect nodesWalkSelectStmt(pSelect, SQL_CLAUSE_FROM, rewriteTimelineFuncImpl, pCxt); return pCxt->errCode; } -#if 0 -static bool mayBeApplySmaIndex(SSelectStmt* pSelect) { - if (NULL == pSelect->pWindow || QUERY_NODE_INTERVAL_WINDOW != nodeType(pSelect->pWindow) || - NULL != ((SIntervalWindowNode*)pSelect->pWindow)->pFill || - QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) || NULL != pSelect->pWhere || - NULL != pSelect->pPartitionByList || NULL != pSelect->pGroupByList || NULL != pSelect->pHaving) { - return false; - } - return true; -} -static bool equalIntervalWindow(SIntervalWindowNode* pInterval, SNode* pWhere, STableIndexInfo* pIndex) { - int64_t interval = ((SValueNode*)pInterval->pInterval)->datum.i; - int8_t intervalUnit = ((SValueNode*)pInterval->pInterval)->unit; - int64_t offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0); - int64_t sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : interval); - int8_t slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : intervalUnit); - if (interval != pIndex->interval || intervalUnit != pIndex->intervalUnit || offset != pIndex->offset || - sliding != pIndex->sliding || slidingUnit != pIndex->slidingUnit) { - return false; - } - // todo - if (NULL != pWhere) { - return false; - } - return true; -} +#if 0 typedef struct SSmaIndexMatchFuncsCxt { int32_t errCode; @@ -2056,10 +2029,10 @@ typedef struct SSmaIndexMatchFuncsCxt { bool match; } SSmaIndexMatchFuncsCxt; -static SColumnNode* createColumnFromSmaFunc(uint64_t tableId, int32_t index, SExprNode* pSmaFunc) { +static int32_t smaOptCreateSmaCol(SNode* pSmaFunc, int32_t index, SNode** pOutput) { SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { - return NULL; + return TSDB_CODE_SUCCESS; } pCol->tableId = tableId; pCol->tableType = TSDB_SUPER_TABLE; @@ -2070,7 +2043,7 @@ static SColumnNode* createColumnFromSmaFunc(uint64_t tableId, int32_t index, SEx strcpy(pCol->tableAlias, SMA_TABLE_NAME); pCol->node.resType = pSmaFunc->resType; strcpy(pCol->node.aliasName, pSmaFunc->aliasName); - return pCol; + return TSDB_CODE_SUCCESS; } static int32_t collectSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, int32_t index, SNode* pSmaFunc) { @@ -2262,25 +2235,6 @@ static int32_t attemptApplySmaIndexImpl(STranslateContext* pCxt, SSelectStmt* pS return TSDB_CODE_SUCCESS; } -static void destroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); } - -static int32_t attemptApplySmaIndex(STranslateContext* pCxt, SSelectStmt* pSelect) { - SRealTableNode* pRealTable = (SRealTableNode*)pSelect->pFromTable; - SArray* pIndexes = NULL; - int32_t code = getTableIndex(pCxt, pRealTable->table.dbName, pRealTable->table.tableName, &pIndexes); - if (TSDB_CODE_SUCCESS == code) { - code = attemptApplySmaIndexImpl(pCxt, pSelect, pIndexes); - } - taosArrayDestroyEx(pIndexes, destroySmaIndex); - return code; -} - -static int32_t attemptApplyIndex(STranslateContext* pCxt, SSelectStmt* pSelect) { - // if (mayBeApplySmaIndex(pSelect)) { - // return attemptApplySmaIndex(pCxt, pSelect); - // } - return TSDB_CODE_SUCCESS; -} #endif static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 41bb0481ad..5bde056b5c 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -807,6 +807,42 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun return code; } -int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes) { - return TSDB_CODE_PAR_INTERNAL_ERROR; +static void destroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); } + +static SArray* smaIndexesDup(SArray* pSrc) { + SArray* pDst = taosArrayDup(pSrc); + if (NULL == pDst) { + return NULL; + } + int32_t size = taosArrayGetSize(pDst); + for (int32_t i = 0; i < size; ++i) { + ((STableIndexInfo*)taosArrayGet(pDst, i))->expr = NULL; + } + for (int32_t i = 0; i < size; ++i) { + STableIndexInfo* pIndex = taosArrayGet(pDst, i); + pIndex->expr = taosMemoryStrDup(((STableIndexInfo*)taosArrayGet(pSrc, i))->expr); + if (NULL == pIndex->expr) { + taosArrayDestroyEx(pDst, destroySmaIndex); + return NULL; + } + } + return pDst; +} + +int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { + return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableIndex); +} + +int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes) { + char fullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pName, fullName); + SArray* pSmaIndexes = NULL; + int32_t code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pTableIndex, (void**)&pSmaIndexes); + if (TSDB_CODE_SUCCESS == code) { + *pIndexes = smaIndexesDup(pSmaIndexes); + if (NULL == *pIndexes) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + return code; } diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index c7f0990132..0b2aacf5ef 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -214,6 +214,11 @@ int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, void* pTransporter, con int32_t __catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { return 0; } +int32_t __catalogGetTableIndex(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pName, + SArray** pRes) { + return g_mockCatalogService->catalogGetTableIndex(pName, pRes); +} + void initMetaDataEnv() { g_mockCatalogService.reset(new MockCatalogService()); @@ -230,6 +235,7 @@ void initMetaDataEnv() { stub.set(catalogGetUdfInfo, __catalogGetUdfInfo); stub.set(catalogRefreshGetTableMeta, __catalogRefreshGetTableMeta); stub.set(catalogRemoveTableMeta, __catalogRemoveTableMeta); + stub.set(catalogGetTableIndex, __catalogGetTableIndex); // { // AddrAny any("libcatalog.so"); // std::map result; diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 0c37c875c0..c730653e5e 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -149,6 +149,20 @@ class MockCatalogServiceImpl { return TSDB_CODE_SUCCESS; } + int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const { + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(pTableName, tbFName); + auto it = index_.find(tbFName); + if (index_.end() == it) { + return TSDB_CODE_SUCCESS; + } + *pIndexes = taosArrayInit(it->second.size(), sizeof(STableIndexInfo)); + for (const auto& index : it->second) { + taosArrayPush(*pIndexes, &index); + } + return TSDB_CODE_SUCCESS; + } + int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const { int32_t code = getAllTableMeta(pCatalogReq->pTableMeta, &pMetaData->pTableMeta); if (TSDB_CODE_SUCCESS == code) { @@ -176,7 +190,7 @@ class MockCatalogServiceImpl { int32_t numOfColumns, int32_t numOfTags) { builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags); meta_[db][tbname] = builder_->table(); - meta_[db][tbname]->schema->uid = id_++; + meta_[db][tbname]->schema->uid = getNextId(); return *(builder_.get()); } @@ -187,14 +201,11 @@ class MockCatalogServiceImpl { } meta_[db][tbname].reset(new MockTableMeta()); meta_[db][tbname]->schema = table.release(); - meta_[db][tbname]->schema->uid = id_++; + meta_[db][tbname]->schema->uid = getNextId(); meta_[db][tbname]->schema->tableType = TSDB_CHILD_TABLE; SVgroupInfo vgroup = {vgid, 0, 0, {0}, 0}; - addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030); - addEpIntoEpSet(&vgroup.epSet, "dnode_2", 6030); - addEpIntoEpSet(&vgroup.epSet, "dnode_3", 6030); - vgroup.epSet.inUse = 0; + genEpSet(&vgroup.epSet); meta_[db][tbname]->vgs.emplace_back(vgroup); // super table @@ -268,10 +279,39 @@ class MockCatalogServiceImpl { udf_.insert(std::make_pair(func, info)); } + void createSmaIndex(const SMCreateSmaReq* pReq) { + STableIndexInfo info; + info.intervalUnit = pReq->intervalUnit; + info.slidingUnit = pReq->slidingUnit; + info.interval = pReq->interval; + info.offset = pReq->offset; + info.sliding = pReq->sliding; + info.dstTbUid = getNextId(); + info.dstVgId = pReq->dstVgId; + genEpSet(&info.epSet); + info.expr = strdup(pReq->expr); + auto it = index_.find(pReq->stb); + if (index_.end() == it) { + index_.insert(std::make_pair(std::string(pReq->stb), std::vector{info})); + } else { + it->second.push_back(info); + } + } + private: typedef std::map> TableMetaCache; typedef std::map DbMetaCache; typedef std::map> UdfMetaCache; + typedef std::map> IndexMetaCache; + + uint64_t getNextId() { return id_++; } + + void genEpSet(SEpSet* pEpSet) { + addEpIntoEpSet(pEpSet, "dnode_1", 6030); + addEpIntoEpSet(pEpSet, "dnode_2", 6030); + addEpIntoEpSet(pEpSet, "dnode_3", 6030); + pEpSet->inUse = 0; + } std::string toDbname(const std::string& dbFullName) const { std::string::size_type n = dbFullName.find("."); @@ -467,6 +507,7 @@ class MockCatalogServiceImpl { std::unique_ptr builder_; DbMetaCache meta_; UdfMetaCache udf_; + IndexMetaCache index_; }; MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {} @@ -490,6 +531,8 @@ void MockCatalogService::createFunction(const std::string& func, int8_t funcType impl_->createFunction(func, funcType, outputType, outputLen, bufSize); } +void MockCatalogService::createSmaIndex(const SMCreateSmaReq* pReq) { impl_->createSmaIndex(pReq); } + int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const { return impl_->catalogGetTableMeta(pTableName, pTableMeta); } @@ -510,6 +553,10 @@ int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFunc return impl_->catalogGetUdfInfo(funcName, pInfo); } +int32_t MockCatalogService::catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const { + return impl_->catalogGetTableIndex(pTableName, pIndexes); +} + int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const { return impl_->catalogGetAllMeta(pCatalogReq, pMetaData); } diff --git a/source/libs/parser/test/mockCatalogService.h b/source/libs/parser/test/mockCatalogService.h index fafad81e95..c4ab091b7a 100644 --- a/source/libs/parser/test/mockCatalogService.h +++ b/source/libs/parser/test/mockCatalogService.h @@ -64,6 +64,7 @@ class MockCatalogService { int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const; int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const; int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const; + int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const; int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const; private: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f252fe7265..76338eb491 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -32,8 +32,7 @@ typedef struct SOptimizeContext { bool optimized; } SOptimizeContext; -typedef int32_t (*FMatch)(SOptimizeContext* pCxt, SLogicNode* pLogicNode); -typedef int32_t (*FOptimize)(SOptimizeContext* pCxt, SLogicNode* pLogicNode); +typedef int32_t (*FOptimize)(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan); typedef struct SOptimizeRule { char* pName; @@ -109,7 +108,6 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { } if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) { return true; - // return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType); } return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); } @@ -231,9 +229,9 @@ static void setScanWindowInfo(SScanLogicNode* pScan) { } } -static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { +static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SOsdInfo info = {0}; - int32_t code = osdMatch(pCxt, pLogicNode, &info); + int32_t code = osdMatch(pCxt, pLogicSubplan->pNode, &info); if (TSDB_CODE_SUCCESS == code && info.pScan) { setScanWindowInfo((SScanLogicNode*)info.pScan); } @@ -635,8 +633,8 @@ static int32_t cpdPushCondition(SOptimizeContext* pCxt, SLogicNode* pLogicNode) return code; } -static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { - return cpdPushCondition(pCxt, pLogicNode); +static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + return cpdPushCondition(pCxt, pLogicSubplan->pNode); } static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) { @@ -745,26 +743,142 @@ static int32_t opkOptimizeImpl(SOptimizeContext* pCxt, SSortLogicNode* pSort) { return code; } -static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { - SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized); +static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, opkSortMayBeOptimized); if (NULL == pSort) { return TSDB_CODE_SUCCESS; } return opkOptimizeImpl(pCxt, pSort); } -static const SOptimizeRule optimizeRuleSet[] = {{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize}, - {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, - {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}}; +static int32_t smaOptFindSmaFunc(SNodeList* pSmaFuncs, SNode* pFunc, SNode** pCol, bool* pFound) { + int32_t index = 0; + SNode* pSmaFunc = NULL; + FOREACH(pSmaFunc, pSmaFuncs) { + if (nodesEqualNode(pSmaFunc, pFunc)) { + *pFound = true; + return collectSmaFunc(pCxt, index, pSmaFunc); + } + ++index; + } + return TSDB_CODE_SUCCESS; +} + +static bool smaOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo* pIndex) { + if (pScan->interval != pIndex->interval || pScan->intervalUnit != pIndex->intervalUnit || + pScan->offset != pIndex->offset || pScan->sliding != pIndex->sliding || + pScan->slidingUnit != pIndex->slidingUnit) { + return false; + } + // todo time range + SNodeList* pFuncs = NULL; + SNode* pFunc = NULL; + FOREACH(pFunc, ((SWindowLogicNode*)pScan->node.pParent)->pFuncs) {} + + return true; +} + +static bool smaOptMayBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) { + return false; + } + + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + if (0 == pScan->interval || NULL == pScan->pSmaIndexes || NULL != pScan->node.pConditions) { + return false; + } + + int32_t size = taosArrayGetSize(pScan->pSmaIndexes); + for (int32_t i = 0; i < size; ++i) { + STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i); + } + + return false; +} + +static int32_t smaOptCreateMerge(SNodeList* pTargets) { + SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); + if (NULL == pMerge) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pMerge->node.precision = pPartChild->precision; + pMerge->pMergeKeys = pMergeKeys; + + int32_t code = TSDB_CODE_SUCCESS; + pMerge->pInputs = nodesCloneList(pPartChild->pTargets); + pMerge->node.pTargets = nodesCloneList(pTargets); + if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + if (TSDB_CODE_SUCCESS == code) { + if (NULL == pSubplan) { + code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge); + } else { + code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge); + } + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pMerge); + } + return code; +} + +static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols, + SScanLogicNode** pOutput) { + SScanLogicNode* pSmaScan = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN); + if (NULL == pSmaScan) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pSmaScan->pScanCols = pCols; + pSmaScan->tableType = TSDB_SUPER_TABLE; + pSmaScan->tableId = pIndex->dstTbUid; + pSmaScan->stableId = pIndex->dstTbUid; + pSmaScan->scanType = SCAN_TYPE_TABLE; + pSmaScan->scanSeq[0] = pScan->scanSeq[0]; + pSmaScan->scanSeq[1] = pScan->scanSeq[1]; + pSmaScan->scanRange = pScan->scanRange; + pSmaScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; + + pSmaScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo)); + if (NULL == pSmaScan->pVgroupList) { + nodesDestroyNode(pSmaScan); + return TSDB_CODE_OUT_OF_MEMORY; + } + pSmaScan->pVgroupList->numOfVgroups = 1; + pSmaScan->pVgroupList->vgroups[0].vgId = pIndex->dstVgId; + memcpy(&(pSmaScan->pVgroupList->vgroups[0].epSet), &pIndex->epSet, sizeof(SEpSet)); + + *pOutput = pSmaScan; + return TSDB_CODE_SUCCESS; +} + +static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SScanLogicNode* pScan) { return TSDB_CODE_SUCCESS; } + +static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { + SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized); + if (NULL == pScan) { + return TSDB_CODE_SUCCESS; + } + return smaOptimizeImpl(pCxt, pScan); +} + +// clang-format off +static const SOptimizeRule optimizeRuleSet[] = { + {.pName = "OptimizeScanData", .optimizeFunc = osdOptimize}, + {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, + {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, + {.pName = "SmaIndex", .optimizeFunc = smaOptimize} +}; +// clang-format on static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule)); -static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) { +static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false}; do { cxt.optimized = false; for (int32_t i = 0; i < optimizeRuleNum; ++i) { - int32_t code = optimizeRuleSet[i].optimizeFunc(&cxt, pLogicNode); + int32_t code = optimizeRuleSet[i].optimizeFunc(&cxt, pLogicSubplan); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -774,5 +888,5 @@ static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) { } int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { - return applyOptimizeRule(pCxt, pLogicSubplan->pNode); + return applyOptimizeRule(pCxt, pLogicSubplan); } diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index 04d76a8b91..85f8b7d9f6 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -43,6 +43,8 @@ TEST_F(PlanOtherTest, createSmaIndex) { useDb("root", "test"); run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)"); + + run("SELECT SUM(c4) FROM t1 INTERVAL(10s)"); } TEST_F(PlanOtherTest, explain) { diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 4e89f9d008..57d7cb6608 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -14,12 +14,14 @@ */ #include "planTestUtil.h" + #include #include #include #include "cmdnodes.h" +#include "mockCatalogService.h" #include "parser.h" #include "planInt.h" @@ -361,6 +363,7 @@ class PlannerTestBaseImpl { } else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) { SMCreateSmaReq req = {0}; tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req); + g_mockCatalogService->createSmaIndex(&req); nodesStringToNode(req.ast, &pCxt->pAstRoot); pCxt->streamQuery = true; } else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) { diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index b15c188f04..45a2ffec77 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -14,6 +14,7 @@ */ #define _DEFAULT_SOURCE + #include "tjson.h" #include "cJSON.h" #include "taoserror.h" @@ -138,6 +139,23 @@ int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* return TSDB_CODE_SUCCESS; } +int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArray* pArray) { + int32_t num = taosArrayGetSize(pArray); + if (num > 0) { + SJson* pJsonArray = tjsonAddArrayToObject(pJson, pName); + if (NULL == pJsonArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < num; ++i) { + int32_t code = tjsonAddItem(pJsonArray, func, taosArrayGet(pArray, i)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + } + return TSDB_CODE_SUCCESS; +} + char* tjsonToString(const SJson* pJson) { return cJSON_Print((cJSON*)pJson); } char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); } @@ -184,7 +202,7 @@ int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal return TSDB_CODE_FAILED; } #ifdef WINDOWS - sscanf(p,"%lld",pVal); + sscanf(p, "%lld", pVal); #else // sscanf(p,"%ld",pVal); *pVal = taosStr2Int64(p, NULL, 10); @@ -219,7 +237,7 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV return TSDB_CODE_FAILED; } #ifdef WINDOWS - sscanf(p,"%llu",pVal); + sscanf(p, "%llu", pVal); #else // sscanf(p,"%ld",pVal); *pVal = taosStr2UInt64(p, NULL, 10); @@ -299,24 +317,43 @@ int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void return TSDB_CODE_SUCCESS; } +int32_t tjsonToTArray(const SJson* pJson, const char* pName, FToObject func, SArray** pArray, int32_t itemSize) { + const cJSON* jArray = tjsonGetObjectItem(pJson, pName); + int32_t size = tjsonGetArraySize(jArray); + if (size > 0) { + *pArray = taosArrayInit(size, itemSize); + if (NULL == *pArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArraySetSize(*pArray, size); + for (int32_t i = 0; i < size; ++i) { + int32_t code = func(tjsonGetArrayItem(jArray, i), taosArrayGet(*pArray, i)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + } + return TSDB_CODE_SUCCESS; +} + SJson* tjsonParse(const char* pStr) { return cJSON_Parse(pStr); } -bool tjsonValidateJson(const char *jIn) { - if (!jIn){ +bool tjsonValidateJson(const char* jIn) { + if (!jIn) { return false; } // set json real data - cJSON *root = cJSON_Parse(jIn); - if (root == NULL){ + cJSON* root = cJSON_Parse(jIn); + if (root == NULL) { return false; } - if(!cJSON_IsObject(root)){ + if (!cJSON_IsObject(root)) { return false; } int size = cJSON_GetArraySize(root); - for(int i = 0; i < size; i++) { + for (int i = 0; i < size; i++) { cJSON* item = cJSON_GetArrayItem(root, i); if (!item) { return false; From c31eb5231a7f5d3a2cc89a404e813b8cf0adbd35 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 10 Jun 2022 16:53:05 +0800 Subject: [PATCH 3/4] feat: sma index optimize --- source/libs/parser/src/parTranslater.c | 54 +------ source/libs/planner/inc/planInt.h | 1 + source/libs/planner/src/planOptimizer.c | 195 +++++++++++++++++------- source/libs/planner/src/planSpliter.c | 21 +-- source/libs/planner/src/planUtil.c | 17 +++ 5 files changed, 164 insertions(+), 124 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c2fa1ffd22..8994556c67 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -25,9 +25,6 @@ #include "tglobal.h" #include "ttime.h" -#define SMA_TABLE_NAME "#sma_table" -#define SMA_COL_NAME_PREFIX "#sma_col_" - #define generateDealNodeErrMsg(pCxt, code, ...) \ (pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, code, ##__VA_ARGS__), DEAL_RES_ERROR) @@ -2029,22 +2026,7 @@ typedef struct SSmaIndexMatchFuncsCxt { bool match; } SSmaIndexMatchFuncsCxt; -static int32_t smaOptCreateSmaCol(SNode* pSmaFunc, int32_t index, SNode** pOutput) { - SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); - if (NULL == pCol) { - return TSDB_CODE_SUCCESS; - } - pCol->tableId = tableId; - pCol->tableType = TSDB_SUPER_TABLE; - pCol->colId = index + 2; // skip timestamp primary col - pCol->colType = COLUMN_TYPE_COLUMN; - snprintf(pCol->colName, sizeof(pCol->colName), SMA_COL_NAME_PREFIX "%d", pCol->colId); - strcpy(pCol->tableName, SMA_TABLE_NAME); - strcpy(pCol->tableAlias, SMA_TABLE_NAME); - pCol->node.resType = pSmaFunc->resType; - strcpy(pCol->node.aliasName, pSmaFunc->aliasName); - return TSDB_CODE_SUCCESS; -} + static int32_t collectSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, int32_t index, SNode* pSmaFunc) { if (NULL == pCxt->pUseMap) { @@ -2072,22 +2054,6 @@ static int32_t collectSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, int32_t index, SNode return TSDB_CODE_SUCCESS; } -static int32_t findSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, SNode* pNode, bool* pFound) { - if (!isAggFunc(pNode)) { - return TSDB_CODE_SUCCESS; - } - - int32_t index = 0; - SNode* pSmaFunc = NULL; - FOREACH(pSmaFunc, pCxt->pSmaFuncs) { - if (nodesEqualNode(pSmaFunc, pNode)) { - *pFound = true; - return collectSmaFunc(pCxt, index, pSmaFunc); - } - ++index; - } - return TSDB_CODE_SUCCESS; -} static EDealRes matchSmaFuncsImpl(SNode* pNode, void* pContext) { SSmaIndexMatchFuncsCxt* pCxt = pContext; @@ -2132,23 +2098,7 @@ static int32_t matchSmaFuncs(SSelectStmt* pSelect, STableIndexInfo* pIndex, SNod return cxt.errCode; } -static int32_t couldApplySmaIndex(STranslateContext* pCxt, SSelectStmt* pSelect, STableIndexInfo* pIndex, - SNodeList** pFuncs, SNodeList** pCols) { - if (!equalIntervalWindow((SIntervalWindowNode*)pSelect->pWindow, pSelect->pWhere, pIndex)) { - return TSDB_CODE_SUCCESS; - } - SNodeList* pSmaFuncs = NULL; - int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs); - if (TSDB_CODE_SUCCESS == code) { - pCxt->currClause = SQL_CLAUSE_SELECT; - code = translateExprList(pCxt, pSmaFuncs); - } - if (TSDB_CODE_SUCCESS == code) { - code = matchSmaFuncs(pSelect, pIndex, pSmaFuncs, pFuncs, pCols); - } - nodesDestroyList(pSmaFuncs); - return code; -} + typedef struct SSmaIndexRewriteFuncsCxt { int32_t errCode; diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 6cfef2a7fb..f017f63404 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -37,6 +37,7 @@ extern "C" { int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...); int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList); +int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew); int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan); int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 76338eb491..f80128b652 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -751,35 +751,10 @@ static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) return opkOptimizeImpl(pCxt, pSort); } -static int32_t smaOptFindSmaFunc(SNodeList* pSmaFuncs, SNode* pFunc, SNode** pCol, bool* pFound) { - int32_t index = 0; - SNode* pSmaFunc = NULL; - FOREACH(pSmaFunc, pSmaFuncs) { - if (nodesEqualNode(pSmaFunc, pFunc)) { - *pFound = true; - return collectSmaFunc(pCxt, index, pSmaFunc); - } - ++index; - } - return TSDB_CODE_SUCCESS; -} - -static bool smaOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo* pIndex) { - if (pScan->interval != pIndex->interval || pScan->intervalUnit != pIndex->intervalUnit || - pScan->offset != pIndex->offset || pScan->sliding != pIndex->sliding || - pScan->slidingUnit != pIndex->slidingUnit) { - return false; - } - // todo time range - SNodeList* pFuncs = NULL; - SNode* pFunc = NULL; - FOREACH(pFunc, ((SWindowLogicNode*)pScan->node.pParent)->pFuncs) {} - - return true; -} - static bool smaOptMayBeOptimized(SLogicNode* pNode) { - if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) { + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent || + QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) || + WINDOW_TYPE_INTERVAL != ((SWindowLogicNode*)pNode->pParent)->winType) { return false; } @@ -788,43 +763,44 @@ static bool smaOptMayBeOptimized(SLogicNode* pNode) { return false; } - int32_t size = taosArrayGetSize(pScan->pSmaIndexes); - for (int32_t i = 0; i < size; ++i) { - STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i); - } - - return false; + return true; } -static int32_t smaOptCreateMerge(SNodeList* pTargets) { +static int32_t smaOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNodeList* pTargets, SLogicNode** pOutput) { SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; } - pMerge->node.precision = pPartChild->precision; + pMerge->node.precision = pChild->precision; pMerge->pMergeKeys = pMergeKeys; - int32_t code = TSDB_CODE_SUCCESS; - pMerge->pInputs = nodesCloneList(pPartChild->pTargets); + pMerge->pInputs = nodesCloneList(pChild->pTargets); pMerge->node.pTargets = nodesCloneList(pTargets); if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) { - code = TSDB_CODE_OUT_OF_MEMORY; + nodesDestroyNode(pMerge); + return TSDB_CODE_OUT_OF_MEMORY; + } + + *pOutput = (SLogicNode*)pMerge; + return TSDB_CODE_SUCCESS; +} + +static int32_t smaOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode* pInterval, SLogicNode* pMerge, + SLogicNode* pSmaScan) { + int32_t code = nodesListMakeAppend(&pMerge->pChildren, pInterval); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeAppend(&pMerge->pChildren, pSmaScan); } if (TSDB_CODE_SUCCESS == code) { - if (NULL == pSubplan) { - code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge); - } else { - code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge); - } - } - if (TSDB_CODE_SUCCESS != code) { - nodesDestroyNode(pMerge); + code = replaceLogicNode(pLogicSubplan, pInterval, pMerge); + pSmaScan->pParent = pMerge; + pInterval->pParent = pMerge; } return code; } static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols, - SScanLogicNode** pOutput) { + SLogicNode** pOutput) { SScanLogicNode* pSmaScan = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN); if (NULL == pSmaScan) { return TSDB_CODE_OUT_OF_MEMORY; @@ -848,18 +824,131 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde pSmaScan->pVgroupList->vgroups[0].vgId = pIndex->dstVgId; memcpy(&(pSmaScan->pVgroupList->vgroups[0].epSet), &pIndex->epSet, sizeof(SEpSet)); - *pOutput = pSmaScan; + *pOutput = (SLogicNode*)pSmaScan; return TSDB_CODE_SUCCESS; } -static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SScanLogicNode* pScan) { return TSDB_CODE_SUCCESS; } +static bool smaOptEqualInterval(SWindowLogicNode* pWindow, STableIndexInfo* pIndex) { + if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit || + pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding || + pWindow->slidingUnit != pIndex->slidingUnit) { + return false; + } + // todo time range + return true; +} -static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { - SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized); +// #define SMA_TABLE_NAME "#sma_table" +// #define SMA_COL_NAME_PREFIX "#sma_col_" + +static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) { + SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return NULL; + } + pCol->tableId = tableId; + pCol->tableType = TSDB_SUPER_TABLE; + pCol->colId = colId; + pCol->colType = COLUMN_TYPE_COLUMN; + snprintf(pCol->colName, sizeof(pCol->colName), "#sma_col_%d", pCol->colId); + // strcpy(pCol->tableName, SMA_TABLE_NAME); + // strcpy(pCol->tableAlias, SMA_TABLE_NAME); + pCol->node.resType = ((SExprNode*)pFunc)->resType; + strcpy(pCol->node.aliasName, ((SExprNode*)pFunc)->aliasName); + return (SNode*)pCol; +} + +static int32_t smaOptFindSmaFunc(SNode* pQueryFunc, SNodeList* pSmaFuncs) { + int32_t index = 0; + SNode* pSmaFunc = NULL; + FOREACH(pSmaFunc, pSmaFuncs) { + if (nodesEqualNode(pQueryFunc, pSmaFunc)) { + return index; + } + ++index; + } + return -1; +} + +static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeList* pSmaFuncs, SNodeList** pOutput, + int32_t* pWStrartIndex) { + SNodeList* pCols = NULL; + SNode* pFunc = NULL; + int32_t code = TSDB_CODE_SUCCESS; + int32_t index = 0; + FOREACH(pFunc, pFuncs) { + if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) { + *pWStrartIndex = index; + } + int32_t smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs); + if (smaFuncIndex < 0) { + break; + } else { + code = nodesListMakeStrictAppend(&pCols, smaOptCreateSmaCol(pFunc, tableId, smaFuncIndex + 2)); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + ++index; + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = pCols; + } else { + nodesDestroyList(pCols); + } + + return code; +} + +static int32_t smaOptCouldApplyIndex(SWindowLogicNode* pWindow, STableIndexInfo* pIndex, SNodeList** pCols, + int32_t* pWStrartIndex) { + if (!smaOptEqualInterval(pWindow, pIndex)) { + return TSDB_CODE_SUCCESS; + } + SNodeList* pSmaFuncs = NULL; + int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = smaOptCreateSmaCols(pWindow->pFuncs, pIndex->dstTbUid, pSmaFuncs, pCols, pWStrartIndex); + } + nodesDestroyList(pSmaFuncs); + return code; +} + +static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex, + SNodeList* pSmaCols) { + SLogicNode* pSmaScan = NULL; + SLogicNode* pMerge = NULL; + int32_t code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan); + if (TSDB_CODE_SUCCESS == code) { + code = smaOptCreateMerge(pScan->node.pParent, NULL, pSmaCols, &pMerge); + } + if (TSDB_CODE_SUCCESS == code) { + code = smaOptRecombinationNode(pLogicSubplan, pScan->node.pParent, pMerge, pSmaScan); + } + return code; +} + +static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) { + int32_t nindexes = taosArrayGetSize(pScan->pSmaIndexes); + for (int32_t i = 0; i < nindexes; ++i) { + STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i); + SNodeList* pSmaCols = NULL; + int32_t wstrartIndex = -1; + int32_t code = smaOptCouldApplyIndex((SWindowLogicNode*)pScan->node.pParent, pIndex, &pSmaCols, &wstrartIndex); + if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) { + code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols); + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, smaOptMayBeOptimized); if (NULL == pScan) { return TSDB_CODE_SUCCESS; } - return smaOptimizeImpl(pCxt, pScan); + return smaOptimizeImpl(pCxt, pLogicSubplan, pScan); } // clang-format off diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index aebc0c14f4..e00ccf3be3 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -80,29 +80,12 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE return TSDB_CODE_SUCCESS; } -static int32_t splReplaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) { - if (NULL == pOld->pParent) { - pSubplan->pNode = (SLogicNode*)pNew; - return TSDB_CODE_SUCCESS; - } - - SNode* pNode; - FOREACH(pNode, pOld->pParent->pChildren) { - if (nodesEqualNode(pNode, pOld)) { - REPLACE_NODE(pNew); - pNew->pParent = pOld->pParent; - return TSDB_CODE_SUCCESS; - } - } - return TSDB_CODE_PLAN_INTERNAL_ERROR; -} - static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, ESubplanType subplanType) { SExchangeLogicNode* pExchange = NULL; int32_t code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange); if (TSDB_CODE_SUCCESS == code) { - code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange); + code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange); } if (TSDB_CODE_SUCCESS == code) { pSubplan->subplanType = subplanType; @@ -322,7 +305,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla if (NULL == pSubplan) { code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge); } else { - code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge); + code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge); } } if (TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 63d31912f0..a4ca864015 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -85,3 +85,20 @@ int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) { } return cxt.errCode; } + +int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) { + if (NULL == pOld->pParent) { + pSubplan->pNode = (SLogicNode*)pNew; + return TSDB_CODE_SUCCESS; + } + + SNode* pNode; + FOREACH(pNode, pOld->pParent->pChildren) { + if (nodesEqualNode(pNode, pOld)) { + REPLACE_NODE(pNew); + pNew->pParent = pOld->pParent; + return TSDB_CODE_SUCCESS; + } + } + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} From 58808db4cd48009a8849b4e0e9ede2b01dde86ef Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 11 Jun 2022 09:25:18 +0800 Subject: [PATCH 4/4] feat: sma index optimize --- include/libs/nodes/plannodes.h | 3 +-- source/libs/catalog/src/ctgAsync.c | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index f7891ba6c2..1a3bfdbb04 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -304,7 +304,7 @@ typedef struct SDownstreamSourceNode { typedef struct SExchangePhysiNode { SPhysiNode node; - int32_t srcGroupId; // group id of datasource suplans + int32_t srcGroupId; // group id of datasource suplans bool singleChannel; SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhysiNode; @@ -438,7 +438,6 @@ typedef struct SQueryPlan { int32_t numOfSubplans; SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0. SExplainInfo explainInfo; - SArray* pPlaceholderValues; } SQueryPlan; void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext); diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 2574528d15..f61a3637ed 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -525,7 +525,7 @@ int32_t ctgDumpTbIndexRes(SCtgTask* pTask) { } SMetaRes res = {.code = pTask->code, .pRes = pTask->res}; - taosArrayPush(pJob->jobRes.pTableHash, &res); + taosArrayPush(pJob->jobRes.pTableIndex, &res); return TSDB_CODE_SUCCESS; } @@ -875,7 +875,9 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf TSWAP(pTask->res, pTask->msgCtx.out); _return: - + if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) { + code = TSDB_CODE_SUCCESS; + } ctgHandleTaskEnd(pTask, code); CTG_RET(code);