From cd36807faf56d46067e013bc1d937d33f2653aaa Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 27 Jun 2022 21:30:20 +0800 Subject: [PATCH] feat: refactor the plan implementation of unique function --- include/libs/nodes/plannodes.h | 1 + include/libs/nodes/querynodes.h | 1 + include/libs/planner/planner.h | 2 + include/libs/qcom/query.h | 6 +- source/libs/function/src/builtins.c | 26 +- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesUtilFuncs.c | 3 +- source/libs/parser/src/parTranslater.c | 101 +------ source/libs/planner/src/planLogicCreater.c | 45 +++- source/libs/planner/src/planOptimizer.c | 247 +++++++++++++++++- source/libs/planner/src/planSpliter.c | 2 +- source/libs/planner/src/planner.c | 5 + source/libs/planner/test/planBasicTest.cpp | 2 + source/libs/planner/test/planOptimizeTest.cpp | 6 +- tests/system-test/2-query/unique.py | 2 +- 15 files changed, 325 insertions(+), 125 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 0bd917a9c6..4671c8b81e 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -103,6 +103,7 @@ typedef struct SIndefRowsFuncLogicNode { SLogicNode node; SNodeList* pFuncs; bool isTailFunc; + bool isUniqueFunc; } SIndefRowsFuncLogicNode; typedef struct SInterpFuncLogicNode { diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 39c569acef..56d0a3f9b9 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -50,6 +50,7 @@ typedef struct SExprNode { char aliasName[TSDB_COL_NAME_LEN]; char userAlias[TSDB_COL_NAME_LEN]; SArray* pAssociation; + bool orderAlias; } SExprNode; typedef enum EColumnType { COLUMN_TYPE_COLUMN = 1, COLUMN_TYPE_TAG, COLUMN_TYPE_TBNAME } EColumnType; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index b350837551..727cdd8ad6 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -48,6 +48,8 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo // @pSource one execution location of this group of datasource subplans int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource); +int32_t qClearSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId); + // Convert to subplan to string for the scheduler to send to the executor int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen); int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 0b767e96f6..9e8ce3ffb6 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -271,19 +271,19 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define qDebug(...) \ do { \ if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qTrace(...) \ do { \ if (qDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \ + taosPrintLog("QRY ", DEBUG_TRACE, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ } \ } while (0) #define qDebugL(...) \ do { \ if (qDebugFlag & DEBUG_DEBUG) { \ - taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ + taosPrintLongString("QRY ", DEBUG_DEBUG, tsLogEmbedded ? 255 : qDebugFlag, __VA_ARGS__); \ } \ } while (0) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d41bc89a5f..168df674d0 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1220,19 +1220,19 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len) static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // The number of parameters has been limited by the syntax definition - //uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + // uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; // The function return type has been set during syntax parsing uint8_t para2Type = pFunc->node.resType.type; - //if (para2Type != TSDB_DATA_TYPE_BIGINT && para2Type != TSDB_DATA_TYPE_UBIGINT && - // para2Type != TSDB_DATA_TYPE_VARCHAR && para2Type != TSDB_DATA_TYPE_NCHAR && - // para2Type != TSDB_DATA_TYPE_TIMESTAMP) { - // return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - //} - //if ((para2Type == TSDB_DATA_TYPE_TIMESTAMP && IS_VAR_DATA_TYPE(para1Type)) || - // (para2Type == TSDB_DATA_TYPE_BINARY && para1Type == TSDB_DATA_TYPE_NCHAR)) { - // return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - //} + // if (para2Type != TSDB_DATA_TYPE_BIGINT && para2Type != TSDB_DATA_TYPE_UBIGINT && + // para2Type != TSDB_DATA_TYPE_VARCHAR && para2Type != TSDB_DATA_TYPE_NCHAR && + // para2Type != TSDB_DATA_TYPE_TIMESTAMP) { + // return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + // } + // if ((para2Type == TSDB_DATA_TYPE_TIMESTAMP && IS_VAR_DATA_TYPE(para1Type)) || + // (para2Type == TSDB_DATA_TYPE_BINARY && para1Type == TSDB_DATA_TYPE_NCHAR)) { + // return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + // } int32_t para2Bytes = pFunc->node.resType.bytes; if (IS_VAR_DATA_TYPE(para2Type)) { @@ -1882,7 +1882,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "first", .type = FUNCTION_TYPE_FIRST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -1917,7 +1917,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last", .type = FUNCTION_TYPE_LAST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, @@ -2109,7 +2109,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "unique", .type = FUNCTION_TYPE_UNIQUE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | + .classification = FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC, .translateFunc = translateUnique, .getEnvFunc = getUniqueFuncEnv, diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 9d8baf472b..b372bf75fc 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -100,6 +100,7 @@ static int32_t exprNodeCopy(const SExprNode* pSrc, SExprNode* pDst) { COPY_OBJECT_FIELD(resType, sizeof(SDataType)); COPY_CHAR_ARRAY_FIELD(aliasName); COPY_CHAR_ARRAY_FIELD(userAlias); + COPY_SCALAR_FIELD(orderAlias); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index be6faa92cb..dc9d9b92ee 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1500,7 +1500,8 @@ typedef struct SCollectFuncsCxt { static EDealRes collectFuncs(SNode* pNode, void* pContext) { SCollectFuncsCxt* pCxt = (SCollectFuncsCxt*)pContext; - if (QUERY_NODE_FUNCTION == nodeType(pNode) && pCxt->classifier(((SFunctionNode*)pNode)->funcId)) { + if (QUERY_NODE_FUNCTION == nodeType(pNode) && pCxt->classifier(((SFunctionNode*)pNode)->funcId) && + !(((SExprNode*)pNode)->orderAlias)) { pCxt->errCode = nodesListStrictAppend(pCxt->pFuncs, nodesCloneNode(pNode)); return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 45240536e3..7106f8df96 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1355,25 +1355,6 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR; } -static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode) { - SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); - if (NULL == pFunc) { - pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; - return DEAL_RES_ERROR; - } - - strcpy(pFunc->functionName, "_group_key"); - strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName); - pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode); - if (TSDB_CODE_SUCCESS == pCxt->errCode) { - *pNode = (SNode*)pFunc; - pCxt->errCode = fmGetFuncInfo(pFunc, pCxt->msgBuf.buf, pCxt->msgBuf.len); - } - pCxt->pCurrSelectStmt->hasAggFuncs = true; - - return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); -} - static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { SCheckExprForGroupByCxt* pCxt = (SCheckExprForGroupByCxt*)pContext; if (!nodesIsExprNode(*pNode) || isAliasColumn(*pNode)) { @@ -1393,7 +1374,13 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { SNode* pGroupNode = NULL; FOREACH(pGroupNode, getGroupByList(pCxt->pTranslateCxt)) { if (nodesEqualNode(getGroupByNode(pGroupNode), *pNode)) { - return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode); + return DEAL_RES_IGNORE_CHILD; + } + } + SNode* pPartKey = NULL; + FOREACH(pPartKey, pCxt->pTranslateCxt->pCurrSelectStmt->pPartitionByList) { + if (nodesEqualNode(pPartKey, *pNode)) { + return DEAL_RES_IGNORE_CHILD; } } if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { @@ -1451,25 +1438,6 @@ static int32_t rewriteColsToSelectValFunc(STranslateContext* pCxt, SSelectStmt* return pCxt->errCode; } -static EDealRes rewriteExprsToGroupKeyFuncImpl(SNode** pNode, void* pContext) { - STranslateContext* pCxt = pContext; - SNode* pPartKey = NULL; - FOREACH(pPartKey, pCxt->pCurrSelectStmt->pPartitionByList) { - if (nodesEqualNode(pPartKey, *pNode)) { - return rewriteExprToGroupKeyFunc(pCxt, pNode); - } - } - return DEAL_RES_CONTINUE; -} - -static int32_t rewriteExprsToGroupKeyFunc(STranslateContext* pCxt, SSelectStmt* pSelect) { - nodesRewriteExprs(pSelect->pProjectionList, rewriteExprsToGroupKeyFuncImpl, pCxt); - if (TSDB_CODE_SUCCESS == pCxt->errCode && !pSelect->isDistinct) { - nodesRewriteExprs(pSelect->pOrderByList, rewriteExprsToGroupKeyFuncImpl, pCxt); - } - return pCxt->errCode; -} - typedef struct CheckAggColCoexistCxt { STranslateContext* pTranslateCxt; bool existAggFunc; @@ -1529,9 +1497,6 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) if (cxt.existIndefiniteRowsFunc && cxt.existCol) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC); } - if (cxt.existAggFunc && NULL != pSelect->pPartitionByList) { - return rewriteExprsToGroupKeyFunc(pCxt, pSelect); - } return TSDB_CODE_SUCCESS; } @@ -2408,54 +2373,6 @@ static EDealRes rewriteSeletcValueFunc(STranslateContext* pCxt, SNode** pNode) { return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR; } -static EDealRes rewriteUniqueFunc(SNode** pNode, void* pContext) { - SRwriteUniqueCxt* pCxt = pContext; - if (QUERY_NODE_FUNCTION == nodeType(*pNode)) { - SFunctionNode* pFunc = (SFunctionNode*)*pNode; - if (FUNCTION_TYPE_UNIQUE == pFunc->funcType) { - SNode* pExpr = nodesListGetNode(pFunc->pParameterList, 0); - NODES_CLEAR_LIST(pFunc->pParameterList); - strcpy(((SExprNode*)pExpr)->aliasName, ((SExprNode*)*pNode)->aliasName); - nodesDestroyNode(*pNode); - *pNode = pExpr; - pCxt->pExpr = pExpr; - return DEAL_RES_IGNORE_CHILD; - } else if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType) { - return rewriteSeletcValueFunc(pCxt->pTranslateCxt, pNode); - } - } - return DEAL_RES_CONTINUE; -} - -static SNode* createGroupingSet(SNode* pExpr) { - SGroupingSetNode* pGroupingSet = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); - if (NULL == pGroupingSet) { - return NULL; - } - pGroupingSet->groupingSetType = GP_TYPE_NORMAL; - if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pGroupingSet->pParameterList, nodesCloneNode(pExpr))) { - nodesDestroyNode((SNode*)pGroupingSet); - return NULL; - } - return (SNode*)pGroupingSet; -} - -// from: select unique(expr), col1 + col2 from t where_clause partition_by_clause order_by_clause ... -// to: select expr, first(col1) + first(col2) from t where_clause partition_by_clause group by expr order_by_clause ... -static int32_t rewriteUniqueStmt(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (!pSelect->hasUniqueFunc) { - return TSDB_CODE_SUCCESS; - } - - SRwriteUniqueCxt cxt = {.pTranslateCxt = pCxt, .pExpr = NULL}; - nodesRewriteExprs(pSelect->pProjectionList, rewriteUniqueFunc, &cxt); - if (TSDB_CODE_SUCCESS == cxt.pTranslateCxt->errCode) { - cxt.pTranslateCxt->errCode = nodesListMakeStrictAppend(&pSelect->pGroupByList, createGroupingSet(cxt.pExpr)); - } - pSelect->hasIndefiniteRowsFunc = false; - return cxt.pTranslateCxt->errCode; -} - typedef struct SReplaceOrderByAliasCxt { STranslateContext* pTranslateCxt; SNodeList* pProjectionList; @@ -2474,6 +2391,7 @@ static EDealRes replaceOrderByAliasImpl(SNode** pNode, void* pContext) { pCxt->pTranslateCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; return DEAL_RES_ERROR; } + ((SExprNode*)pNew)->orderAlias = true; nodesDestroyNode(*pNode); *pNode = pNew; return DEAL_RES_CONTINUE; @@ -2529,9 +2447,6 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = translateInterp(pCxt, pSelect); } - if (TSDB_CODE_SUCCESS == code) { - code = rewriteUniqueStmt(pCxt, pSelect); - } if (TSDB_CODE_SUCCESS == code) { code = rewriteTimelineFunc(pCxt, pSelect); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 396a7b6193..ef8b109b62 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -437,6 +437,33 @@ static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr) return pCol; } +static SNode* createGroupingSetNode(SNode* pExpr) { + SGroupingSetNode* pGroupingSet = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + if (NULL == pGroupingSet) { + return NULL; + } + pGroupingSet->groupingSetType = GP_TYPE_NORMAL; + if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pGroupingSet->pParameterList, nodesCloneNode(pExpr))) { + nodesDestroyNode((SNode*)pGroupingSet); + return NULL; + } + return (SNode*)pGroupingSet; +} + +static int32_t createGroupKeysFromPartKeys(SNodeList* pPartKeys, SNodeList** pOutput) { + SNodeList* pGroupKeys = NULL; + SNode* pPartKey = NULL; + FOREACH(pPartKey, pPartKeys) { + int32_t code = nodesListMakeStrictAppend(&pGroupKeys, createGroupingSetNode(pPartKey)); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pGroupKeys); + return code; + } + } + *pOutput = pGroupKeys; + return TSDB_CODE_SUCCESS; +} + static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) { return TSDB_CODE_SUCCESS; @@ -459,10 +486,18 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, code = rewriteExprsForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY); } + if (NULL != pSelect->pPartitionByList) { + code = createGroupKeysFromPartKeys(pSelect->pPartitionByList, &pAgg->pGroupKeys); + } + if (NULL != pSelect->pGroupByList) { - pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList); - if (NULL == pAgg->pGroupKeys) { - code = TSDB_CODE_OUT_OF_MEMORY; + if (NULL != pAgg->pGroupKeys) { + code = nodesListStrictAppendList(pAgg->pGroupKeys, nodesCloneList(pSelect->pGroupByList)); + } else { + pAgg->pGroupKeys = nodesCloneList(pSelect->pGroupByList); + if (NULL == pAgg->pGroupKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } } @@ -508,6 +543,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt } pIdfRowsFunc->isTailFunc = pSelect->hasTailFunc; + pIdfRowsFunc->isUniqueFunc = pSelect->hasUniqueFunc; // indefinite rows functions and _select_values functions int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs); @@ -809,7 +845,8 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel } static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { - if (NULL == pSelect->pPartitionByList) { + if (NULL == pSelect->pPartitionByList || (pSelect->hasAggFuncs && NULL == pSelect->pWindow) || + NULL != pSelect->pGroupByList) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 53c9ef8dd0..29e25d6311 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -987,6 +987,7 @@ static int32_t smaIndexOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogi code = smaIndexOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex); taosArrayDestroyEx(pScan->pSmaIndexes, smaIndexOptDestroySmaIndex); pScan->pSmaIndexes = NULL; + pCxt->optimized = true; break; } } @@ -1064,6 +1065,43 @@ static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) { return code; } +static SNode* partTagsCreateGroupKeyFunc(SNode* pNode) { + SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pFunc) { + return NULL; + } + + strcpy(pFunc->functionName, "_group_key"); + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SColumnNode* pCol = (SColumnNode*)pNode; + snprintf(pFunc->node.aliasName, sizeof(pFunc->node.aliasName), "%s.%s", pCol->tableAlias, pCol->colName); + } else { + strcpy(pFunc->node.aliasName, ((SExprNode*)pNode)->aliasName); + } + int32_t code = nodesListMakeStrictAppend(&pFunc->pParameterList, nodesCloneNode(pNode)); + if (TSDB_CODE_SUCCESS == code) { + code = fmGetFuncInfo(pFunc, NULL, 0); + } + + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode((SNode*)pFunc); + return NULL; + } + + return (SNode*)pFunc; +} + +static int32_t partTagsRewriteGroupTagsToGroupKeyFuncs(SNodeList* pGroupTags, SNodeList* pAggFuncs) { + SNode* pNode = NULL; + FOREACH(pNode, pGroupTags) { + int32_t code = nodesListStrictAppend(pAggFuncs, partTagsCreateGroupKeyFunc(pNode)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized); if (NULL == pNode) { @@ -1080,15 +1118,17 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub nodesDestroyNode((SNode*)pNode); } } else { - SNode* pGroupKey = NULL; - FOREACH(pGroupKey, ((SAggLogicNode*)pNode)->pGroupKeys) { + SAggLogicNode* pAgg = (SAggLogicNode*)pNode; + SNode* pGroupKey = NULL; + FOREACH(pGroupKey, pAgg->pGroupKeys) { code = nodesListMakeStrictAppend( &pScan->pGroupTags, nodesCloneNode(nodesListGetNode(((SGroupingSetNode*)pGroupKey)->pParameterList, 0))); if (TSDB_CODE_SUCCESS != code) { break; } } - NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys); + NODES_DESTORY_LIST(pAgg->pGroupKeys); + code = partTagsRewriteGroupTagsToGroupKeyFuncs(pScan->pGroupTags, pAgg->pAggFuncs); } if (TSDB_CODE_SUCCESS == code) { code = partTagsOptRebuildTbanme(pScan->pGroupTags); @@ -1163,6 +1203,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* NODES_CLEAR_LIST(pProjectNode->node.pChildren); nodesDestroyNode((SNode*)pProjectNode); } + pCxt->optimized = true; return code; } @@ -1306,6 +1347,7 @@ static int32_t rewriteTailOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pL nodesDestroyNode((SNode*)pSort); nodesDestroyNode((SNode*)pProject); } + pCxt->optimized = true; return code; } @@ -1344,6 +1386,7 @@ static int32_t eliminateSetOpOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pSetOpNode->pChildren = NULL; ERASE_NODE(pSetOpNode->pParent->pChildren); + pCxt->optimized = true; return TSDB_CODE_SUCCESS; } } @@ -1360,6 +1403,184 @@ static int32_t eliminateSetOpOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLo return eliminateSetOpOptimizeImpl(pCxt, pLogicSubplan, pSetOpNode); } +static bool rewriteUniqueOptMayBeOptimized(SLogicNode* pNode) { + return QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC == nodeType(pNode) && ((SIndefRowsFuncLogicNode*)pNode)->isUniqueFunc; +} + +static SNode* rewriteUniqueOptCreateGroupingSet(SNode* pExpr) { + SGroupingSetNode* pGroupingSet = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + if (NULL == pGroupingSet) { + return NULL; + } + pGroupingSet->groupingSetType = GP_TYPE_NORMAL; + SExprNode* pGroupExpr = (SExprNode*)nodesCloneNode(pExpr); + if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pGroupingSet->pParameterList, (SNode*)pGroupExpr)) { + nodesDestroyNode((SNode*)pGroupingSet); + return NULL; + } + return (SNode*)pGroupingSet; +} + +static SNode* rewriteUniqueOptCreateFirstFunc(SFunctionNode* pSelectValue, SNode* pCol) { + SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pFunc) { + return NULL; + } + + strcpy(pFunc->functionName, "first"); + if (NULL != pSelectValue) { + snprintf(pFunc->node.aliasName, sizeof(pFunc->node.aliasName), "%s", pSelectValue->node.aliasName); + } else { + snprintf(pFunc->node.aliasName, sizeof(pFunc->node.aliasName), "%s.%p", pFunc->functionName, pFunc); + } + int32_t code = nodesListMakeStrictAppend(&pFunc->pParameterList, nodesCloneNode(pCol)); + if (TSDB_CODE_SUCCESS == code) { + code = fmGetFuncInfo(pFunc, NULL, 0); + } + + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode((SNode*)pFunc); + return NULL; + } + + return (SNode*)pFunc; +} + +static int32_t rewriteUniqueOptCreateAgg(SIndefRowsFuncLogicNode* pIndef, SLogicNode** pOutput) { + SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG); + if (NULL == pAgg) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + TSWAP(pAgg->node.pChildren, pIndef->node.pChildren); + pAgg->node.precision = pIndef->node.precision; + + int32_t code = TSDB_CODE_SUCCESS; + bool hasSelectPrimaryKey = false; + SNode* pPrimaryKey = NULL; + SNode* pNode = NULL; + FOREACH(pNode, pIndef->pFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + SNode* pExpr = nodesListGetNode(pFunc->pParameterList, 0); + if (FUNCTION_TYPE_UNIQUE == pFunc->funcType) { + pPrimaryKey = nodesListGetNode(pFunc->pParameterList, 1); + code = nodesListMakeStrictAppend(&pAgg->pGroupKeys, rewriteUniqueOptCreateGroupingSet(pExpr)); + } else if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId) { // _select_value(ts) => first(ts) + hasSelectPrimaryKey = true; + code = nodesListMakeStrictAppend(&pAgg->pAggFuncs, rewriteUniqueOptCreateFirstFunc(pFunc, pExpr)); + } else { // _select_value(other_col) + code = nodesListMakeStrictAppend(&pAgg->pAggFuncs, nodesCloneNode(pNode)); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) { + code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets); + } + + if (TSDB_CODE_SUCCESS == code && !hasSelectPrimaryKey && NULL != pAgg->pAggFuncs) { + code = nodesListMakeStrictAppend(&pAgg->pAggFuncs, rewriteUniqueOptCreateFirstFunc(NULL, pPrimaryKey)); + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pAgg; + } else { + nodesDestroyNode((SNode*)pAgg); + } + return code; +} + +static SNode* rewriteUniqueOptCreateProjectCol(SFunctionNode* pFunc) { + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return NULL; + } + + pCol->node.resType = pFunc->node.resType; + if (FUNCTION_TYPE_UNIQUE == pFunc->funcType) { + SExprNode* pExpr = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN == nodeType(pExpr)) { + strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias); + strcpy(pCol->colName, ((SColumnNode*)pExpr)->colName); + } else { + strcpy(pCol->colName, pExpr->aliasName); + } + } else { + strcpy(pCol->colName, pFunc->node.aliasName); + } + strcpy(pCol->node.aliasName, pFunc->node.aliasName); + + return (SNode*)pCol; +} + +static int32_t rewriteUniqueOptCreateProject(SIndefRowsFuncLogicNode* pIndef, SLogicNode** pOutput) { + SProjectLogicNode* pProject = (SProjectLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_PROJECT); + if (NULL == pProject) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + TSWAP(pProject->node.pTargets, pIndef->node.pTargets); + pProject->node.precision = pIndef->node.precision; + + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + FOREACH(pNode, pIndef->pFuncs) { + code = nodesListMakeStrictAppend(&pProject->pProjections, rewriteUniqueOptCreateProjectCol((SFunctionNode*)pNode)); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pProject; + } else { + nodesDestroyNode((SNode*)pProject); + } + return code; +} + +static int32_t rewriteUniqueOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, + SIndefRowsFuncLogicNode* pIndef) { + SLogicNode* pAgg = NULL; + SLogicNode* pProject = NULL; + int32_t code = rewriteUniqueOptCreateAgg(pIndef, &pAgg); + if (TSDB_CODE_SUCCESS == code) { + code = rewriteUniqueOptCreateProject(pIndef, &pProject); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeAppend(&pProject->pChildren, (SNode*)pAgg); + pAgg->pParent = pProject; + pAgg = NULL; + } + if (TSDB_CODE_SUCCESS == code) { + code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pIndef, pProject); + } + if (TSDB_CODE_SUCCESS == code) { + nodesDestroyNode((SNode*)pIndef); + } else { + nodesDestroyNode((SNode*)pAgg); + nodesDestroyNode((SNode*)pProject); + } + pCxt->optimized = true; + return code; +} + +static int32_t rewriteUniqueOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SIndefRowsFuncLogicNode* pIndef = + (SIndefRowsFuncLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, rewriteUniqueOptMayBeOptimized); + + if (NULL == pIndef) { + return TSDB_CODE_SUCCESS; + } + + return rewriteUniqueOptimizeImpl(pCxt, pLogicSubplan, pIndef); +} + // clang-format off static const SOptimizeRule optimizeRuleSet[] = { {.pName = "ScanPath", .optimizeFunc = scanPathOptimize}, @@ -1369,23 +1590,37 @@ static const SOptimizeRule optimizeRuleSet[] = { {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize}, {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize}, {.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize}, - {.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize} + {.pName = "RewriteTail", .optimizeFunc = rewriteTailOptimize}, + {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize} }; // clang-format on static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule)); +static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { + char* pStr = NULL; + nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); + qDebugL("apply optimize %s rule: %s", pRuleName, pStr); + taosMemoryFree(pStr); +} + static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false}; + bool optimized = false; do { - cxt.optimized = false; + optimized = false; for (int32_t i = 0; i < optimizeRuleNum; ++i) { + cxt.optimized = false; int32_t code = optimizeRuleSet[i].optimizeFunc(&cxt, pLogicSubplan); if (TSDB_CODE_SUCCESS != code) { return code; } + if (cxt.optimized) { + optimized = true; + dumpLogicSubplan(optimizeRuleSet[i].pName, pLogicSubplan); + } } - } while (cxt.optimized); + } while (optimized); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 49ab50f913..9d23df5bda 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1237,7 +1237,7 @@ static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); static void dumpLogicSubplan(const char* pRuleName, SLogicSubplan* pSubplan) { char* pStr = NULL; nodesNodeToString((SNode*)pSubplan, false, &pStr, NULL); - qDebugL("apply %s rule: %s", pRuleName, pStr); + qDebugL("apply split %s rule: %s", pRuleName, pStr); taosMemoryFree(pStr); } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index e75c8375fb..1b9d16311c 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -85,6 +85,11 @@ int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstream return setSubplanExecutionNode(subplan->pNode, groupId, pSource); } +int32_t qClearSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId) { + // todo + return TSDB_CODE_FAILED; +} + int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType && NULL == pSubplan->pNode) { SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink; diff --git a/source/libs/planner/test/planBasicTest.cpp b/source/libs/planner/test/planBasicTest.cpp index 8c19b52a09..094ecbe8e6 100644 --- a/source/libs/planner/test/planBasicTest.cpp +++ b/source/libs/planner/test/planBasicTest.cpp @@ -56,6 +56,8 @@ TEST_F(PlanBasicTest, uniqueFunc) { run("SELECT UNIQUE(c2 + 10) FROM t1 WHERE c1 > 10"); + run("SELECT UNIQUE(c2 + 10), c2 FROM t1 WHERE c1 > 10"); + run("SELECT UNIQUE(c2 + 10), ts, c2 FROM t1 WHERE c1 > 10"); run("SELECT UNIQUE(c1) a FROM t1 ORDER BY a"); diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 9bd95a79f9..5d964dd94a 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -20,7 +20,7 @@ using namespace std; class PlanOptimizeTest : public PlannerTestBase {}; -TEST_F(PlanOptimizeTest, optimizeScanData) { +TEST_F(PlanOptimizeTest, scanPath) { useDb("root", "test"); run("SELECT COUNT(*) FROM t1"); @@ -32,7 +32,7 @@ TEST_F(PlanOptimizeTest, optimizeScanData) { run("SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1"); } -TEST_F(PlanOptimizeTest, ConditionPushDown) { +TEST_F(PlanOptimizeTest, pushDownCondition) { useDb("root", "test"); run("SELECT ts, c1 FROM st1 WHERE tag1 > 4"); @@ -44,7 +44,7 @@ TEST_F(PlanOptimizeTest, ConditionPushDown) { run("SELECT ts, c1 FROM st1 WHERE tag1 > 4 AND tag2 = 'hello' AND c1 > 10"); } -TEST_F(PlanOptimizeTest, orderByPrimaryKey) { +TEST_F(PlanOptimizeTest, sortPrimaryKey) { useDb("root", "test"); run("SELECT c1 FROM t1 ORDER BY ts"); diff --git a/tests/system-test/2-query/unique.py b/tests/system-test/2-query/unique.py index f910ff1439..aeebf2425a 100644 --- a/tests/system-test/2-query/unique.py +++ b/tests/system-test/2-query/unique.py @@ -266,7 +266,7 @@ class TDTestCase: tdSql.query("select unique(c1) from ct4") tdSql.checkRows(10) - tdSql.error("select unique(c1),tbname from ct1") + #tdSql.error("select unique(c1),tbname from ct1") #support #tdSql.error("select unique(c1),t1 from ct1") #support # unique with common col