feat: add filter to fill operator

This commit is contained in:
slzhou 2022-07-07 14:30:08 +08:00
parent f3c3b0f148
commit cddc989808
2 changed files with 37 additions and 6 deletions

View File

@ -528,6 +528,7 @@ typedef struct SFillOperatorInfo {
SSDataBlock* existNewGroupBlock;
bool multigroupResult;
STimeWindow win;
SNode* pCondition;
} SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo {

View File

@ -3364,18 +3364,13 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
}
}
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
SFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultInfo* pResultInfo = &pOperator->resultInfo;
SSDataBlock* pResBlock = pInfo->pRes;
blockDataCleanup(pResBlock);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
// todo handle different group data interpolation
bool n = false;
bool* newgroup = &n;
@ -3440,6 +3435,40 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
}
}
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
SFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultInfo* pResultInfo = &pOperator->resultInfo;
SSDataBlock* pResBlock = pInfo->pRes;
blockDataCleanup(pResBlock);
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
while (true) {
SSDataBlock* fillResult = doFillImpl(pOperator);
if (fillResult != NULL) {
doFilter(pInfo->pCondition, fillResult);
}
if (fillResult == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
if (fillResult->info.rows > 0) {
break;
}
}
size_t rows = pResBlock->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL:pResBlock;
}
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExpr[i];
@ -3958,6 +3987,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo->pRes = pResBlock;
pInfo->multigroupResult = multigroupResult;
pInfo->pCondition = pPhyFillNode->node.pConditions;
pOperator->name = "FillOperator";
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;