feat: add filter to interval agg operator
This commit is contained in:
parent
bd819f5c88
commit
c710eede6f
|
@ -442,6 +442,8 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
SArray* pDelWins; // SWinRes
|
||||
int32_t delIndex;
|
||||
SSDataBlock* pDelRes;
|
||||
|
||||
SNode *pCondition;
|
||||
} SIntervalAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamFinalIntervalOperatorInfo {
|
||||
|
|
|
@ -1208,10 +1208,17 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
|
||||
if (pBlock->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
while (1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pBlock);
|
||||
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
if (pBlock->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
size_t rows = pBlock->info.rows;
|
||||
|
@ -1662,6 +1669,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pInfo->execModel = pTaskInfo->execModel;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->ignoreExpiredData = pPhyNode->window.igExpired;
|
||||
pInfo->pCondition = pPhyNode->window.node.pConditions;
|
||||
|
||||
if (pPhyNode->window.pExprs != NULL) {
|
||||
int32_t numOfScalar = 0;
|
||||
|
|
Loading…
Reference in New Issue