diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 812428920a..5dcfb400d9 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4685,20 +4685,16 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); } -static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { + +static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info; SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } SExprSupp* pSup = &pOperator->exprSupp; SSDataBlock* pRes = iaInfo->binfo.pRes; - blockDataCleanup(pRes); - SOperatorInfo* downstream = pOperator->pDownstream[0]; int32_t scanFlag = MAIN_SCAN; @@ -4749,6 +4745,35 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { pRes->info.groupId = miaInfo->groupId; miaInfo->hasGroupId = false; +} + +static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info; + SIntervalAggOperatorInfo* iaInfo = pMiaInfo->intervalAggOperatorInfo; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SSDataBlock* pRes = iaInfo->binfo.pRes; + blockDataCleanup(pRes); + + if (iaInfo->binfo.mergeResultBlock) { + while(1) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + + doMergeAlignedIntervalAgg(pOperator); + } + } else { + doMergeAlignedIntervalAgg(pOperator); + } size_t rows = pRes->info.rows; pOperator->resultInfo.totalRows += rows; @@ -4812,7 +4837,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = miaInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeAlignedIntervalAgg, NULL, NULL, + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1);