From 8c4be7dc215211c8350fa8ba628ac1d2e928c81d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 4 Nov 2022 22:13:40 +0800 Subject: [PATCH 1/4] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 23 +---- source/libs/executor/src/executorimpl.c | 45 ++++----- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/joinoperator.c | 10 +- source/libs/executor/src/projectoperator.c | 27 ++--- source/libs/executor/src/scanoperator.c | 99 ++++++++++--------- source/libs/executor/src/sortoperator.c | 9 +- source/libs/executor/src/tfill.c | 10 +- source/libs/executor/src/timewindowoperator.c | 40 +++++--- source/libs/scalar/src/filter.c | 7 +- 10 files changed, 142 insertions(+), 130 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 14159c2da2..50b44d6524 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -305,7 +305,6 @@ typedef struct STableScanInfo { SFileBlockLoadRecorder readRecorder; SScanInfo scanInfo; int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer SSDataBlock* pResBlock; SColMatchInfo matchInfo; SExprSupp pseudoSup; @@ -342,7 +341,6 @@ typedef struct STableMergeScanInfo { int64_t numOfRows; SScanInfo scanInfo; int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SResultRowInfo* pResultRowInfo; int32_t* rowEntryInfoOffset; @@ -460,7 +458,6 @@ typedef struct SStreamScanInfo { SReadHandle readHandle; SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SColMatchInfo matchInfo; - SNode* pCondition; SArray* pBlockLists; // multiple SSDatablock. SSDataBlock* pRes; // result SSDataBlock @@ -565,22 +562,18 @@ typedef struct SIntervalAggOperatorInfo { EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] STimeWindowAggSupp twAggSup; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. - SNode* pCondition; } SIntervalAggOperatorInfo; typedef struct SMergeAlignedIntervalAggOperatorInfo { SIntervalAggOperatorInfo* intervalAggOperatorInfo; - // bool hasGroupId; uint64_t groupId; // current groupId int64_t curTs; // current ts SSDataBlock* prefetchedBlock; - SNode* pCondition; SResultRow* pResultRow; } SMergeAlignedIntervalAggOperatorInfo; typedef struct SStreamIntervalOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter SExprSupp scalarSupp; // supporter for perform scalar function @@ -606,26 +599,21 @@ typedef struct SStreamIntervalOperatorInfo { } SStreamIntervalOperatorInfo; typedef struct SAggOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; - + SOptrBasicInfo binfo; + SAggSupporter aggSup; STableQueryInfo* current; uint64_t groupId; SGroupResInfo groupResInfo; SExprSupp scalarExprSup; - SNode* pCondition; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; - SNode* pFilterNode; // filter info, which is push down by optimizer SArray* pPseudoColInfo; SLimitInfo limitInfo; bool mergeDataBlocks; SSDataBlock* pFinalRes; - SNode* pCondition; } SProjectOperatorInfo; typedef struct SIndefOperatorInfo { @@ -647,7 +635,6 @@ typedef struct SFillOperatorInfo { void** p; SSDataBlock* existNewGroupBlock; STimeWindow win; - SNode* pCondition; SColMatchInfo matchInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; @@ -712,7 +699,6 @@ typedef struct SSessionAggOperatorInfo { int64_t gap; // session window gap int32_t tsSlotId; // primary timestamp slot id STimeWindowAggSupp twAggSup; - const SNode* pCondition; } SSessionAggOperatorInfo; typedef struct SResultWindowInfo { @@ -786,7 +772,6 @@ typedef struct SStreamFillOperatorInfo { SSDataBlock* pSrcDelBlock; int32_t srcDelRowIndex; SSDataBlock* pDelRes; - SNode* pCondition; SColMatchInfo matchInfo; int32_t primaryTsCol; int32_t primarySrcSlotId; @@ -823,7 +808,6 @@ typedef struct SStateWindowOperatorInfo { SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; - const SNode* pCondition; } SStateWindowOperatorInfo; typedef struct SSortOperatorInfo { @@ -836,7 +820,6 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. SLimitInfo limitInfo; - SNode* pCondition; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -901,7 +884,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); +void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 26abc2b90b..71d94822a1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1039,41 +1039,24 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo) { - if (pFilterNode == NULL || pBlock->info.rows == 0) { +void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) { + if (pFilterInfo == NULL || pBlock->info.rows == 0) { return; } - SFilterInfo* filter = pFilterInfo; - int64_t st = taosGetTimestampUs(); - - // pError("start filter"); - - // todo move to the initialization function - int32_t code = 0; - bool needFree = false; - if (filter == NULL) { - needFree = true; - code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); - } - SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; - code = filterSetDataFromSlotId(filter, ¶m1); + int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); SColumnInfoData* p = NULL; int32_t status = 0; // todo the keep seems never to be True?? - bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status); - - if (needFree) { - filterFreeInfo(filter); - } - + bool keep = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); extractQualifiedTupleByFilterResult(pBlock, p, keep, status); if (pColMatchInfo != NULL) { - for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo->pList); ++i) { + size_t size = taosArrayGetSize(pColMatchInfo->pList); + for (int32_t i = 0; i < size; ++i) { SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i); if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId); @@ -2395,7 +2378,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); if (!hasRemainResults(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -2728,7 +2711,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { break; } - doFilter(pInfo->pCondition, fillResult, &pInfo->matchInfo, NULL); + doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo ); if (fillResult->info.rows > 0) { break; } @@ -2947,9 +2930,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN goto _error; } + code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupId = UINT64_MAX; - pInfo->pCondition = pAggNode->node.pConditions; pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->blocking = true; @@ -3175,7 +3162,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false); blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); - pInfo->pCondition = pPhyFillNode->node.pConditions; + code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pOperator->name = "FillOperator"; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e07a3475e0..88f9ad382d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -312,7 +312,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pRes, NULL, NULL); + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); if (!hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 8dac44684f..11c4ee5141 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "executorimpl.h" #include "function.h" #include "os.h" @@ -108,6 +109,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->pCondAfterMerge = NULL; } + code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->inputOrder = TSDB_ORDER_ASC; if (pJoinNode->inputTsOrder == ORDER_ASC) { pInfo->inputOrder = TSDB_ORDER_ASC; @@ -400,8 +406,8 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { if (numOfNewRows == 0) { break; } - if (pJoinInfo->pCondAfterMerge != NULL) { - doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL); + if (pOperator->exprSupp.pFilterInfo != NULL) { + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); } if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 4e4c33d4c3..c5fbee973d 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "executorimpl.h" #include "functionMgt.h" @@ -71,13 +72,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.pRes = pResBlock; pInfo->pFinalRes = createOneDataBlock(pResBlock, false); - pInfo->pFilterNode = pProjPhyNode->node.pConditions; - - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - pInfo->mergeDataBlocks = false; - } else { - pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; - } + pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM)? false:pProjPhyNode->mergeDataBlock; int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -97,6 +92,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys initBasicInfo(&pInfo->binfo, pResBlock); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); + code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; @@ -313,7 +313,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } // do apply filter - doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL); + doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL); // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { @@ -323,7 +323,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } else { // do apply filter if (pRes->info.rows > 0) { - doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); if (pRes->info.rows == 0) { continue; } @@ -391,9 +391,12 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy } setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); + code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->binfo.pRes = pResBlock; - pInfo->pCondition = pPhyNode->node.pConditions; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pOperator->name = "IndefinitOperator"; @@ -516,7 +519,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { } } - doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); size_t rows = pInfo->pRes->info.rows; if (rows > 0 || pOperator->status == OP_EXEC_DONE) { break; @@ -618,7 +621,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { } pRes->info.rows = 1; - doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); /*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3352b2685a..7a2a58f0c3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -103,8 +103,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, SMetaReader* smrChildTable, const char* dbname, const char* tableName, int32_t* pNumOfRows, const SSDataBlock* dataBlock); -static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock); -bool processBlockWithProbability(const SSampleExecInfo* pInfo) { +static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock, + SFilterInfo* pFilterInfo); + +bool processBlockWithProbability(const SSampleExecInfo* pInfo) { #if 0 if (pInfo->sampleRatio == 1) { return true; @@ -291,19 +293,13 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* return TSDB_CODE_SUCCESS; } -static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, +static FORCE_INLINE bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols, int32_t numOfRows) { - if (pColsAgg == NULL || pFilterNode == NULL) { + if (pColsAgg == NULL || pFilterInfo == NULL) { return true; } - SFilterInfo* filter = NULL; - - // todo move to the initialization function - int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); - bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows); - - filterFreeInfo(filter); + bool keep = filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows); return keep; } @@ -408,7 +404,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca bool loadSMA = false; *status = pInfo->dataBlockLoadFlag; - if (pTableScanInfo->pFilterNode != NULL || + if (pOperator->exprSupp.pFilterInfo != NULL || overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; } @@ -454,11 +450,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); // try to filter data block according to sma info - if (pTableScanInfo->pFilterNode != NULL && (!loadSMA)) { + if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) { bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); if (success) { size_t size = taosArrayGetSize(pBlock->pDataBlock); - bool keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, size, pBlockInfo->rows); + bool keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows); if (!keep) { qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); @@ -499,9 +495,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca // restore the previous value pCost->totalRows -= pBlock->info.rows; - if (pTableScanInfo->pFilterNode != NULL) { + if (pOperator->exprSupp.pFilterInfo != NULL) { int64_t st = taosGetTimestampUs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -880,13 +876,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->pResBlock = createResDataBlock(pDescNode); - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; - - if (pInfo->pFilterNode != NULL) { - code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; } pInfo->scanFlag = MAIN_SCAN; @@ -1315,7 +1307,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 return NULL; } - doFilter(pInfo->pCondition, pResult, NULL, NULL); + doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL); if (pResult->info.rows == 0) { continue; } @@ -1659,7 +1651,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } if (filter) { - doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); } blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); @@ -2111,7 +2103,7 @@ FETCH_NEXT_BLOCK: } } - doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { @@ -2476,9 +2468,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); } + code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->pRes = createResDataBlock(pDescNode); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); - pInfo->pCondition = pScanPhyNode->node.pConditions; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; @@ -2619,13 +2615,13 @@ static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { - if (pInfo->pCondition == NULL) { - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; +static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo) { + if (pFilterInfo == NULL) { + return pDataBlock->info.rows == 0 ? NULL : pDataBlock; } - doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); - return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; + doFilter(pDataBlock, pFilterInfo, NULL); + return pDataBlock->info.rows == 0 ? NULL : pDataBlock; } static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) { @@ -2830,7 +2826,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { metaReaderClear(&smrSuperTable); metaReaderClear(&smrChildTable); if (numOfRows > 0) { - relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); + relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; } blockDataDestroy(dataBlock); @@ -2870,7 +2866,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { metaReaderClear(&smrSuperTable); if (numOfRows >= pOperator->resultInfo.capacity) { - relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); + relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; if (pInfo->pRes->info.rows > 0) { @@ -2880,7 +2876,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { } if (numOfRows > 0) { - relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); + relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo); numOfRows = 0; } @@ -2895,13 +2891,13 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } -static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock) { +void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock, + SFilterInfo* pFilterInfo) { dataBlock->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false); - doFilterResult(pInfo); - + doFilterResult(pInfo->pRes, pFilterInfo); blockDataCleanup(dataBlock); } @@ -3655,7 +3651,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); blockDataCleanup(p); numOfRows = 0; @@ -3671,7 +3667,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); blockDataCleanup(p); numOfRows = 0; @@ -3831,7 +3827,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); blockDataCleanup(p); numOfRows = 0; @@ -3847,7 +3843,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { pInfo->pRes->info.rows = numOfRows; relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); blockDataCleanup(p); numOfRows = 0; @@ -3878,8 +3874,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { // the retrieve is executed on the mnode, so return tables that belongs to the information schema database. if (pInfo->readHandle.mnd != NULL) { buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity); - - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; doSetOperatorCompleted(pOperator); @@ -4014,7 +4009,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); // todo log the filter info - doFilterResult(pInfo); + doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo); taosMemoryFree(pRsp); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; @@ -4309,7 +4304,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc pCost->totalRows += pBlock->info.rows; *status = pInfo->dataBlockLoadFlag; - if (pTableScanInfo->pFilterNode != NULL || + if (pOperator->exprSupp.pFilterInfo != NULL || overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; } @@ -4397,9 +4392,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } } - if (pTableScanInfo->pFilterNode != NULL) { + if (pOperator->exprSupp.pFilterInfo!= NULL) { int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -4845,7 +4840,13 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; - pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + + + code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pInfo->tableListInfo = pTableListInfo; pInfo->scanFlag = MAIN_SCAN; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 5b05b3b2ed..29264328ce 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "executorimpl.h" #include "tdatablock.h" @@ -42,12 +43,14 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); - initResultSizeInfo(&pOperator->resultInfo, 1024); + code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->binfo.pRes = pResBlock; pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); - pInfo->pCondition = pSortNode->node.pConditions; initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); pOperator->name = "SortOperator"; @@ -215,7 +218,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return NULL; } - doFilter(pInfo->pCondition, pBlock, &pInfo->matchInfo, NULL); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo); if (blockDataGetNumOfRows(pBlock) == 0) { continue; } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 23847928da..0c106a0d2a 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "os.h" #include "query.h" #include "taosdef.h" @@ -1499,7 +1500,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } doStreamFillImpl(pOperator); - doFilter(pInfo->pCondition, pInfo->pRes, &pInfo->matchInfo, NULL); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo); memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; if (pInfo->pRes->info.rows > 0) { @@ -1677,7 +1678,12 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi int32_t numOfOutputCols = 0; int32_t code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); - pInfo->pCondition = pPhyFillNode->node.pConditions; + + code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6be9a0402f..1eb5890ece 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -12,6 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include #include "executorimpl.h" #include "function.h" #include "functionMgt.h" @@ -1226,7 +1227,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); + doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1264,7 +1265,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBlock, NULL, NULL); + doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1746,7 +1747,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pInfo->interval = interval; pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = as; - pInfo->pCondition = pPhyNode->window.node.pConditions; pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; if (pPhyNode->window.pExprs != NULL) { @@ -1758,6 +1758,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh } } + code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + if (isStream) { ASSERT(num > 0); initStreamFunciton(pSup->pCtx, pSup->numOfExprs); @@ -1882,7 +1887,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); + doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1925,7 +1930,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); + doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -2599,17 +2604,22 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.bytes = pInfo->stateCol.bytes; pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); - pInfo->pCondition = pStateNode->window.node.pConditions; if (pInfo->stateKey.pData == NULL) { goto _error; } + int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + + code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2697,7 +2707,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW pInfo->binfo.pRes = pResBlock; pInfo->winSup.prevTs = INT64_MIN; pInfo->reptScan = false; - pInfo->pCondition = pSessionNode->window.node.pConditions; + code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; @@ -4876,7 +4889,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); - doFilter(pMiaInfo->pCondition, pRes, NULL, NULL); + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { break; } @@ -4939,7 +4952,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SExprSupp* pSup = &pOperator->exprSupp; - miaInfo->pCondition = pNode->window.node.pConditions; + int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + miaInfo->curTs = INT64_MIN; iaInfo->win = pTaskInfo->window; iaInfo->inputOrder = TSDB_ORDER_ASC; @@ -4953,7 +4970,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); - int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + + code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 4738e1bbf9..939be1809a 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3994,9 +3994,12 @@ int32_t filterSetDataFromColId(SFilterInfo *info, void *param) { } int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) { - int32_t code = 0; SFilterInfo *info = NULL; + if (pNode == NULL) { + return TSDB_CODE_SUCCESS; + } + int32_t code = 0; if (pNode == NULL || pInfo == NULL) { fltError("invalid param"); FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -4034,9 +4037,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) _return: filterFreeInfo(*pInfo); - *pInfo = NULL; - FLT_RET(code); } From 2c4c9e69de511052b484416eceb488f854f9a827 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 6 Nov 2022 23:15:23 +0800 Subject: [PATCH 2/4] fix(query): init filter in sys table scan. --- source/libs/executor/src/scanoperator.c | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7a2a58f0c3..c3af17bacb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3684,6 +3684,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } + static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4094,20 +4095,28 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan } SScanPhysiNode* pScanNode = &pScanPhyNode->scan; - SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc; int32_t num = 0; int32_t code = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } pInfo->accountId = pScanPhyNode->accountId; pInfo->pUser = taosMemoryStrDup((void*)pUser); pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->showRewrite = pScanPhyNode->showRewrite; pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pCondition = pScanNode->node.pConditions; + code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } initResultSizeInfo(&pOperator->resultInfo, 4096); + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); tNameAssign(&pInfo->name, &pScanNode->tableName); const char* name = tNameGetTableName(&pInfo->name); @@ -4115,7 +4124,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { pInfo->readHandle = *(SReadHandle*)readHandle; - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); } else { tsem_init(&pInfo->ready, 0, 0); pInfo->epSet = pScanPhyNode->mgmtEpSet; @@ -4131,13 +4139,15 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL); - return pOperator; _error: - taosMemoryFreeClear(pInfo); + if (pInfo != NULL) { + destroySysScanOperator(pInfo); + } + taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + pTaskInfo->code = code; return NULL; } From 341112923b872296b40f0e12b333e2eb97cee14e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 6 Nov 2022 23:42:04 +0800 Subject: [PATCH 3/4] fix(query): init filter in group operator. --- source/libs/executor/inc/executorimpl.h | 5 +---- source/libs/executor/src/groupoperator.c | 8 ++++++-- source/libs/executor/src/joinoperator.c | 2 +- source/libs/executor/src/projectoperator.c | 2 +- source/libs/executor/src/sortoperator.c | 2 +- source/libs/executor/src/tfill.c | 2 +- source/libs/executor/src/timewindowoperator.c | 2 +- 7 files changed, 12 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 50b44d6524..d29c736019 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -621,10 +621,8 @@ typedef struct SIndefOperatorInfo { SAggSupporter aggSup; SArray* pPseudoColInfo; SExprSupp scalarSup; - SNode* pCondition; uint64_t groupId; - - SSDataBlock* pNextGroupRes; + SSDataBlock* pNextGroupRes; } SIndefOperatorInfo; typedef struct SFillOperatorInfo { @@ -649,7 +647,6 @@ typedef struct SGroupbyOperatorInfo { SAggSupporter aggSup; SArray* pGroupCols; // group by columns, SArray SArray* pGroupColVals; // current group column values, SArray - SNode* pCondition; bool isInit; // denote if current val is initialized or not char* keyBuf; // group by keys for hash int32_t groupKeyLen; // total group by column width diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 88f9ad382d..b68c606ec6 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "filter.h" #include "function.h" #include "os.h" #include "tname.h" @@ -414,8 +415,6 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* } pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys); - pInfo->pCondition = pAggNode->node.pConditions; - int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -434,6 +433,11 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* goto _error; } + code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initResultRowInfo(&pInfo->binfo.resultRowInfo); pOperator->name = "GroupbyAggOperator"; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 11c4ee5141..45d76dce74 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include +#include "filter.h" #include "executorimpl.h" #include "function.h" #include "os.h" diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index c5fbee973d..fa469eb77b 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include +#include "filter.h" #include "executorimpl.h" #include "functionMgt.h" diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 29264328ce..a50d1c0ea4 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include +#include "filter.h" #include "executorimpl.h" #include "tdatablock.h" diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 0c106a0d2a..ebc3a962d3 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include +#include "filter.h" #include "os.h" #include "query.h" #include "taosdef.h" diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1eb5890ece..0bcecf32e4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -12,7 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include +#include "filter.h" #include "executorimpl.h" #include "function.h" #include "functionMgt.h" From a1a2bae25c3acdb803068d7d5507ede965976296 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 7 Nov 2022 10:29:58 +0800 Subject: [PATCH 4/4] fix(query): allocate memory for null value columns. --- source/common/src/tdatablock.c | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index ebb7b50692..5d0121aa15 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1172,11 +1172,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pColumn->nullbitmap = tmp; memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen); - if (pColumn->info.type == TSDB_DATA_TYPE_NULL) { - return TSDB_CODE_SUCCESS; - } - - assert(pColumn->info.bytes); + ASSERT(pColumn->info.bytes); tmp = taosMemoryRealloc(pColumn->pData, numOfRows * pColumn->info.bytes); if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY;