enh(query): support multi-groups result merge into one datablock.

This commit is contained in:
Haojun Liao 2022-08-02 16:52:36 +08:00
parent b683754ff7
commit 70284f5b02
1 changed files with 32 additions and 7 deletions

View File

@ -4685,20 +4685,16 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
}
static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes);
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t scanFlag = MAIN_SCAN;
@ -4749,6 +4745,35 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
pRes->info.groupId = miaInfo->groupId;
miaInfo->hasGroupId = false;
}
static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info;
SIntervalAggOperatorInfo* iaInfo = pMiaInfo->intervalAggOperatorInfo;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SSDataBlock* pRes = iaInfo->binfo.pRes;
blockDataCleanup(pRes);
if (iaInfo->binfo.mergeResultBlock) {
while(1) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
doMergeAlignedIntervalAgg(pOperator);
}
} else {
doMergeAlignedIntervalAgg(pOperator);
}
size_t rows = pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
@ -4812,7 +4837,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->info = miaInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeAlignedIntervalAgg, NULL, NULL,
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL);
code = appendDownstream(pOperator, &downstream, 1);