Merge pull request #14684 from taosdata/szhou/feature/push-project-condition
feat: add filter to operator merge aligned interval agg
This commit is contained in:
commit
89757d85d5
|
@ -806,7 +806,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI
|
||||||
|
|
||||||
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
|
||||||
|
|
|
@ -4462,7 +4462,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
|
||||||
|
|
||||||
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||||
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo);
|
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pPhyNode->pConditions, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
|
||||||
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
|
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
|
|
@ -4375,6 +4375,8 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SSDataBlock* prefetchedBlock;
|
SSDataBlock* prefetchedBlock;
|
||||||
bool inputBlocksFinished;
|
bool inputBlocksFinished;
|
||||||
|
|
||||||
|
SNode* pCondition;
|
||||||
} SMergeAlignedIntervalAggOperatorInfo;
|
} SMergeAlignedIntervalAggOperatorInfo;
|
||||||
|
|
||||||
void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
@ -4512,8 +4514,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
|
getTableScanInfo(pOperator, &iaInfo->order, &scanFlag);
|
||||||
setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true);
|
setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true);
|
||||||
doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
|
doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
|
||||||
|
doFilter(miaInfo->pCondition, pRes);
|
||||||
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
|
if (pRes->info.rows > 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4521,7 +4523,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
pRes->info.groupId = miaInfo->groupId;
|
pRes->info.groupId = miaInfo->groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRes->info.rows == 0) {
|
if (miaInfo->inputBlocksFinished) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4532,7 +4534,7 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo,
|
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo,
|
||||||
int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
|
int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
|
||||||
int32_t primaryTsSlotId, SExecTaskInfo* pTaskInfo) {
|
int32_t primaryTsSlotId, SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
||||||
SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
|
SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (miaInfo == NULL || pOperator == NULL) {
|
if (miaInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -4547,6 +4549,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
|
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
|
||||||
SExprSupp* pSup = &pOperator->exprSupp;
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
|
||||||
|
miaInfo->pCondition = pCondition;
|
||||||
iaInfo->win = pTaskInfo->window;
|
iaInfo->win = pTaskInfo->window;
|
||||||
iaInfo->order = TSDB_ORDER_ASC;
|
iaInfo->order = TSDB_ORDER_ASC;
|
||||||
iaInfo->interval = *pInterval;
|
iaInfo->interval = *pInterval;
|
||||||
|
|
Loading…
Reference in New Issue