diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index ebb7b50692..5d0121aa15 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1172,11 +1172,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pColumn->nullbitmap = tmp; memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen); - if (pColumn->info.type == TSDB_DATA_TYPE_NULL) { - return TSDB_CODE_SUCCESS; - } - - assert(pColumn->info.bytes); + ASSERT(pColumn->info.bytes); tmp = taosMemoryRealloc(pColumn->pData, numOfRows * pColumn->info.bytes); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e20c8ee955..dedf1630af 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -305,7 +305,6 @@ typedef struct STableScanInfo { SFileBlockLoadRecorder readRecorder; SScanInfo scanInfo; int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer SSDataBlock* pResBlock; SColMatchInfo matchInfo; SExprSupp pseudoSup; @@ -342,7 +341,6 @@ typedef struct STableMergeScanInfo { int64_t numOfRows; SScanInfo scanInfo; int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SResultRowInfo* pResultRowInfo; int32_t* rowEntryInfoOffset; @@ -460,7 +458,6 @@ typedef struct SStreamScanInfo { SReadHandle readHandle; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SColMatchInfo matchInfo; - SNode* pCondition; SArray* pBlockLists; // multiple SSDatablock. SSDataBlock* pRes; // result SSDataBlock @@ -564,7 +561,6 @@ typedef struct SIntervalAggOperatorInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] STimeWindowAggSupp twAggSup; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. - SNode* pCondition; } SIntervalAggOperatorInfo; typedef struct SMergeAlignedIntervalAggOperatorInfo { @@ -573,12 +569,10 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo { uint64_t groupId; // current groupId int64_t curTs; // current ts SSDataBlock* prefetchedBlock; - SNode* pCondition; SResultRow* pResultRow; } SMergeAlignedIntervalAggOperatorInfo; typedef struct SStreamIntervalOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter SExprSupp scalarSupp; // supporter for perform scalar function @@ -604,26 +598,21 @@ typedef struct SStreamIntervalOperatorInfo { } SStreamIntervalOperatorInfo; typedef struct SAggOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; - + SOptrBasicInfo binfo; + SAggSupporter aggSup; STableQueryInfo* current; uint64_t groupId; SGroupResInfo groupResInfo; SExprSupp scalarExprSup; - SNode* pCondition; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; - SNode* pFilterNode; // filter info, which is push down by optimizer SArray* pPseudoColInfo; SLimitInfo limitInfo; bool mergeDataBlocks; SSDataBlock* pFinalRes; - SNode* pCondition; } SProjectOperatorInfo; typedef struct SIndefOperatorInfo { @@ -631,10 +620,8 @@ typedef struct SIndefOperatorInfo { SAggSupporter aggSup; SArray* pPseudoColInfo; SExprSupp scalarSup; - SNode* pCondition; uint64_t groupId; - - SSDataBlock* pNextGroupRes; + SSDataBlock* pNextGroupRes; } SIndefOperatorInfo; typedef struct SFillOperatorInfo { @@ -645,7 +632,6 @@ typedef struct SFillOperatorInfo { void** p; SSDataBlock* existNewGroupBlock; STimeWindow win; - SNode* pCondition; SColMatchInfo matchInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; @@ -660,7 +646,6 @@ typedef struct SGroupbyOperatorInfo { SAggSupporter aggSup; SArray* pGroupCols; // group by columns, SArray SArray* pGroupColVals; // current group column values, SArray - SNode* pCondition; bool isInit; // denote if current val is initialized or not char* keyBuf; // group by keys for hash int32_t groupKeyLen; // total group by column width @@ -710,7 +695,6 @@ typedef struct SSessionAggOperatorInfo { int64_t gap; // session window gap int32_t tsSlotId; // primary timestamp slot id STimeWindowAggSupp twAggSup; - const SNode* pCondition; } SSessionAggOperatorInfo; typedef struct SResultWindowInfo { @@ -784,7 +768,6 @@ typedef struct SStreamFillOperatorInfo { SSDataBlock* pSrcDelBlock; int32_t srcDelRowIndex; SSDataBlock* pDelRes; - SNode* pCondition; SColMatchInfo matchInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; @@ -821,7 +804,6 @@ typedef struct SStateWindowOperatorInfo { SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; - const SNode* pCondition; } SStateWindowOperatorInfo; typedef struct SSortOperatorInfo { @@ -834,7 +816,6 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. SLimitInfo limitInfo; - SNode* pCondition; } SSortOperatorInfo; typedef struct SJoinOperatorInfo { @@ -895,7 +876,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); +void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index deb628f30e..7e93232e0c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1039,39 +1039,24 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo) { - if (pFilterNode == NULL || pBlock->info.rows == 0) { +void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) { + if (pFilterInfo == NULL || pBlock->info.rows == 0) { return; } - SFilterInfo* filter = pFilterInfo; - int64_t st = taosGetTimestampUs(); - - // todo move to the initialization function - int32_t code = 0; - bool needFree = false; - if (filter == NULL) { - needFree = true; - code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); - } - SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; - code = filterSetDataFromSlotId(filter, ¶m1); + int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); SColumnInfoData* p = NULL; int32_t status = 0; // todo the keep seems never to be True?? - bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status); - - if (needFree) { - filterFreeInfo(filter); - } - + bool keep = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); extractQualifiedTupleByFilterResult(pBlock, p, keep, status); if (pColMatchInfo != NULL) { - for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo->pList); ++i) { + size_t size = taosArrayGetSize(pColMatchInfo->pList); + for (int32_t i = 0; i < size; ++i) { SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i); if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId); @@ -2393,7 +2378,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); if (!hasRemainResults(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -2726,7 +2711,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { break; } - doFilter(pInfo->pCondition, fillResult, &pInfo->matchInfo, NULL); + doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo ); if (fillResult->info.rows > 0) { break; } @@ -2945,9 +2930,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN goto _error; } + code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupId = UINT64_MAX; - pInfo->pCondition = pAggNode->node.pConditions; pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->blocking = true; @@ -3173,7 +3162,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false); blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); - pInfo->pCondition = pPhyFillNode->node.pConditions; + code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pOperator->name = "FillOperator"; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ecc9c7e7dd..891eb6e7a4 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "filter.h" #include "function.h" #include "os.h" #include "tname.h" @@ -312,7 +313,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pRes, NULL, NULL); + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); if (!hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -411,8 +412,6 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* } pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys); - pInfo->pCondition = pAggNode->node.pConditions; - int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -433,6 +432,11 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* goto _error; } + code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initResultRowInfo(&pInfo->binfo.resultRowInfo); pOperator->name = "GroupbyAggOperator"; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 8dac44684f..45d76dce74 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "filter.h" #include "executorimpl.h" #include "function.h" #include "os.h" @@ -108,6 +109,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->pCondAfterMerge = NULL; } + code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->inputOrder = TSDB_ORDER_ASC; if (pJoinNode->inputTsOrder == ORDER_ASC) { pInfo->inputOrder = TSDB_ORDER_ASC; @@ -400,8 +406,8 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { if (numOfNewRows == 0) { break; } - if (pJoinInfo->pCondAfterMerge != NULL) { - doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL); + if (pOperator->exprSupp.pFilterInfo != NULL) { + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); } if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index ba076233aa..b2865d15f0 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "filter.h" #include "executorimpl.h" #include "functionMgt.h" @@ -71,13 +72,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.pRes = pResBlock; pInfo->pFinalRes = createOneDataBlock(pResBlock, false); - pInfo->pFilterNode = pProjPhyNode->node.pConditions; - - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - pInfo->mergeDataBlocks = false; - } else { - pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; - } + pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM)? false:pProjPhyNode->mergeDataBlock; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -97,6 +92,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys initBasicInfo(&pInfo->binfo, pResBlock); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); + code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; @@ -313,7 +313,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } // do apply filter - doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL); + doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL); // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { @@ -323,7 +323,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } else { // do apply filter if (pRes->info.rows > 0) { - doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); if (pRes->info.rows == 0) { continue; } @@ -392,7 +392,12 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy } setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); - pInfo->pCondition = pPhyNode->node.pConditions; + code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->binfo.pRes = pResBlock; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pOperator->name = "IndefinitOperator"; @@ -515,7 +520,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { } } - doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); size_t rows = pInfo->pRes->info.rows; if (rows > 0 || pOperator->status == OP_EXEC_DONE) { break; @@ -617,7 +622,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { } pRes->info.rows = 1; - doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); /*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 937069b5cc..447bd801bd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -115,8 +115,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrChildTable, const char* dbname, const char* tableName, int32_t* pNumOfRows, const SSDataBlock* dataBlock); -static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock); -bool processBlockWithProbability(const SSampleExecInfo* pInfo) { +static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock, + SFilterInfo* pFilterInfo); + +bool processBlockWithProbability(const SSampleExecInfo* pInfo) { #if 0 if (pInfo->sampleRatio == 1) { return true; @@ -280,19 +282,13 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* return TSDB_CODE_SUCCESS; } -static bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, +static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols, int32_t numOfRows) { - if (pColsAgg == NULL || pFilterNode == NULL) { + if (pColsAgg == NULL || pFilterInfo == NULL) { return true; } - SFilterInfo* filter = NULL; - - // todo move to the initialization function - int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); - bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows); - - filterFreeInfo(filter); + bool keep = filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows); return keep; } @@ -386,7 +382,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca bool loadSMA = false; *status = pInfo->dataBlockLoadFlag; - if (pTableScanInfo->pFilterNode != NULL || + if (pOperator->exprSupp.pFilterInfo != NULL || overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; } @@ -424,11 +420,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); // try to filter data block according to sma info - if (pTableScanInfo->pFilterNode != NULL && (!loadSMA)) { + if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) { bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); if (success) { size_t size = taosArrayGetSize(pBlock->pDataBlock); - bool keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, size, pBlockInfo->rows); + bool keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows); if (!keep) { qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); @@ -468,9 +464,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca // restore the previous value pCost->totalRows -= pBlock->info.rows; - if (pTableScanInfo->pFilterNode != NULL) { + if (pOperator->exprSupp.pFilterInfo != NULL) { int64_t st = taosGetTimestampUs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -861,12 +857,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->pResBlock = createResDataBlock(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); - pInfo->pFilterNode = pScanNode->node.pConditions; - if (pInfo->pFilterNode != NULL) { - code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } pInfo->scanFlag = MAIN_SCAN; @@ -1295,7 +1288,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 return NULL; } - doFilter(pInfo->pCondition, pResult, NULL, NULL); + doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL); if (pResult->info.rows == 0) { continue; } @@ -1639,7 +1632,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } if (filter) { - doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); } blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); @@ -2091,7 +2084,7 @@ FETCH_NEXT_BLOCK: } } - doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { @@ -2456,9 +2449,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); } + code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->pRes = createResDataBlock(pDescNode); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); - pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; @@ -2599,13 +2596,13 @@ static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { - if (pInfo->pCondition == NULL) { - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; +static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo) { + if (pFilterInfo == NULL) { + return pDataBlock->info.rows == 0 ? NULL : pDataBlock; } - doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; + doFilter(pDataBlock, pFilterInfo, NULL); + return pDataBlock->info.rows == 0 ? NULL : pDataBlock; } static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) { @@ -2810,7 +2807,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { metaReaderClear(&smrSuperTable); metaReaderClear(&smrChildTable); if (numOfRows > 0) { - relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); + relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; } blockDataDestroy(dataBlock); @@ -2850,7 +2847,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { metaReaderClear(&smrSuperTable); if (numOfRows >= pOperator->resultInfo.capacity) { - relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); + relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; if (pInfo->pRes->info.rows > 0) { @@ -2860,7 +2857,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } if (numOfRows > 0) { - relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); + relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; } @@ -2875,13 +2872,13 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } -static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock) { +void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock, + SFilterInfo* pFilterInfo) { dataBlock->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false); - doFilterResult(pInfo); - + doFilterResult(pInfo->pRes, pFilterInfo); blockDataCleanup(dataBlock); } @@ -3635,7 +3632,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); blockDataCleanup(p); numOfRows = 0; @@ -3651,7 +3648,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); blockDataCleanup(p); numOfRows = 0; @@ -3668,6 +3665,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } + static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3811,7 +3809,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); blockDataCleanup(p); numOfRows = 0; @@ -3827,7 +3825,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); blockDataCleanup(p); numOfRows = 0; @@ -3858,8 +3856,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { // the retrieve is executed on the mnode, so return tables that belongs to the information schema database. if (pInfo->readHandle.mnd != NULL) { buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity); - - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; doSetOperatorCompleted(pOperator); @@ -3994,7 +3991,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); // todo log the filter info - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); taosMemoryFree(pRsp); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; @@ -4092,9 +4089,15 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pCondition = pScanNode->node.pConditions; + code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } initResultSizeInfo(&pOperator->resultInfo, 4096); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); tNameAssign(&pInfo->name, &pScanNode->tableName); const char* name = tNameGetTableName(&pInfo->name); @@ -4102,7 +4105,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { pInfo->readHandle = *(SReadHandle*)readHandle; - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); } else { tsem_init(&pInfo->ready, 0, 0); pInfo->epSet = pScanPhyNode->mgmtEpSet; @@ -4118,7 +4120,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL); - return pOperator; _error: @@ -4126,7 +4127,7 @@ _error: destroySysScanOperator(pInfo); } taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + pTaskInfo->code = code; return NULL; } @@ -4280,7 +4281,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc pCost->totalRows += pBlock->info.rows; *status = pInfo->dataBlockLoadFlag; - if (pTableScanInfo->pFilterNode != NULL || + if (pOperator->exprSupp.pFilterInfo != NULL || overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; } @@ -4366,9 +4367,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc T_LONG_JMP(pTaskInfo->env, code); } - if (pTableScanInfo->pFilterNode != NULL) { + if (pOperator->exprSupp.pFilterInfo!= NULL) { int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -4741,7 +4742,13 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + + + code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->tableListInfo = pTableListInfo; pInfo->scanFlag = MAIN_SCAN; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 7abf05e7d6..2f41ee1495 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "filter.h" #include "executorimpl.h" #include "tdatablock.h" @@ -42,12 +43,14 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); - initResultSizeInfo(&pOperator->resultInfo, 1024); + code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->binfo.pRes = pResBlock; pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); - pInfo->pCondition = pSortNode->node.pConditions; initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); pOperator->name = "SortOperator"; @@ -215,7 +218,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return NULL; } - doFilter(pInfo->pCondition, pBlock, &pInfo->matchInfo, NULL); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo); if (blockDataGetNumOfRows(pBlock) == 0) { continue; } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 23847928da..ebc3a962d3 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "filter.h" #include "os.h" #include "query.h" #include "taosdef.h" @@ -1499,7 +1500,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } doStreamFillImpl(pOperator); - doFilter(pInfo->pCondition, pInfo->pRes, &pInfo->matchInfo, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo); memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; if (pInfo->pRes->info.rows > 0) { @@ -1677,7 +1678,12 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi int32_t numOfOutputCols = 0; int32_t code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); - pInfo->pCondition = pPhyFillNode->node.pConditions; + + code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a0d3de5676..88f3e4cff9 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "filter.h" #include "executorimpl.h" #include "function.h" #include "functionMgt.h" @@ -1227,7 +1228,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); + doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1265,7 +1266,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBlock, NULL, NULL); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1747,7 +1748,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pInfo->interval = interval; pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = as; - pInfo->pCondition = pPhyNode->window.node.pConditions; pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; if (pPhyNode->window.pExprs != NULL) { @@ -1759,6 +1759,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh } } + code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + if (isStream) { ASSERT(num > 0); initStreamFunciton(pSup->pCtx, pSup->numOfExprs); @@ -1883,7 +1888,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); + doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1926,7 +1931,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); + doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -2600,17 +2605,22 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.bytes = pInfo->stateCol.bytes; pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); - pInfo->pCondition = pStateNode->window.node.pConditions; if (pInfo->stateKey.pData == NULL) { goto _error; } + int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + + code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2698,7 +2708,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW pInfo->binfo.pRes = pResBlock; pInfo->winSup.prevTs = INT64_MIN; pInfo->reptScan = false; - pInfo->pCondition = pSessionNode->window.node.pConditions; + code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; @@ -4877,7 +4890,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); - doFilter(pMiaInfo->pCondition, pRes, NULL, NULL); + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { break; } @@ -4940,7 +4953,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SExprSupp* pSup = &pOperator->exprSupp; - miaInfo->pCondition = pNode->window.node.pConditions; + int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + miaInfo->curTs = INT64_MIN; iaInfo->win = pTaskInfo->window; iaInfo->inputOrder = TSDB_ORDER_ASC; @@ -4954,7 +4971,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); - int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + + code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 4738e1bbf9..939be1809a 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3994,9 +3994,12 @@ int32_t filterSetDataFromColId(SFilterInfo *info, void *param) { } int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) { - int32_t code = 0; SFilterInfo *info = NULL; + if (pNode == NULL) { + return TSDB_CODE_SUCCESS; + } + int32_t code = 0; if (pNode == NULL || pInfo == NULL) { fltError("invalid param"); FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -4034,9 +4037,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) _return: filterFreeInfo(*pInfo); - *pInfo = NULL; - FLT_RET(code); }