fix(query): enable filter in projection operator.
This commit is contained in:
parent
2b3ff4d888
commit
01e198231d
|
@ -497,7 +497,7 @@ typedef struct SProjectOperatorInfo {
|
|||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||
SOptrBasicInfo binfo;
|
||||
SAggSupporter aggSup;
|
||||
|
||||
SNode* pFilterNode; // filter info, which is push down by optimizer
|
||||
SSDataBlock* existDataBlock;
|
||||
SArray* pPseudoColInfo;
|
||||
SLimit limit;
|
||||
|
@ -700,7 +700,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan
|
|||
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
||||
|
||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo);
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
||||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
|
||||
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
|
||||
|
@ -732,7 +732,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
|||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
|
|
@ -1824,7 +1824,8 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
|
|||
}
|
||||
|
||||
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep);
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo) {
|
||||
|
||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
||||
if (pFilterNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -3669,6 +3670,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
|||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
doFilter(pProjectInfo->pFilterNode, pBlock);
|
||||
|
||||
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, scanFlag, false);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
|
||||
|
||||
|
@ -4044,7 +4047,7 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols)
|
|||
}
|
||||
|
||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num,
|
||||
SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit,
|
||||
SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
@ -4056,8 +4059,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
|||
pInfo->slimit = *pSlimit;
|
||||
pInfo->curOffset = pLimit->offset;
|
||||
pInfo->curSOffset = pSlimit->offset;
|
||||
|
||||
pInfo->binfo.pRes = pResBlock;
|
||||
pInfo->pFilterNode= pCondition;
|
||||
|
||||
int32_t numOfCols = num;
|
||||
int32_t numOfRows = 4096;
|
||||
|
@ -4486,7 +4489,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
||||
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
|
||||
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
|
||||
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pProjPhyNode->node.pConditions, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
|
||||
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
|
|
|
@ -341,7 +341,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
|||
|
||||
while(1) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
doFilter(pInfo->pCondition, pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pRes);
|
||||
|
||||
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
|
||||
if (!hasRemain) {
|
||||
|
|
|
@ -254,7 +254,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
|||
}
|
||||
|
||||
int64_t st = taosGetTimestampMs();
|
||||
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
|
||||
doFilter(pTableScanInfo->pFilterNode, pBlock);
|
||||
|
||||
int64_t et = taosGetTimestampMs();
|
||||
pTableScanInfo->readRecorder.filterTime += (et - st);
|
||||
|
@ -859,7 +859,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
|
||||
}
|
||||
|
||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
||||
doFilter(pInfo->pCondition, pInfo->pRes);
|
||||
blockDataUpdateTsWindow(pInfo->pRes, 0);
|
||||
break;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue