From 461db22807bf6d7bb57b2dbcf5fcd72085fccee2 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 21 Jul 2022 20:17:30 +0800 Subject: [PATCH] fix: the problem of data loss when interval is used for outer query --- source/libs/planner/src/planLogicCreater.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 1663266fb7..7f69e77299 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -660,7 +660,7 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo pWindow->winType = WINDOW_TYPE_STATE; pWindow->node.groupAction = GROUP_ACTION_KEEP; - pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP; pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pWindow->pStateExpr = nodesCloneNode(pState->pExpr); pWindow->pTspk = nodesCloneNode(pState->pCol); @@ -683,7 +683,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i; pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE; pWindow->node.groupAction = GROUP_ACTION_KEEP; - pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP; pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol); @@ -1390,7 +1390,7 @@ static void setLogicSubplanType(bool hasScan, SLogicSubplan* pSubplan) { } static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { - if (requirement <= DATA_ORDER_LEVEL_IN_BLOCK) { + if (requirement <= pScan->node.resultDataOrder) { return TSDB_CODE_SUCCESS; } pScan->scanType = SCAN_TYPE_TABLE_MERGE; @@ -1416,7 +1416,7 @@ static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOr } static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { - if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + if (requirement <= pWindow->node.resultDataOrder) { return TSDB_CODE_SUCCESS; } pWindow->node.resultDataOrder = requirement; @@ -1425,7 +1425,7 @@ static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrd } static int32_t adjustSessionDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { - if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + if (requirement <= pWindow->node.resultDataOrder) { return TSDB_CODE_SUCCESS; } pWindow->node.resultDataOrder = requirement; @@ -1434,7 +1434,7 @@ static int32_t adjustSessionDataRequirement(SWindowLogicNode* pWindow, EDataOrde } static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { - if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + if (requirement <= pWindow->node.resultDataOrder) { return TSDB_CODE_SUCCESS; } pWindow->node.resultDataOrder = requirement; @@ -1457,7 +1457,7 @@ static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrder } static int32_t adjustFillDataRequirement(SFillLogicNode* pFill, EDataOrderLevel requirement) { - if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + if (requirement <= pFill->node.requireDataOrder) { return TSDB_CODE_SUCCESS; } pFill->node.resultDataOrder = requirement; @@ -1488,7 +1488,7 @@ static int32_t adjustIndefRowsDataRequirement(SIndefRowsFuncLogicNode* pIndef, E } static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataOrderLevel requirement) { - if (requirement <= DATA_ORDER_LEVEL_IN_GROUP) { + if (requirement <= pInterp->node.requireDataOrder) { return TSDB_CODE_SUCCESS; } pInterp->node.resultDataOrder = requirement;