Merge pull request #14651 from taosdata/szhou/feature/push-project-condition
feat: add filter to operator
This commit is contained in:
commit
5109994cab
|
@ -518,6 +518,7 @@ typedef struct SIndefOperatorInfo {
|
|||
SAggSupporter aggSup;
|
||||
SArray* pPseudoColInfo;
|
||||
SExprSupp scalarSup;
|
||||
SNode* pCondition;
|
||||
} SIndefOperatorInfo;
|
||||
|
||||
typedef struct SFillOperatorInfo {
|
||||
|
|
|
@ -3371,6 +3371,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
|||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
SSDataBlock* pResBlock = pInfo->pRes;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
|
||||
// todo handle different group data interpolation
|
||||
bool n = false;
|
||||
bool* newgroup = &n;
|
||||
|
@ -3439,16 +3441,13 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
|||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
SSDataBlock* pResBlock = pInfo->pRes;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* fillResult = NULL;
|
||||
while (true) {
|
||||
SSDataBlock* fillResult = doFillImpl(pOperator);
|
||||
fillResult = doFillImpl(pOperator);
|
||||
if (fillResult != NULL) {
|
||||
doFilter(pInfo->pCondition, fillResult);
|
||||
}
|
||||
|
@ -3463,10 +3462,12 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
size_t rows = pResBlock->info.rows;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
if (fillResult != NULL) {
|
||||
size_t rows = fillResult->info.rows;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
}
|
||||
|
||||
return (rows == 0)? NULL:pResBlock;
|
||||
return fillResult;
|
||||
}
|
||||
|
||||
static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
||||
|
@ -3861,6 +3862,8 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
doFilter(pIndefInfo->pCondition, pInfo->pRes);
|
||||
|
||||
size_t rows = pInfo->pRes->info.rows;
|
||||
pOperator->resultInfo.totalRows += rows;
|
||||
|
||||
|
@ -3914,6 +3917,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
|||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||
pInfo->pCondition = pPhyNode->node.pConditions;
|
||||
|
||||
pOperator->name = "IndefinitOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
|
||||
|
|
Loading…
Reference in New Issue