From d7c454932498325b197cf0af907057f8ee2e35d1 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 22 Jul 2022 10:38:11 +0800 Subject: [PATCH] fix: the problem of data loss when interval is used for outer query --- source/libs/nodes/src/nodesCodeFuncs.c | 21 +++ source/libs/planner/inc/planInt.h | 1 + source/libs/planner/src/planLogicCreater.c | 167 +-------------------- source/libs/planner/src/planOptimizer.c | 8 + source/libs/planner/src/planUtil.c | 167 +++++++++++++++++++++ 5 files changed, 198 insertions(+), 166 deletions(-) diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index e3c50de654..c92e6908f1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -504,6 +504,9 @@ static const char* jkLogicPlanConditions = "Conditions"; static const char* jkLogicPlanChildren = "Children"; static const char* jkLogicPlanLimit = "Limit"; static const char* jkLogicPlanSlimit = "SLimit"; +static const char* jkLogicPlanRequireDataOrder = "RequireDataOrder"; +static const char* jkLogicPlanResultDataOrder = "ResultDataOrder"; +static const char* jkLogicPlanGroupAction = "GroupAction"; static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) { const SLogicNode* pNode = (const SLogicNode*)pObj; @@ -521,6 +524,15 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkLogicPlanSlimit, nodeToJson, pNode->pSlimit); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkLogicPlanRequireDataOrder, pNode->requireDataOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkLogicPlanResultDataOrder, pNode->resultDataOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkLogicPlanGroupAction, pNode->groupAction); + } return code; } @@ -541,6 +553,15 @@ static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) { if 
(TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkLogicPlanSlimit, &pNode->pSlimit); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkLogicPlanRequireDataOrder, pNode->requireDataOrder, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkLogicPlanResultDataOrder, pNode->resultDataOrder, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkLogicPlanGroupAction, pNode->groupAction, code); + } return code; } diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index b7fba83235..88c7c26276 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -35,6 +35,7 @@ int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...); int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList); int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList); int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew); +int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement); int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan); int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 0bd0e87194..78e779565e 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1389,171 +1389,6 @@ static void setLogicSubplanType(bool hasScan, SLogicSubplan* pSubplan) { } } -static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { - if (requirement <= pScan->node.resultDataOrder) { - return TSDB_CODE_SUCCESS; - } - if (TSDB_SUPER_TABLE == pScan->tableType) { - pScan->scanType = SCAN_TYPE_TABLE_MERGE; - } - pScan->node.resultDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustJoinDataRequirement(SJoinLogicNode* 
pJoin, EDataOrderLevel requirement) { - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) { - if (requirement > DATA_ORDER_LEVEL_NONE) { - return TSDB_CODE_PLAN_INTERNAL_ERROR; - } - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOrderLevel requirement) { - pProject->node.resultDataOrder = requirement; - pProject->node.requireDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { - if (requirement <= pWindow->node.resultDataOrder) { - return TSDB_CODE_SUCCESS; - } - pWindow->node.resultDataOrder = requirement; - pWindow->node.requireDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustSessionDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { - if (requirement <= pWindow->node.resultDataOrder) { - return TSDB_CODE_SUCCESS; - } - pWindow->node.resultDataOrder = requirement; - pWindow->node.requireDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { - if (requirement <= pWindow->node.resultDataOrder) { - return TSDB_CODE_SUCCESS; - } - pWindow->node.resultDataOrder = requirement; - pWindow->node.requireDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { - switch (pWindow->winType) { - case WINDOW_TYPE_INTERVAL: - return adjustIntervalDataRequirement(pWindow, requirement); - case WINDOW_TYPE_SESSION: - return adjustSessionDataRequirement(pWindow, requirement); - case WINDOW_TYPE_STATE: - return adjustStateDataRequirement(pWindow, requirement); - default: - break; - } - return TSDB_CODE_PLAN_INTERNAL_ERROR; -} - -static int32_t adjustFillDataRequirement(SFillLogicNode* 
pFill, EDataOrderLevel requirement) { - if (requirement <= pFill->node.requireDataOrder) { - return TSDB_CODE_SUCCESS; - } - pFill->node.resultDataOrder = requirement; - pFill->node.requireDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustSortDataRequirement(SSortLogicNode* pSort, EDataOrderLevel requirement) { - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataOrderLevel requirement) { - if (DATA_ORDER_LEVEL_GLOBAL == requirement) { - return TSDB_CODE_PLAN_INTERNAL_ERROR; - } - pPart->node.resultDataOrder = requirement; - pPart->node.requireDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustIndefRowsDataRequirement(SIndefRowsFuncLogicNode* pIndef, EDataOrderLevel requirement) { - if (requirement <= pIndef->node.resultDataOrder) { - return TSDB_CODE_SUCCESS; - } - pIndef->node.resultDataOrder = requirement; - pIndef->node.requireDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataOrderLevel requirement) { - if (requirement <= pInterp->node.requireDataOrder) { - return TSDB_CODE_SUCCESS; - } - pInterp->node.resultDataOrder = requirement; - pInterp->node.requireDataOrder = requirement; - return TSDB_CODE_SUCCESS; -} - -static int32_t adjustLogicNodeDataRequirementImpl(SLogicNode* pNode, EDataOrderLevel requirement) { - int32_t code = TSDB_CODE_SUCCESS; - switch (nodeType(pNode)) { - case QUERY_NODE_LOGIC_PLAN_SCAN: - code = adjustScanDataRequirement((SScanLogicNode*)pNode, requirement); - break; - case QUERY_NODE_LOGIC_PLAN_JOIN: - code = adjustJoinDataRequirement((SJoinLogicNode*)pNode, requirement); - break; - case QUERY_NODE_LOGIC_PLAN_AGG: - code = adjustAggDataRequirement((SAggLogicNode*)pNode, requirement); - break; - case QUERY_NODE_LOGIC_PLAN_PROJECT: - code = adjustProjectDataRequirement((SProjectLogicNode*)pNode, requirement); - break; - case 
QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY: - case QUERY_NODE_LOGIC_PLAN_EXCHANGE: - case QUERY_NODE_LOGIC_PLAN_MERGE: - break; - case QUERY_NODE_LOGIC_PLAN_WINDOW: - code = adjustWindowDataRequirement((SWindowLogicNode*)pNode, requirement); - break; - case QUERY_NODE_LOGIC_PLAN_FILL: - code = adjustFillDataRequirement((SFillLogicNode*)pNode, requirement); - break; - case QUERY_NODE_LOGIC_PLAN_SORT: - code = adjustSortDataRequirement((SSortLogicNode*)pNode, requirement); - break; - case QUERY_NODE_LOGIC_PLAN_PARTITION: - code = adjustPartitionDataRequirement((SPartitionLogicNode*)pNode, requirement); - break; - case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: - code = adjustIndefRowsDataRequirement((SIndefRowsFuncLogicNode*)pNode, requirement); - break; - case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: - code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement); - break; - default: - break; - } - if (TSDB_CODE_SUCCESS == code) { - SNode* pChild = NULL; - FOREACH(pChild, pNode->pChildren) { - code = adjustLogicNodeDataRequirementImpl((SLogicNode*)pChild, pNode->requireDataOrder); - if (TSDB_CODE_SUCCESS != code) { - break; - } - } - } - return code; -} - -static int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode) { - return adjustLogicNodeDataRequirementImpl(pNode, DATA_ORDER_LEVEL_NONE); -} - int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { SLogicPlanContext cxt = {.pPlanCxt = pCxt, .pCurrRoot = NULL, .hasScan = false}; @@ -1569,7 +1404,7 @@ int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { if (TSDB_CODE_SUCCESS == code) { setLogicNodeParent(pSubplan->pNode); setLogicSubplanType(cxt.hasScan, pSubplan); - code = adjustLogicNodeDataRequirement(pSubplan->pNode); + code = adjustLogicNodeDataRequirement(pSubplan->pNode, DATA_ORDER_LEVEL_NONE); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8079513e4d..b7b8cfce33 100644 --- 
a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1906,6 +1906,8 @@ static int32_t rewriteUniqueOptCreateAgg(SIndefRowsFuncLogicNode* pIndef, SLogic TSWAP(pAgg->node.pChildren, pIndef->node.pChildren); optResetParent((SLogicNode*)pAgg); pAgg->node.precision = pIndef->node.precision; + pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; // first function requirement + pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; int32_t code = TSDB_CODE_SUCCESS; bool hasSelectPrimaryKey = false; @@ -1978,6 +1980,8 @@ static int32_t rewriteUniqueOptCreateProject(SIndefRowsFuncLogicNode* pIndef, SL TSWAP(pProject->node.pTargets, pIndef->node.pTargets); pProject->node.precision = pIndef->node.precision; + pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; int32_t code = TSDB_CODE_SUCCESS; SNode* pNode = NULL; @@ -2012,6 +2016,10 @@ static int32_t rewriteUniqueOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* if (TSDB_CODE_SUCCESS == code) { code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pIndef, pProject); } + if (TSDB_CODE_SUCCESS == code) { + code = adjustLogicNodeDataRequirement( + pProject, NULL == pProject->pParent ? 
DATA_ORDER_LEVEL_NONE : pProject->pParent->requireDataOrder); + } if (TSDB_CODE_SUCCESS == code) { nodesDestroyNode((SNode*)pIndef); } else { diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 77e4e05530..4214377101 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -121,3 +121,170 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* } return TSDB_CODE_PLAN_INTERNAL_ERROR; } + +static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { + if (SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) { + return TSDB_CODE_SUCCESS; + } + // The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK + if (requirement < DATA_ORDER_LEVEL_IN_BLOCK) { + requirement = DATA_ORDER_LEVEL_IN_BLOCK; + } + if (DATA_ORDER_LEVEL_IN_BLOCK == requirement) { + pScan->scanType = SCAN_TYPE_TABLE; + } else if (TSDB_SUPER_TABLE == pScan->tableType) { + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + } + pScan->node.resultDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustJoinDataRequirement(SJoinLogicNode* pJoin, EDataOrderLevel requirement) { + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) { + if (requirement > DATA_ORDER_LEVEL_NONE) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOrderLevel requirement) { + pProject->node.resultDataOrder = requirement; + pProject->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= pWindow->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder =
requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustSessionDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= pWindow->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= pWindow->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + switch (pWindow->winType) { + case WINDOW_TYPE_INTERVAL: + return adjustIntervalDataRequirement(pWindow, requirement); + case WINDOW_TYPE_SESSION: + return adjustSessionDataRequirement(pWindow, requirement); + case WINDOW_TYPE_STATE: + return adjustStateDataRequirement(pWindow, requirement); + default: + break; + } + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} + +static int32_t adjustFillDataRequirement(SFillLogicNode* pFill, EDataOrderLevel requirement) { + if (requirement <= pFill->node.requireDataOrder) { + return TSDB_CODE_SUCCESS; + } + pFill->node.resultDataOrder = requirement; + pFill->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustSortDataRequirement(SSortLogicNode* pSort, EDataOrderLevel requirement) { + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataOrderLevel requirement) { + if (DATA_ORDER_LEVEL_GLOBAL == requirement) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + pPart->node.resultDataOrder = requirement; + pPart->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustIndefRowsDataRequirement(SIndefRowsFuncLogicNode* pIndef, EDataOrderLevel 
requirement) { + if (requirement <= pIndef->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pIndef->node.resultDataOrder = requirement; + pIndef->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataOrderLevel requirement) { + if (requirement <= pInterp->node.requireDataOrder) { + return TSDB_CODE_SUCCESS; + } + pInterp->node.resultDataOrder = requirement; + pInterp->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement) { + int32_t code = TSDB_CODE_SUCCESS; + switch (nodeType(pNode)) { + case QUERY_NODE_LOGIC_PLAN_SCAN: + code = adjustScanDataRequirement((SScanLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_JOIN: + code = adjustJoinDataRequirement((SJoinLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_AGG: + code = adjustAggDataRequirement((SAggLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_PROJECT: + code = adjustProjectDataRequirement((SProjectLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY: + case QUERY_NODE_LOGIC_PLAN_EXCHANGE: + case QUERY_NODE_LOGIC_PLAN_MERGE: + break; + case QUERY_NODE_LOGIC_PLAN_WINDOW: + code = adjustWindowDataRequirement((SWindowLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_FILL: + code = adjustFillDataRequirement((SFillLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_SORT: + code = adjustSortDataRequirement((SSortLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_PARTITION: + code = adjustPartitionDataRequirement((SPartitionLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + code = adjustIndefRowsDataRequirement((SIndefRowsFuncLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: + code = 
adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement); + break; + default: + break; + } + if (TSDB_CODE_SUCCESS == code) { + SNode* pChild = NULL; + FOREACH(pChild, pNode->pChildren) { + code = adjustLogicNodeDataRequirement((SLogicNode*)pChild, pNode->requireDataOrder); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + } + return code; +}