From 3e5ab6b54cecbb658f0028fcb11af85969183e8a Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 31 May 2022 11:29:10 +0800 Subject: [PATCH 01/11] feat: add physical plannode of indefinite rows func --- include/libs/nodes/nodes.h | 2 + include/libs/nodes/plannodes.h | 11 ++ source/libs/function/src/builtins.c | 109 ++----------------- source/libs/nodes/src/nodesCloneFuncs.c | 8 ++ source/libs/nodes/src/nodesCodeFuncs.c | 67 ++++++++++++ source/libs/nodes/src/nodesTraverseFuncs.c | 2 + source/libs/nodes/src/nodesUtilFuncs.c | 4 + source/libs/planner/src/planLogicCreater.c | 34 ++++++ source/libs/planner/src/planPhysiCreater.c | 42 ++++++- source/libs/planner/test/planProjectTest.cpp | 34 ++++++ 10 files changed, 214 insertions(+), 99 deletions(-) create mode 100644 source/libs/planner/test/planProjectTest.cpp diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 3860266725..639a1173cd 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -194,6 +194,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_FILL, QUERY_NODE_LOGIC_PLAN_SORT, QUERY_NODE_LOGIC_PLAN_PARTITION, + QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC, QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_PLAN, @@ -217,6 +218,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION_WINDOW, QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW, QUERY_NODE_PHYSICAL_PLAN_PARTITION, + QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_SUBPLAN, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 2648a468dd..56a1343dcb 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -84,6 +84,11 @@ typedef struct SProjectLogicNode { int64_t soffset; } SProjectLogicNode; +typedef struct SIndefRowsFuncLogicNode { + SLogicNode node; + SNodeList* pVectorFuncs; +} SIndefRowsFuncLogicNode; + typedef struct SVnodeModifLogicNode { SLogicNode node; int32_t msgType; @@ -236,6 +241,12 @@ typedef struct SProjectPhysiNode { int64_t soffset; } SProjectPhysiNode; +typedef struct SIndefRowsFuncPhysiNode { + SPhysiNode node; + SNodeList* pExprs; + SNodeList* pVectorFuncs; +} SIndefRowsFuncPhysiNode; + typedef struct SJoinPhysiNode { SPhysiNode node; EJoinType joinType; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index c9d556c86b..831af33add 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -156,14 +156,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - //param0 - SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of PERCENTILE function can only be column"); - } - - //param1 + // param1 SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); if (pValue->datum.i < 0 || pValue->datum.i > 100) { @@ -178,7 +171,7 @@ static int32_t translatePercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - //set result type + // set result type pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; return TSDB_CODE_SUCCESS; } @@ -197,14 +190,7 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - //param0 - SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of APERCENTILE function can only be column"); - } - - //param1 + // param1 SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -223,7 +209,7 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - //param2 + // param2 if (3 == numOfParams) { uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type; if (!IS_VAR_DATA_TYPE(para3Type)) { @@ -263,14 +249,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - //param0 - SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of TOP/BOTTOM function can only be column"); - } - - //param1 + // param1 SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -287,7 +266,7 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { pValue->notReserved = true; - //set result type + // set result type SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; return TSDB_CODE_SUCCESS; @@ -317,13 +296,6 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - // param0 - SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of ELAPSED function can only be column"); - } - uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; if (TSDB_DATA_TYPE_TIMESTAMP != paraType) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -389,13 +361,6 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - // param0 - SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of HISTOGRAM function can only be column"); - } - uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; if (!IS_NUMERIC_TYPE(colType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -428,12 +393,6 @@ static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pPara)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The input parameter of HYPERLOGLOG function can only be column"); - } - pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_UBIGINT].bytes, .type = TSDB_DATA_TYPE_UBIGINT}; return TSDB_CODE_SUCCESS; } @@ -444,12 +403,6 @@ static int32_t translateStateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - // param0 - SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The input parameter of STATECOUNT function can only be column"); - } uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; if (!IS_NUMERIC_TYPE(colType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -484,12 +437,6 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32 return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - // param0 - SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The input parameter of STATEDURATION function can only be column"); - } uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; if (!IS_NUMERIC_TYPE(colType)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); @@ -528,12 +475,6 @@ static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pPara)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The input parameter of CSUM function can only be column"); - } - uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; uint8_t resType; if (!IS_NUMERIC_TYPE(colType)) { @@ -559,13 +500,6 @@ static int32_t translateMavg(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - // param0 - SNode* pParaNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pParaNode0)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of MAVG function can only be column"); - } - uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; // param1 @@ -595,13 +529,6 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - // param0 - SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pParamNode0)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of SAMPLE function can only be column"); - } - SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); uint8_t colType = pCol->resType.type; @@ -639,12 +566,6 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - // param0 - SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN != nodeType(pPara)) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of TAIL function can only be column"); - } SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0); uint8_t colType = pCol->resType.type; @@ -659,8 +580,8 @@ static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (pValue->datum.i < ((i > 1) ? 0 : 1) || pValue->datum.i > 100) { return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "TAIL function second parameter should be in range [1, 100], " - "third parameter should be in range [0, 100]"); + "TAIL function second parameter should be in range [1, 100], " + "third parameter should be in range [0, 100]"); } pValue->notReserved = true; @@ -721,20 +642,12 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - //param0 - SNode* pParamNode0 = nodesListGetNode(pFunc->pParameterList, 0); - if (nodeType(pParamNode0) != QUERY_NODE_COLUMN) { - return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "The first parameter of DIFF function can only be column"); - } - uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && - TSDB_DATA_TYPE_BOOL != colType) { + if (!IS_SIGNED_NUMERIC_TYPE(colType) && !IS_FLOAT_TYPE(colType) && TSDB_DATA_TYPE_BOOL != colType) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - //param1 + // param1 if (numOfParams == 2) { uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; if (!IS_INTEGER_TYPE(paraType)) { @@ -852,7 +765,7 @@ static int32_t translateSubstr(SFunctionNode* pFunc, char* pErrBuf, int32_t len) if (3 == numOfParams) { SExprNode* p2 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 2); - uint8_t para2Type = p2->resType.type; + uint8_t para2Type = p2->resType.type; if (!IS_INTEGER_TYPE(para2Type)) { return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 68d3741b48..40c9e68620 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -408,6 +408,12 @@ static SNode* logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLogi return (SNode*)pDst; } +static SNode* logicIndefRowsFuncCopy(const SIndefRowsFuncLogicNode* pSrc, SIndefRowsFuncLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + CLONE_NODE_LIST_FIELD(pVectorFuncs); + return (SNode*)pDst; +} + static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) { COPY_OBJECT_FIELD(id, sizeof(SSubplanId)); CLONE_NODE_FIELD(pNode); @@ -537,6 +543,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { return logicSortCopy((const SSortLogicNode*)pNode, (SSortLogicNode*)pDst); case QUERY_NODE_LOGIC_PLAN_PARTITION: return logicPartitionCopy((const SPartitionLogicNode*)pNode, (SPartitionLogicNode*)pDst); + case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + return logicIndefRowsFuncCopy((const SIndefRowsFuncLogicNode*)pNode, (SIndefRowsFuncLogicNode*)pDst); case QUERY_NODE_LOGIC_SUBPLAN: return logicSubplanCopy((const SLogicSubplan*)pNode, (SLogicSubplan*)pDst); default: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 78710569cb..e47913a91b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -198,6 +198,8 @@ const char* nodesNodeName(ENodeType type) { return "LogicSort"; case QUERY_NODE_LOGIC_PLAN_PARTITION: return "LogicPartition"; + case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + return "LogicIndefRowsFunc"; case QUERY_NODE_LOGIC_SUBPLAN: return "LogicSubplan"; case QUERY_NODE_LOGIC_PLAN: @@ -236,6 +238,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiStateWindow"; case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return "PhysiPartition"; + case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: + return "PhysiIndefRowsFunc"; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return "PhysiDispatch"; case QUERY_NODE_PHYSICAL_PLAN_INSERT: @@ -727,6 +731,30 @@ static int32_t jsonToLogicPartitionNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkIndefRowsFuncLogicPlanVectorFuncs = "VectorFuncs"; + +static int32_t logicIndefRowsFuncNodeToJson(const void* pObj, SJson* pJson) { + const SIndefRowsFuncLogicNode* pNode = (const SIndefRowsFuncLogicNode*)pObj; + + int32_t code = logicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkIndefRowsFuncLogicPlanVectorFuncs, pNode->pVectorFuncs); + } + + return code; +} + +static int32_t jsonToLogicIndefRowsFuncNode(const SJson* pJson, void* pObj) { + SIndefRowsFuncLogicNode* pNode = (SIndefRowsFuncLogicNode*)pObj; + + int32_t code = jsonToLogicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkIndefRowsFuncLogicPlanVectorFuncs, &pNode->pVectorFuncs); + } + + return code; +} + static const char* jkSubplanIdQueryId = "QueryId"; static const char* jkSubplanIdGroupId = "GroupId"; static const char* jkSubplanIdSubplanId = "SubplanId"; @@ -1744,6 +1772,37 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkIndefRowsFuncPhysiPlanExprs = "Exprs"; +static const char* jkIndefRowsFuncPhysiPlanVectorFuncs = "VectorFuncs"; + +static int32_t physiIndefRowsFuncNodeToJson(const void* pObj, SJson* pJson) { + const SIndefRowsFuncPhysiNode* pNode = (const SIndefRowsFuncPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkIndefRowsFuncPhysiPlanExprs, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkIndefRowsFuncPhysiPlanVectorFuncs, pNode->pVectorFuncs); + } + + return code; +} + +static int32_t jsonToPhysiIndefRowsFuncNode(const SJson* pJson, void* pObj) { + SIndefRowsFuncPhysiNode* pNode = (SIndefRowsFuncPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkIndefRowsFuncPhysiPlanExprs, &pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkIndefRowsFuncPhysiPlanVectorFuncs, &pNode->pVectorFuncs); + } + + return code; +} + static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc"; static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) { @@ -3394,6 +3453,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return logicSortNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN_PARTITION: return logicPartitionNodeToJson(pObj, pJson); + case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + return logicIndefRowsFuncNodeToJson(pObj, pJson); case QUERY_NODE_LOGIC_SUBPLAN: return logicSubplanToJson(pObj, pJson); case QUERY_NODE_LOGIC_PLAN: @@ -3428,6 +3489,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiStateWindowNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return physiPartitionNodeToJson(pObj, pJson); + case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: + return physiIndefRowsFuncNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return physiDispatchNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INSERT: @@ -3505,6 +3568,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToLogicSortNode(pJson, pObj); case QUERY_NODE_LOGIC_PLAN_PARTITION: return jsonToLogicPartitionNode(pJson, pObj); + case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + return jsonToLogicIndefRowsFuncNode(pJson, pObj); case QUERY_NODE_LOGIC_SUBPLAN: return jsonToLogicSubplan(pJson, pObj); case QUERY_NODE_LOGIC_PLAN: @@ -3539,6 +3604,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiStateWindowNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return jsonToPhysiPartitionNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: + return jsonToPhysiIndefRowsFuncNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return jsonToPhysiDispatchNode(pJson, pObj); case QUERY_NODE_PHYSICAL_SUBPLAN: diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index ae1ff5744b..9c433aa678 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -350,6 +350,7 @@ void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker wa case SQL_CLAUSE_GROUP_BY: nodesWalkExpr(pSelect->pHaving, walker, pContext); case SQL_CLAUSE_HAVING: + case SQL_CLAUSE_SELECT: case SQL_CLAUSE_DISTINCT: nodesWalkExprs(pSelect->pOrderByList, walker, pContext); case SQL_CLAUSE_ORDER_BY: @@ -382,6 +383,7 @@ void nodesRewriteSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeRewrit case SQL_CLAUSE_GROUP_BY: nodesRewriteExpr(&(pSelect->pHaving), rewriter, pContext); case SQL_CLAUSE_HAVING: + case SQL_CLAUSE_SELECT: case SQL_CLAUSE_DISTINCT: nodesRewriteExprs(pSelect->pOrderByList, rewriter, pContext); case SQL_CLAUSE_ORDER_BY: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index e28844f2e1..5232c96140 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -230,6 +230,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SSortLogicNode)); case QUERY_NODE_LOGIC_PLAN_PARTITION: return makeNode(type, sizeof(SPartitionLogicNode)); + case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + return makeNode(type, sizeof(SIndefRowsFuncLogicNode)); case QUERY_NODE_LOGIC_SUBPLAN: return makeNode(type, sizeof(SLogicSubplan)); case QUERY_NODE_LOGIC_PLAN: @@ -268,6 +270,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SStateWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_PARTITION: return makeNode(type, sizeof(SPartitionPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: + return makeNode(type, sizeof(SIndefRowsFuncPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return makeNode(type, sizeof(SDataDispatcherNode)); case QUERY_NODE_PHYSICAL_PLAN_INSERT: diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 467b26b7c4..273f46ac17 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -476,6 +476,37 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, return code; } +static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { + // top/bottom are both an aggregate function and a indefinite rows function + if (!pSelect->hasIndefiniteRowsFunc || pSelect->hasAggFuncs) { + return TSDB_CODE_SUCCESS; + } + + SIndefRowsFuncLogicNode* pIdfRowsFunc = + (SIndefRowsFuncLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC); + if (NULL == pIdfRowsFunc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pVectorFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = rewriteExprForSelect(pIdfRowsFunc->pVectorFuncs, pSelect, SQL_CLAUSE_SELECT); + } + + // set the output + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExps(pCxt, pIdfRowsFunc->pVectorFuncs, &pIdfRowsFunc->node.pTargets); + } + + if (TSDB_CODE_SUCCESS == code) { + *pLogicNode = (SLogicNode*)pIdfRowsFunc; + } else { + nodesDestroyNode(pIdfRowsFunc); + } + + return code; +} + static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs); @@ -787,6 +818,9 @@ static int32_t createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele if (TSDB_CODE_SUCCESS == code) { code = createChildLogicNode(pCxt, pSelect, createAggLogicNode, &pRoot); } + if (TSDB_CODE_SUCCESS == code) { + code = createChildLogicNode(pCxt, pSelect, createIndefRowsFuncLogicNode, &pRoot); + } if (TSDB_CODE_SUCCESS == code) { code = createChildLogicNode(pCxt, pSelect, createDistinctLogicNode, &pRoot); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index a45eabefb9..69fa5499c4 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -800,6 +800,43 @@ static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, return code; } +static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, + SIndefRowsFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) { + SIndefRowsFuncPhysiNode* pIdfRowsFunc = (SIndefRowsFuncPhysiNode*)makePhysiNode( + pCxt, getPrecision(pChildren), (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC); + if (NULL == pIdfRowsFunc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SNodeList* pPrecalcExprs = NULL; + SNodeList* pVectorFuncs = NULL; + int32_t code = rewritePrecalcExprs(pCxt, pFuncLogicNode->pVectorFuncs, &pPrecalcExprs, &pVectorFuncs); + + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + // push down expression to pOutputDataBlockDesc of child node + if (TSDB_CODE_SUCCESS == code && NULL != pPrecalcExprs) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs, &pIdfRowsFunc->pExprs); + if (TSDB_CODE_SUCCESS == code) { + code = pushdownDataBlockSlots(pCxt, pIdfRowsFunc->pExprs, pChildTupe); + } + } + + if (TSDB_CODE_SUCCESS == code && NULL != pVectorFuncs) { + code = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pVectorFuncs, &pIdfRowsFunc->pVectorFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = addDataBlockSlots(pCxt, pIdfRowsFunc->pVectorFuncs, pIdfRowsFunc->node.pOutputDataBlockDesc); + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pPhyNode = (SPhysiNode*)pIdfRowsFunc; + } else { + nodesDestroyNode(pIdfRowsFunc); + } + + return code; +} + static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) { SProjectPhysiNode* pProject = (SProjectPhysiNode*)makePhysiNode( @@ -949,7 +986,8 @@ static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode( pCxt, getPrecision(pChildren), (SLogicNode*)pWindowLogicNode, - (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW : QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW)); + (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW + : QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW)); if (NULL == pSession) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -1153,6 +1191,8 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode return createPartitionPhysiNode(pCxt, pChildren, (SPartitionLogicNode*)pLogicNode, pPhyNode); case QUERY_NODE_LOGIC_PLAN_FILL: return createFillPhysiNode(pCxt, pChildren, (SFillLogicNode*)pLogicNode, pPhyNode); + case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + return createIndefRowsFuncPhysiNode(pCxt, pChildren, (SIndefRowsFuncLogicNode*)pLogicNode, pPhyNode); default: break; } diff --git a/source/libs/planner/test/planProjectTest.cpp b/source/libs/planner/test/planProjectTest.cpp new file mode 100644 index 0000000000..3ef0038ae0 --- /dev/null +++ b/source/libs/planner/test/planProjectTest.cpp @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "planTestUtil.h" + +using namespace std; + +class PlanProjectTest : public PlannerTestBase {}; + +TEST_F(PlanProjectTest, basic) { + useDb("root", "test"); + + run("SELECT CEIL(c1) FROM t1"); +} + +TEST_F(PlanProjectTest, indefiniteRowsFunc) { + useDb("root", "test"); + + run("SELECT MAVG(c1, 10) FROM t1"); + + run("SELECT MAVG(CEIL(c1), 20) + 2 FROM t1"); +} From 0244c4ff9435217bdfb2f6c9090dba3acb23c6ba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jun 2022 19:19:49 +0800 Subject: [PATCH 02/11] enh(query): add a new operator. --- source/libs/executor/inc/executorimpl.h | 21 ++- source/libs/executor/src/executorimpl.c | 174 +++++++++++++++++++++--- 2 files changed, 171 insertions(+), 24 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cb45fa9730..16d758c0e3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -493,11 +493,11 @@ typedef struct SAggOperatorInfo { typedef struct SProjectOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; + SOptrBasicInfo binfo; // todo remove it + SAggSupporter aggSup; // todo remove it - SSDataBlock* existDataBlock; - SArray* pPseudoColInfo; + SSDataBlock* existDataBlock; // todo remove it + SArray* pPseudoColInfo; // todo remove it SLimit limit; SLimit slimit; @@ -509,6 +509,17 @@ typedef struct SProjectOperatorInfo { int64_t curOutput; } SProjectOperatorInfo; +typedef struct SIndefOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SArray* pPseudoColInfo; + + SExprInfo* pScalarExpr; + int32_t numOfScalarExpr; + SqlFunctionCtx* pScalarCtx; + int32_t* rowCellInfoOffset; +} SIndefOperatorInfo; + typedef struct SFillOperatorInfo { struct SFillInfo* pFillInfo; SSDataBlock* pRes; @@ -731,6 +742,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e7de886b03..acc38735f8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3750,18 +3750,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { break; } -#if 0 - // Return result of the previous group in the firstly. - if (false) { - if (pRes->info.rows > 0) { - pProjectInfo->existDataBlock = pBlock; - break; - } else { // init output buffer for a new group data - initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs); - } - } -#endif - // the pDataBlock are always the same one, no need to call this again int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { @@ -3917,6 +3905,17 @@ static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr) { return 1; } +static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { + for (int32_t i = 0; i < numOfExprs; ++i) { + SExprInfo* pExprInfo = &pExpr[i]; + if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) { + taosMemoryFree(pExprInfo->base.pParam[0].pCol); + } + taosMemoryFree(pExprInfo->base.pParam); + taosMemoryFree(pExprInfo->pExpr); + } +} + static void destroyOperatorInfo(SOperatorInfo* pOperator) { if (pOperator == NULL) { return; @@ -3936,14 +3935,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->pExpr != NULL) { - for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { - SExprInfo* pExprInfo = &pOperator->pExpr[i]; - if (pExprInfo->pExpr->nodeType == QUERY_NODE_COLUMN) { - taosMemoryFree(pExprInfo->base.pParam[0].pCol); - } - taosMemoryFree(pExprInfo->base.pParam); - taosMemoryFree(pExprInfo->pExpr); - } + destroyExprInfo(pOperator->pExpr, pOperator->numOfExprs); } taosMemoryFreeClear(pOperator->pExpr); @@ -4132,6 +4124,19 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pPseudoColInfo); } +static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { + SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*) param; + doDestroyBasicInfo(&pInfo->binfo, numOfOutput); + + taosArrayDestroy(pInfo->pPseudoColInfo); + cleanupAggSup(&pInfo->aggSup); + + destroySqlFunctionCtx(pInfo->pScalarCtx, numOfOutput); + destroyExprInfo(pInfo->pScalarExpr, pInfo->numOfScalarExpr); + + taosMemoryFree(pInfo->rowCellInfoOffset); +} + void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosArrayDestroy(pExInfo->pSources); @@ -4209,6 +4214,133 @@ _error: return NULL; } +static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { + SIndefOperatorInfo* pIndefInfo = pOperator->info; + SOptrBasicInfo* pInfo = &pIndefInfo->binfo; + + SSDataBlock* pRes = pInfo->pRes; + blockDataCleanup(pRes); + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + int64_t st = 0; + int32_t order = 0; + int32_t scanFlag = 0; + + if (pOperator->cost.openCost == 0) { + st = taosGetTimestampUs(); + } + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + + while (1) { + // The downstream exec may change the value of the newgroup, so use a local variable instead. + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + doSetOperatorCompleted(pOperator); + break; + } + + // the pDataBlock are always the same one, no need to call this again + int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + // there is an scalar expression that needs to be calculated before apply the group aggregation. + if (pIndefInfo->pScalarExpr != NULL) { + code = projectApplyFunctions(pIndefInfo->pScalarExpr, pBlock, pBlock, pIndefInfo->pScalarCtx, + pIndefInfo->numOfScalarExpr, pIndefInfo->pPseudoColInfo); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + } + + setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false); + blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); + + code = projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pIndefInfo->pPseudoColInfo); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + } + + size_t rows = pInfo->pRes->info.rows; + pOperator->resultInfo.totalRows += rows; + + if (pOperator->cost.openCost == 0) { + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + } + + return (rows > 0) ? pInfo->pRes : NULL; +} + +SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo) { + SIndefOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIndefOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; + + int32_t numOfExpr = 0; + SExprInfo* pExprInfo = createExprInfo(pPhyNode->pVectorFuncs, NULL, &numOfExpr); + + int32_t numOfScalarExpr = 0; + if (pPhyNode->pExprs != NULL) { + pInfo->pScalarExpr = createExprInfo(pPhyNode->pExprs, NULL, &numOfScalarExpr); + pInfo->pScalarCtx = createSqlFunctionCtx(pInfo->pScalarExpr, numOfScalarExpr, &pInfo->rowCellInfoOffset); + } + + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc);; + + int32_t numOfRows = 4096; + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + + // Make sure the size of SSDataBlock will never exceed the size of 2MB. + int32_t TWOMB = 2 * 1024 * 1024; + if (numOfRows * pResBlock->info.rowSize > TWOMB) { + numOfRows = TWOMB / pResBlock->info.rowSize; + } + initResultSizeInfo(pOperator, numOfRows); + + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfExpr, pResBlock, keyBufSize, pTaskInfo->id.str); + setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr, pTaskInfo); + + pInfo->binfo.pRes = pResBlock; + pInfo->numOfScalarExpr = numOfScalarExpr; + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfExpr); + + pOperator->name = "IndefinitOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfExpr; + pOperator->pTaskInfo = pTaskInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, + destroyIndefinitOperatorInfo, NULL, NULL, NULL); + + int32_t code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + + _error: + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; +} + static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); @@ -4710,6 +4842,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SInterval* pInterval = &((SIntervalAggOperatorInfo*)ops[0]->info)->interval; pOptr = createFillOperatorInfo(ops[0], pExprInfo, num, pInterval, &pFillNode->timeRange, pResBlock, pFillNode->mode, (SNodeListNode*)pFillNode->pValues, false, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { + pOptr = createIndefinitOutputOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else { ASSERT(0); } From 8e8f5fbab2701e9a713753c7574a99c1078b1a45 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Mon, 6 Jun 2022 11:30:11 +0800 Subject: [PATCH 03/11] update csum case --- tests/system-test/2-query/csum.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tests/system-test/2-query/csum.py b/tests/system-test/2-query/csum.py index a331311fd2..5c2de5ea14 100644 --- a/tests/system-test/2-query/csum.py +++ b/tests/system-test/2-query/csum.py @@ -240,7 +240,7 @@ class TDTestCase: tdSql.error("select csum(c1) t1") # no from tdSql.error("select csum( c1 ) from ") # no table_expr # tdSql.error(self.csum_query_form(col="st1")) # tag col - tdSql.error(self.csum_query_form(col=1)) # col is a value + # tdSql.error(self.csum_query_form(col=1)) # col is a value tdSql.error(self.csum_query_form(col="'c1'")) # col is a string tdSql.error(self.csum_query_form(col=None)) # col is NULL 1 tdSql.error(self.csum_query_form(col="NULL")) # col is NULL 2 @@ -407,6 +407,14 @@ class TDTestCase: tdDnodes.start(index) self.csum_current_query() self.csum_error_query() + tdSql.query("select csum(1) from t1 ") + tdSql.checkRows(7) + tdSql.checkData(0,0,1) + tdSql.checkData(1,0,2) + tdSql.checkData(2,0,3) + tdSql.checkData(3,0,4) + tdSql.query("select csum(abs(c1))+2 from t1 ") + tdSql.checkRows(4) def run(self): import traceback From 7fc1e3a40fd9fe61707afcd5e926f650ccea1d35 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Mon, 6 Jun 2022 11:35:28 +0800 Subject: [PATCH 04/11] update case for mavg --- tests/system-test/2-query/mavg.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/mavg.py b/tests/system-test/2-query/mavg.py index 1d92964615..13d2b4d420 100644 --- a/tests/system-test/2-query/mavg.py +++ b/tests/system-test/2-query/mavg.py @@ -417,8 +417,8 @@ class TDTestCase: # err9 = {"col": "st1"} # self.checkmavg(**err9) # col: tag - err10 = {"col": 1} - self.checkmavg(**err10) # col: value + # err10 = {"col": 1} + # self.checkmavg(**err10) # col: value err11 = {"col": "NULL"} self.checkmavg(**err11) # col: NULL err12 = {"col": "%_"} @@ -660,6 +660,14 @@ class TDTestCase: tdDnodes.start(index) self.mavg_current_query() self.mavg_error_query() + tdSql.query("select mavg(1,1) from t1") + tdSql.checkRows(7) + tdSql.checkData(0,0,1.000000000) + tdSql.checkData(1,0,1.000000000) + tdSql.checkData(5,0,1.000000000) + + tdSql.query("select mavg(abs(c1),1) from t1") + tdSql.checkRows(4) def run(self): import traceback From 5033cf0c8419bd7493ad15b687e27f9155214841 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Mon, 6 Jun 2022 14:04:51 +0800 Subject: [PATCH 05/11] update case for sample --- tests/system-test/2-query/sample.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/system-test/2-query/sample.py b/tests/system-test/2-query/sample.py index 94e06347d2..4b651b56e7 100644 --- a/tests/system-test/2-query/sample.py +++ b/tests/system-test/2-query/sample.py @@ -427,10 +427,10 @@ class TDTestCase: # err9 = {"col": "st1"} # self.checksample(**err9) # col: tag tdSql.query(" select sample(st1 ,1) from t1 ") - err10 = {"col": 1} - self.checksample(**err10) # col: value - err11 = {"col": "NULL"} - self.checksample(**err11) # col: NULL + # err10 = {"col": 1} + # self.checksample(**err10) # col: value + # err11 = {"col": "NULL"} + # self.checksample(**err11) # col: NULL err12 = {"col": "%_"} self.checksample(**err12) # col: %_ err13 = {"col": "c3"} @@ -445,12 +445,12 @@ class TDTestCase: self.checksample(**err17) # nchar col err18 = {"col": "c6"} self.checksample(**err18) # bool col - err19 = {"col": "'c1'"} - self.checksample(**err19) # col: string + # err19 = {"col": "'c1'"} + # self.checksample(**err19) # col: string err20 = {"col": None} self.checksample(**err20) # col: None - err21 = {"col": "''"} - self.checksample(**err21) # col: '' + # err21 = {"col": "''"} + # self.checksample(**err21) # col: '' err22 = {"col": "tt1.c1"} self.checksample(**err22) # not table_expr col err23 = {"col": "t1"} @@ -459,10 +459,10 @@ class TDTestCase: self.checksample(**err24) # stbname err25 = {"col": "db"} self.checksample(**err25) # datbasename - err26 = {"col": "True"} - self.checksample(**err26) # col: BOOL 1 - err27 = {"col": True} - self.checksample(**err27) # col: BOOL 2 + # err26 = {"col": "True"} + # self.checksample(**err26) # col: BOOL 1 + # err27 = {"col": True} + # self.checksample(**err27) # col: BOOL 2 err28 = {"col": "*"} self.checksample(**err28) # col: all col err29 = {"func": "sample[", "r_comm": "]"} @@ -678,7 +678,7 @@ class TDTestCase: tdSql.error(" select sample(c1,tbname) from t1 ") tdSql.error(" select sample(c1,ts) from t1 ") tdSql.error(" select sample(c1,false) from t1 ") - tdSql.error(" select sample(123,1) from t1 ") + tdSql.query(" select sample(123,1) from t1 ") tdSql.query(" select sample(c1,2) from t1 ") tdSql.checkRows(2) From bd3e0a76fb3c23cf3d1ce955b7d4cd78598341ce Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Mon, 6 Jun 2022 14:13:56 +0800 Subject: [PATCH 06/11] update case for statedurcation --- tests/system-test/2-query/function_stateduration.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/function_stateduration.py b/tests/system-test/2-query/function_stateduration.py index b25a658469..9aa6fdaefa 100644 --- a/tests/system-test/2-query/function_stateduration.py +++ b/tests/system-test/2-query/function_stateduration.py @@ -85,8 +85,8 @@ class TDTestCase: "select stateduration(c1 ,'GT','*',1s) from t1", "select stateduration(c1 ,'GT',ts,1s) from t1", "select stateduration(c1 ,'GT',max(c1),1s) from t1", - "select stateduration(abs(c1) ,'GT',1,1s) from t1", - "select stateduration(c1+2 ,'GT',1,1s) from t1", + # "select stateduration(abs(c1) ,'GT',1,1s) from t1", + # "select stateduration(c1+2 ,'GT',1,1s) from t1", "select stateduration(c1 ,'GT',1,1u) from t1", "select stateduration(c1 ,'GT',1,now) from t1", "select stateduration(c1 ,'GT','1',1s) from t1", @@ -323,6 +323,11 @@ class TDTestCase: tdSql.checkData(0, 0, None) tdSql.checkData(1, 0, 0.000000000) tdSql.checkData(3, 0, -86404.000000000) + + tdSql.query("select stateduration(abs(c1) ,'GT',1,1s) from t1") + tdSql.checkRows(12) + tdSql.query("select stateduration(c1+2 ,'GT',1,1s) from t1") + tdSql.checkRows(12) # bug for stable From 4f33ecbf23129cf576d67e93e80dd36dbff06fdb Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Mon, 6 Jun 2022 14:15:20 +0800 Subject: [PATCH 07/11] update case --- tests/system-test/2-query/statecount.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/2-query/statecount.py b/tests/system-test/2-query/statecount.py index 2634d9a9ab..ccda55f7ba 100644 --- a/tests/system-test/2-query/statecount.py +++ b/tests/system-test/2-query/statecount.py @@ -85,8 +85,8 @@ class TDTestCase: "select statecount(c1 ,'GT','*') from t1", "select statecount(c1 ,'GT',ts) from t1", "select statecount(c1 ,'GT',max(c1)) from t1", - "select statecount(abs(c1) ,'GT',1) from t1", - "select statecount(c1+2 ,'GT',1) from t1", + # "select statecount(abs(c1) ,'GT',1) from t1", + # "select statecount(c1+2 ,'GT',1) from t1", "select statecount(c1 ,'GT',1,1u) from t1", "select statecount(c1 ,'GT',1,now) from t1", "select statecount(c1 ,'GT','1') from t1", From 37b7f06b4a663b4b1864671c400e410527d028f4 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Mon, 6 Jun 2022 14:18:36 +0800 Subject: [PATCH 08/11] update case for elapsed function --- tests/system-test/2-query/elapsed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/elapsed.py b/tests/system-test/2-query/elapsed.py index 017090128d..1553e06914 100644 --- a/tests/system-test/2-query/elapsed.py +++ b/tests/system-test/2-query/elapsed.py @@ -141,7 +141,7 @@ class TDTestCase: tablenames = ["sub_table1_1","sub_table1_2","sub_table1_3","sub_table2_1","sub_table2_2","sub_table2_3","regular_table_1","regular_table_2","regular_table_3"] abnormal_list = ["()","(NULL)","(*)","(abc)","( , )","(NULL,*)","( ,NULL)","(%)","(+)","(*,)","(*, /)","(ts,*)" "(ts,tbname*10)","(ts,tagname)", - "(ts,2d+3m-2s,NULL)","(ts+1d,10s)","(ts+10d,NULL)" ,"(ts,now -1m%1d)","(ts+10d)","(ts+10d,_c0)","(ts+10d,)","(ts,%)","(ts, , m)","(ts,abc)","(ts,/)","(ts,*)","(ts,1s,100)", + "(ts,2d+3m-2s,NULL)","(ts+10d,NULL)" ,"(ts,now -1m%1d)","(ts+10d,_c0)","(ts+10d,)","(ts,%)","(ts, , m)","(ts,abc)","(ts,/)","(ts,*)","(ts,1s,100)", "(ts,1s,abc)","(ts,1s,_c0)","(ts,1s,*)","(ts,1s,NULL)","(ts,,_c0)","(ts,tbname,ts)","(ts,0,tbname)","('2021-11-18 00:00:10')","('2021-11-18 00:00:10', 1s)", "('2021-11-18T00:00:10+0800', '1s')","('2021-11-18T00:00:10Z', '1s')","('2021-11-18T00:00:10+0800', 10000000d,)","('ts', ,2021-11-18T00:00:10+0800, )"] From 62caafa35687466d0554a630ff283eb55f1f30d3 Mon Sep 17 00:00:00 2001 From: "wenzhouwww@live.cn" Date: Mon, 6 Jun 2022 15:08:43 +0800 Subject: [PATCH 09/11] update case for hyperlog function --- tests/system-test/2-query/hyperloglog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/2-query/hyperloglog.py b/tests/system-test/2-query/hyperloglog.py index 35703e441d..b20ec35f07 100644 --- a/tests/system-test/2-query/hyperloglog.py +++ b/tests/system-test/2-query/hyperloglog.py @@ -223,7 +223,7 @@ class TDTestCase: tdLog.printNoPrefix("===step 0: err case, must return err") tdSql.error( "select hyperloglog() from ct1" ) tdSql.error( "select hyperloglog(c1, c2) from ct2" ) - tdSql.error( "select hyperloglog(1) from ct2" ) + # tdSql.error( "select hyperloglog(1) from ct2" ) tdSql.error( f"select hyperloglog({NUM_COL[0]}, {NUM_COL[1]}) from ct4" ) tdSql.error( ''' select hyperloglog(['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10']) from ct1 From 98a9789c7e40456aef8f549d06022d7a9032e97e Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 6 Jun 2022 19:55:28 +0800 Subject: [PATCH 10/11] fix: top function with order by clause --- source/libs/planner/src/planLogicCreater.c | 2 +- source/libs/planner/test/planIntervalTest.cpp | 2 ++ source/libs/planner/test/planTestUtil.cpp | 6 +++++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index cda2ace5ba..329712bc28 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -458,7 +458,7 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { // top/bottom are both an aggregate function and a indefinite rows function - if (!pSelect->hasIndefiniteRowsFunc || pSelect->hasAggFuncs) { + if (!pSelect->hasIndefiniteRowsFunc || pSelect->hasAggFuncs || NULL != pSelect->pWindow) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/test/planIntervalTest.cpp b/source/libs/planner/test/planIntervalTest.cpp index a04f47741e..edd735922b 100644 --- a/source/libs/planner/test/planIntervalTest.cpp +++ b/source/libs/planner/test/planIntervalTest.cpp @@ -50,6 +50,8 @@ TEST_F(PlanIntervalTest, selectFunc) { run("SELECT MAX(c1), MIN(c1) FROM t1 INTERVAL(10s)"); // select function along with the columns of select row, and with INTERVAL clause run("SELECT MAX(c1), c2 FROM t1 INTERVAL(10s)"); + + run("SELECT TOP(c1, 1) FROM t1 INTERVAL(10s) ORDER BY c1"); } TEST_F(PlanIntervalTest, stable) { diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index e2082d4936..84c46cb9bb 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -81,6 +81,8 @@ int32_t getLogLevel() { return g_logLevel; } class PlannerTestBaseImpl { public: + PlannerTestBaseImpl() : sqlNo_(0) {} + void useDb(const string& acctId, const string& db) { caseEnv_.acctId_ = acctId; caseEnv_.db_ = db; @@ -88,6 +90,7 @@ class PlannerTestBaseImpl { } void run(const string& sql) { + ++sqlNo_; if (caseEnv_.nsql_ > 0) { --(caseEnv_.nsql_); return; @@ -229,7 +232,7 @@ class PlannerTestBaseImpl { return; } - cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl; + cout << "========================================== " << sqlNo_ << " sql : [" << stmtEnv_.sql_ << "]" << endl; if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) { if (res_.prepareAst_.empty()) { @@ -382,6 +385,7 @@ class PlannerTestBaseImpl { caseEnv caseEnv_; stmtEnv stmtEnv_; stmtRes res_; + int32_t sqlNo_; }; PlannerTestBase::PlannerTestBase() : impl_(new PlannerTestBaseImpl()) {} From 26d2c9e63d105334d4bb4acc435cb3b770a17731 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 6 Jun 2022 20:19:04 +0800 Subject: [PATCH 11/11] fix: top function with order by clause --- source/libs/planner/test/planTestUtil.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index 84c46cb9bb..f5c8b58e43 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -190,6 +190,8 @@ class PlannerTestBaseImpl { string acctId_; string db_; int32_t nsql_; + + caseEnv() : nsql_(0) {} }; struct stmtEnv { @@ -197,6 +199,7 @@ class PlannerTestBaseImpl { array msgBuf_; SQuery* pQuery_; + stmtEnv() : pQuery_(nullptr) {} ~stmtEnv() { qDestroyQuery(pQuery_); } };