diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cea1bbbb5a..0294446502 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -586,6 +586,7 @@ typedef struct SSessionAggOperatorInfo { int64_t gap; // session window gap int32_t tsSlotId; // primary timestamp slot id STimeWindowAggSupp twAggSup; + SNode *pCondition; } SSessionAggOperatorInfo; typedef struct SResultWindowInfo { @@ -646,6 +647,7 @@ typedef struct SStateWindowOperatorInfo { int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; // bool reptScan; + const SNode *pCondition; } SStateWindowOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -803,7 +805,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, - SExecTaskInfo* pTaskInfo); + SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); @@ -817,7 +819,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, + SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9afe927825..55bb86c5e1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4452,7 +4452,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; pOptr = - createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); + createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pPhyNode->pConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { @@ -4474,7 +4474,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumn col = extractColumnFromColumnNode(pColNode); - pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pTaskInfo); + pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3ce7b66ddf..e262a529bb 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -302,14 +302,21 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + while(1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pInfo->pCondition, pRes); - size_t rows = pRes->info.rows; - if (rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); + bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; + } + + if (pRes->info.rows > 0) { + break; + } } - - pOperator->resultInfo.totalRows += rows; + pOperator->resultInfo.totalRows += pRes->info.rows; return (pRes->info.rows == 0)? NULL:pRes; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4683d66260..ce45e0005f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1146,13 +1146,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); - return NULL; - } + while(1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pInfo->pCondition, pBInfo->pRes); - return pBInfo->pRes; + bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; + } + + if (pBInfo->pRes->info.rows > 0) { + break; + } + } + pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; + return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes; } int32_t order = TSDB_ORDER_ASC; @@ -1178,15 +1187,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); + while(1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pInfo->pCondition, pBInfo->pRes); + + bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; + } + + if (pBInfo->pRes->info.rows > 0) { + break; + } } - - size_t rows = pBInfo->pRes->info.rows; - pOperator->resultInfo.totalRows += rows; - - return (rows == 0) ? NULL : pBInfo->pRes; + pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; + return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes; } static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { @@ -1880,12 +1896,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); - } + while(1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pInfo->pCondition, pBInfo->pRes); - return pBInfo->pRes->info.rows > 0 ? pBInfo->pRes : NULL; + bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; + } + + if (pBInfo->pRes->info.rows > 0) { + break; + } + } + pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; + return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes; } int64_t st = taosGetTimestampUs(); @@ -1914,15 +1940,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); + while(1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pInfo->pCondition, pBInfo->pRes); + + bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; + } + + if (pBInfo->pRes->info.rows > 0) { + break; + } } - - size_t rows = pBInfo->pRes->info.rows; - pOperator->resultInfo.totalRows += rows; - - return (rows == 0) ? NULL : pBInfo->pRes; + pOperator->resultInfo.totalRows += pBInfo->pRes->info.rows; + return (pBInfo->pRes->info.rows == 0)? NULL:pBInfo->pRes; } static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { @@ -2235,7 +2268,7 @@ _error: SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId, - SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo) { + SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -2246,6 +2279,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.bytes = pInfo->stateCol.bytes; pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); + pInfo->pCondition = pCondition; if (pInfo->stateKey.pData == NULL) { goto _error; } @@ -2289,7 +2323,7 @@ void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, - STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwAggSupp, SNode* pCondition, SExecTaskInfo* pTaskInfo) { SSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -2315,6 +2349,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pInfo->binfo.pRes = pResBlock; pInfo->winSup.prevTs = INT64_MIN; pInfo->reptScan = false; + pInfo->pCondition = pCondition; pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; pOperator->blocking = true; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 458e0f545d..67b79d5b1c 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -890,7 +890,7 @@ static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicN return code; } -static int32_t pushDownCondOptDealLogicNode(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { +static int32_t pushDownCondOptTrivialPushDown(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { if (NULL == pLogicNode->pConditions || OPTIMIZE_FLAG_TEST_MASK(pLogicNode->optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { return TSDB_CODE_SUCCESS; @@ -921,7 +921,7 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog break; case QUERY_NODE_LOGIC_PLAN_SORT: case QUERY_NODE_LOGIC_PLAN_PARTITION: - code = pushDownCondOptDealLogicNode(pCxt, pLogicNode); + code = pushDownCondOptTrivialPushDown(pCxt, pLogicNode); break; default: break;