From 45a806f78961c23979ae4ae68eaa9c3e89d2e1ca Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 21 Jun 2022 19:52:26 +0800 Subject: [PATCH] feat: sql function 'last_row' --- include/libs/function/functionMgt.h | 1 + include/libs/nodes/nodes.h | 9 ++-- include/libs/nodes/plannodes.h | 4 +- include/libs/nodes/querynodes.h | 1 + source/libs/function/src/builtins.c | 2 +- source/libs/function/src/functionMgt.c | 7 +++ source/libs/nodes/src/nodesCodeFuncs.c | 4 ++ source/libs/nodes/src/nodesUtilFuncs.c | 3 ++ source/libs/parser/src/parCalcConst.c | 2 +- source/libs/parser/src/parTranslater.c | 1 + source/libs/planner/src/planLogicCreater.c | 59 +++++++++++++++++++++- source/libs/planner/src/planOptimizer.c | 2 +- source/libs/planner/src/planPhysiCreater.c | 5 +- source/libs/planner/src/planSpliter.c | 2 +- source/libs/planner/test/planBasicTest.cpp | 12 +++++ 15 files changed, 103 insertions(+), 11 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index cbaff29cb2..1f314d19e7 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -190,6 +190,7 @@ bool fmIsForbidWindowFunc(int32_t funcId); bool fmIsForbidGroupByFunc(int32_t funcId); bool fmIsIntervalInterpoFunc(int32_t funcId); bool fmIsInterpFunc(int32_t funcId); +bool fmIsLastRowFunc(int32_t funcId); int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc); diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 58e2393970..88a6185495 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -59,10 +59,10 @@ extern "C" { for (SListCell* cell = (NULL != (list) ? (list)->pHead : NULL); \ (NULL != cell ? (node = &(cell->pNode), true) : (node = NULL, false)); cell = cell->pNext) -#define DESTORY_LIST(list) \ - do { \ - nodesDestroyList((list)); \ - (list) = NULL; \ +#define NODES_DESTORY_LIST(list) \ + do { \ + nodesDestroyList((list)); \ + (list) = NULL; \ } while (0) #define NODES_CLEAR_LIST(list) \ @@ -219,6 +219,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, + QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, QUERY_NODE_PHYSICAL_PLAN_PROJECT, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index b07e8f39d5..9fdc66d76c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -40,7 +40,8 @@ typedef enum EScanType { SCAN_TYPE_SYSTEM_TABLE, SCAN_TYPE_STREAM, SCAN_TYPE_TABLE_MERGE, - SCAN_TYPE_BLOCK_INFO + SCAN_TYPE_BLOCK_INFO, + SCAN_TYPE_LAST_ROW } EScanType; typedef struct SScanLogicNode { @@ -260,6 +261,7 @@ typedef struct SScanPhysiNode { typedef SScanPhysiNode STagScanPhysiNode; typedef SScanPhysiNode SBlockDistScanPhysiNode; +typedef SScanPhysiNode SLastRowScanPhysiNode; typedef struct SSystemTableScanPhysiNode { SScanPhysiNode scan; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 7a63f87412..78013e5457 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -258,6 +258,7 @@ typedef struct SSelectStmt { bool hasUniqueFunc; bool hasTailFunc; bool hasInterpFunc; + bool hasLastRowFunc; } SSelectStmt; typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index cfad00f458..0c92f1328f 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1799,7 +1799,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC, + .classification = FUNC_MGT_MULTI_RES_FUNC, .translateFunc = translateLastRow, .getEnvFunc = getMinmaxFuncEnv, .initFunc = minmaxFunctionSetup, diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 710b01ce59..5fcf5e239c 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -186,6 +186,13 @@ bool fmIsInterpFunc(int32_t funcId) { return FUNCTION_TYPE_INTERP == funcMgtBuiltins[funcId].type; } +bool fmIsLastRowFunc(int32_t funcId) { + if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { + return false; + } + return FUNCTION_TYPE_LAST_ROW == funcMgtBuiltins[funcId].type; +} + void fmFuncMgtDestroy() { void* m = gFunMgtService.pFuncNameHashTable; if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) { diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index bde5159087..e3430b866b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -220,6 +220,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiSystemTableScan"; case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: return "PhysiBlockDistScan"; + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: + return "PhysiLastRowScan"; case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return "PhysiProject"; case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: @@ -4105,6 +4107,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return logicPlanToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: return physiTagScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: @@ -4245,6 +4248,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicPlan(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: return jsonToPhysiTagScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index ebacf5574f..e3755c74e6 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -273,6 +273,8 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SSystemTableScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: return makeNode(type, sizeof(SBlockDistScanPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: + return makeNode(type, sizeof(SLastRowScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return makeNode(type, sizeof(SProjectPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: @@ -781,6 +783,7 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: destroyScanPhysiNode((SScanPhysiNode*)pNode); break; case QUERY_NODE_PHYSICAL_PLAN_PROJECT: { diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index 22d7afd642..042d6ab184 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -227,7 +227,7 @@ static int32_t calcConstGroupBy(SCalcConstContext* pCxt, SSelectStmt* pSelect) { } } } - DESTORY_LIST(pSelect->pGroupByList); + NODES_DESTORY_LIST(pSelect->pGroupByList); } return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index fa0a66820f..d6619ebcc8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1176,6 +1176,7 @@ static void setFuncClassification(SSelectStmt* pSelect, SFunctionNode* pFunc) { pSelect->hasUniqueFunc = pSelect->hasUniqueFunc ? true : (FUNCTION_TYPE_UNIQUE == pFunc->funcType); pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType); pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType); + pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType); } } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 36d2efc13f..7e95cb8b7e 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -296,6 +296,59 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return code; } +static int32_t createColumnByLastRow(SNodeList* pFuncs, SNodeList** pOutput) { + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pCols = NULL; + SNode* pFunc = NULL; + FOREACH(pFunc, pFuncs) { + SFunctionNode* pLastRow = (SFunctionNode*)pFunc; + SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pLastRow->pParameterList, 0); + snprintf(pCol->colName, sizeof(pCol->colName), "%s", pLastRow->node.aliasName); + snprintf(pCol->node.aliasName, sizeof(pCol->colName), "%s", pLastRow->node.aliasName); + NODES_CLEAR_LIST(pLastRow->pParameterList); + code = nodesListMakeStrictAppend(&pCols, (SNode*)pCol); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + if (TSDB_CODE_SUCCESS == code) { + *pOutput = pCols; + } else { + nodesDestroyList(pCols); + } + return code; +} + +static int32_t createLastRowScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable, + SLogicNode** pLogicNode) { + SScanLogicNode* pScan = NULL; + int32_t code = makeScanLogicNode(pCxt, pRealTable, false, (SLogicNode**)&pScan); + + SNodeList* pFuncs = NULL; + if (TSDB_CODE_SUCCESS == code) { + pScan->scanType = SCAN_TYPE_LAST_ROW; + code = nodesCollectFuncs(pSelect, SQL_CLAUSE_FROM, fmIsLastRowFunc, &pFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = rewriteExprsForSelect(pFuncs, pSelect, SQL_CLAUSE_FROM); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByLastRow(pFuncs, &pScan->pScanCols); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets); + } + + if (TSDB_CODE_SUCCESS == code) { + *pLogicNode = (SLogicNode*)pScan; + } else { + nodesDestroyNode((SNode*)pScan); + } + nodesDestroyList(pFuncs); + + return code; +} + static int32_t createSubqueryLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, STempTableNode* pTable, SLogicNode** pLogicNode) { return createQueryLogicNode(pCxt, pTable->pSubquery, pLogicNode); @@ -367,7 +420,11 @@ static int32_t doCreateLogicNodeByTable(SLogicPlanContext* pCxt, SSelectStmt* pS SLogicNode** pLogicNode) { switch (nodeType(pTable)) { case QUERY_NODE_REAL_TABLE: - return createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pTable, pLogicNode); + if (pSelect->hasLastRowFunc) { + return createLastRowScanLogicNode(pCxt, pSelect, (SRealTableNode*)pTable, pLogicNode); + } else { + return createScanLogicNode(pCxt, pSelect, (SRealTableNode*)pTable, pLogicNode); + } case QUERY_NODE_TEMP_TABLE: return createSubqueryLogicNode(pCxt, pSelect, (STempTableNode*)pTable, pLogicNode); case QUERY_NODE_JOIN_TABLE: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index cd3218cf94..d74d6230fc 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1126,7 +1126,7 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub break; } } - DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys); + NODES_DESTORY_LIST(((SAggLogicNode*)pNode)->pGroupKeys); } if (TSDB_CODE_SUCCESS == code) { code = partTagsOptRebuildTbanme(pScan->pPartTags); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index ff78370c52..42ac3855f9 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -462,6 +462,8 @@ static ENodeType getScanOperatorType(EScanType scanType) { return QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN; case SCAN_TYPE_BLOCK_INFO: return QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; + case SCAN_TYPE_LAST_ROW: + return QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN; default: break; } @@ -559,6 +561,7 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, switch (pScanLogicNode->scanType) { case SCAN_TYPE_TAG: case SCAN_TYPE_BLOCK_INFO: + case SCAN_TYPE_LAST_ROW: return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); case SCAN_TYPE_TABLE: return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode); @@ -732,7 +735,7 @@ static int32_t rewritePrecalcExprs(SPhysiPlanContext* pCxt, SNodeList* pList, SN SRewritePrecalcExprsCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pPrecalcExprs = *pPrecalcExprs}; nodesRewriteExprs(*pRewrittenList, doRewritePrecalcExprs, &cxt); if (0 == LIST_LENGTH(cxt.pPrecalcExprs) || TSDB_CODE_SUCCESS != cxt.errCode) { - DESTORY_LIST(*pPrecalcExprs); + NODES_DESTORY_LIST(*pPrecalcExprs); } return cxt.errCode; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 29119bf1d2..f0e0e84bd9 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -934,7 +934,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl } if (TSDB_CODE_SUCCESS == code) { nodesDestroyList(pSubplanChildren); - DESTORY_LIST(pSplitNode->pChildren); + NODES_DESTORY_LIST(pSplitNode->pChildren); } return code; } diff --git a/source/libs/planner/test/planBasicTest.cpp b/source/libs/planner/test/planBasicTest.cpp index f74c7df355..4ee6c1ad01 100644 --- a/source/libs/planner/test/planBasicTest.cpp +++ b/source/libs/planner/test/planBasicTest.cpp @@ -83,3 +83,15 @@ TEST_F(PlanBasicTest, interpFunc) { run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"); } + +TEST_F(PlanBasicTest, lastRowFunc) { + useDb("root", "test"); + + run("SELECT LAST_ROW(c1) FROM t1"); + + run("SELECT LAST_ROW(*) FROM t1"); + + run("SELECT LAST_ROW(c1, c2) FROM t1"); + + run("SELECT LAST_ROW(c1) FROM st1"); +}