Merge pull request #14645 from taosdata/szhou/feature/push-project-condition
feat: add filter to fill operator
This commit is contained in:
commit
fd48d2b68a
|
@ -528,6 +528,7 @@ typedef struct SFillOperatorInfo {
|
|||
SSDataBlock* existNewGroupBlock;
|
||||
bool multigroupResult;
|
||||
STimeWindow win;
|
||||
SNode* pCondition;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SGroupbyOperatorInfo {
|
||||
|
|
|
@ -3364,18 +3364,13 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf
|
|||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
||||
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
SSDataBlock* pResBlock = pInfo->pRes;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// todo handle different group data interpolation
|
||||
bool n = false;
|
||||
bool* newgroup = &n;
|
||||
|
@ -3440,6 +3435,40 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
SSDataBlock* pResBlock = pInfo->pRes;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
SSDataBlock* fillResult = doFillImpl(pOperator);
|
||||
if (fillResult != NULL) {
|
||||
doFilter(pInfo->pCondition, fillResult);
|
||||
}
|
||||
|
||||
if (fillResult == NULL) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
if (fillResult->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
size_t rows = pResBlock->info.rows;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
|
||||
return (rows == 0)? NULL:pResBlock;
|
||||
}
|
||||
|
||||
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SExprInfo* pExprInfo = &pExpr[i];
|
||||
|
@ -3958,6 +3987,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
|||
|
||||
pInfo->pRes = pResBlock;
|
||||
pInfo->multigroupResult = multigroupResult;
|
||||
pInfo->pCondition = pPhyFillNode->node.pConditions;
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
|
|
Loading…
Reference in New Issue