feat: add filter to fill operator

This commit is contained in:
slzhou 2022-07-07 14:59:52 +08:00
parent cddc989808
commit 9eb4f16792
1 changed files with 9 additions and 8 deletions

View File

@ -3371,6 +3371,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
SResultInfo* pResultInfo = &pOperator->resultInfo; SResultInfo* pResultInfo = &pOperator->resultInfo;
SSDataBlock* pResBlock = pInfo->pRes; SSDataBlock* pResBlock = pInfo->pRes;
blockDataCleanup(pResBlock);
// todo handle different group data interpolation // todo handle different group data interpolation
bool n = false; bool n = false;
bool* newgroup = &n; bool* newgroup = &n;
@ -3439,16 +3441,13 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SFillOperatorInfo* pInfo = pOperator->info; SFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultInfo* pResultInfo = &pOperator->resultInfo;
SSDataBlock* pResBlock = pInfo->pRes;
blockDataCleanup(pResBlock);
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
SSDataBlock* fillResult = NULL;
while (true) { while (true) {
SSDataBlock* fillResult = doFillImpl(pOperator); fillResult = doFillImpl(pOperator);
if (fillResult != NULL) { if (fillResult != NULL) {
doFilter(pInfo->pCondition, fillResult); doFilter(pInfo->pCondition, fillResult);
} }
@ -3463,10 +3462,12 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
} }
} }
size_t rows = pResBlock->info.rows; if (fillResult != NULL) {
pOperator->resultInfo.totalRows += rows; size_t rows = fillResult->info.rows;
pOperator->resultInfo.totalRows += rows;
}
return (rows == 0)? NULL:pResBlock; return fillResult;
} }
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {