Merge pull request #17919 from taosdata/fix/liao_cov

refactor: do some internal refactor.
This commit is contained in:
Shengliang Guan 2022-11-07 11:38:38 +08:00 committed by GitHub
commit 95e81e56d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 160 additions and 140 deletions

View File

@ -1172,11 +1172,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo*
pColumn->nullbitmap = tmp;
memset(&pColumn->nullbitmap[oldLen], 0, BitmapLen(numOfRows) - oldLen);
if (pColumn->info.type == TSDB_DATA_TYPE_NULL) {
return TSDB_CODE_SUCCESS;
}
assert(pColumn->info.bytes);
ASSERT(pColumn->info.bytes);
tmp = taosMemoryRealloc(pColumn->pData, numOfRows * pColumn->info.bytes);
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;

View File

@ -305,7 +305,6 @@ typedef struct STableScanInfo {
SFileBlockLoadRecorder readRecorder;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* pResBlock;
SColMatchInfo matchInfo;
SExprSupp pseudoSup;
@ -342,7 +341,6 @@ typedef struct STableMergeScanInfo {
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset;
@ -460,7 +458,6 @@ typedef struct SStreamScanInfo {
SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo matchInfo;
SNode* pCondition;
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
@ -564,7 +561,6 @@ typedef struct SIntervalAggOperatorInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
SNode* pCondition;
} SIntervalAggOperatorInfo;
typedef struct SMergeAlignedIntervalAggOperatorInfo {
@ -573,12 +569,10 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo {
uint64_t groupId; // current groupId
int64_t curTs; // current ts
SSDataBlock* prefetchedBlock;
SNode* pCondition;
SResultRow* pResultRow;
} SMergeAlignedIntervalAggOperatorInfo;
typedef struct SStreamIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
@ -604,26 +598,21 @@ typedef struct SStreamIntervalOperatorInfo {
} SStreamIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo* current;
uint64_t groupId;
SGroupResInfo groupResInfo;
SExprSupp scalarExprSup;
SNode* pCondition;
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SNode* pFilterNode; // filter info, which is push down by optimizer
SArray* pPseudoColInfo;
SLimitInfo limitInfo;
bool mergeDataBlocks;
SSDataBlock* pFinalRes;
SNode* pCondition;
} SProjectOperatorInfo;
typedef struct SIndefOperatorInfo {
@ -631,10 +620,8 @@ typedef struct SIndefOperatorInfo {
SAggSupporter aggSup;
SArray* pPseudoColInfo;
SExprSupp scalarSup;
SNode* pCondition;
uint64_t groupId;
SSDataBlock* pNextGroupRes;
SSDataBlock* pNextGroupRes;
} SIndefOperatorInfo;
typedef struct SFillOperatorInfo {
@ -645,7 +632,6 @@ typedef struct SFillOperatorInfo {
void** p;
SSDataBlock* existNewGroupBlock;
STimeWindow win;
SNode* pCondition;
SColMatchInfo matchInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
@ -660,7 +646,6 @@ typedef struct SGroupbyOperatorInfo {
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
@ -710,7 +695,6 @@ typedef struct SSessionAggOperatorInfo {
int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
const SNode* pCondition;
} SSessionAggOperatorInfo;
typedef struct SResultWindowInfo {
@ -784,7 +768,6 @@ typedef struct SStreamFillOperatorInfo {
SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex;
SSDataBlock* pDelRes;
SNode* pCondition;
SColMatchInfo matchInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
@ -821,7 +804,6 @@ typedef struct SStateWindowOperatorInfo {
SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup;
const SNode* pCondition;
} SStateWindowOperatorInfo;
typedef struct SSortOperatorInfo {
@ -834,7 +816,6 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
SNode* pCondition;
} SSortOperatorInfo;
typedef struct SJoinOperatorInfo {
@ -895,7 +876,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock, int32_t rows, const char* idStr);

View File

@ -1039,39 +1039,24 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
int32_t status);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo) {
if (pFilterNode == NULL || pBlock->info.rows == 0) {
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return;
}
SFilterInfo* filter = pFilterInfo;
int64_t st = taosGetTimestampUs();
// todo move to the initialization function
int32_t code = 0;
bool needFree = false;
if (filter == NULL) {
needFree = true;
code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1);
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
SColumnInfoData* p = NULL;
int32_t status = 0;
// todo the keep seems never to be True??
bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status);
if (needFree) {
filterFreeInfo(filter);
}
bool keep = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
if (pColMatchInfo != NULL) {
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo->pList); ++i) {
size_t size = taosArrayGetSize(pColMatchInfo->pList);
for (int32_t i = 0; i < size; ++i) {
SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
@ -2393,7 +2378,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
@ -2726,7 +2711,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break;
}
doFilter(pInfo->pCondition, fillResult, &pInfo->matchInfo, NULL);
doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo );
if (fillResult->info.rows > 0) {
break;
}
@ -2945,9 +2930,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
goto _error;
}
code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupId = UINT64_MAX;
pInfo->pCondition = pAggNode->node.pConditions;
pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true;
@ -3173,7 +3162,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
pInfo->pCondition = pPhyFillNode->node.pConditions;
code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "FillOperator";
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "function.h"
#include "os.h"
#include "tname.h"
@ -312,7 +313,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, NULL, NULL);
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
@ -411,8 +412,6 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
}
pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys);
pInfo->pCondition = pAggNode->node.pConditions;
int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
@ -433,6 +432,11 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
goto _error;
}
code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo);
pOperator->name = "GroupbyAggOperator";

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "executorimpl.h"
#include "function.h"
#include "os.h"
@ -108,6 +109,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->pCondAfterMerge = NULL;
}
code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->inputOrder = TSDB_ORDER_ASC;
if (pJoinNode->inputTsOrder == ORDER_ASC) {
pInfo->inputOrder = TSDB_ORDER_ASC;
@ -400,8 +406,8 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
if (numOfNewRows == 0) {
break;
}
if (pJoinInfo->pCondAfterMerge != NULL) {
doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL);
if (pOperator->exprSupp.pFilterInfo != NULL) {
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "executorimpl.h"
#include "functionMgt.h"
@ -71,13 +72,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->pFilterNode = pProjPhyNode->node.pConditions;
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pInfo->mergeDataBlocks = false;
} else {
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
}
pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM)? false:pProjPhyNode->mergeDataBlock;
int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
@ -97,6 +92,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
@ -313,7 +313,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
// do apply filter
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL);
doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL);
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
@ -323,7 +323,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} else {
// do apply filter
if (pRes->info.rows > 0) {
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL);
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (pRes->info.rows == 0) {
continue;
}
@ -392,7 +392,12 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
}
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->pCondition = pPhyNode->node.pConditions;
code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator";
@ -515,7 +520,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
}
doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
size_t rows = pInfo->pRes->info.rows;
if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
break;
@ -617,7 +622,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
}
pRes->info.rows = 1;
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL);
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
/*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

View File

@ -115,8 +115,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
SMetaReader* smrChildTable, const char* dbname, const char* tableName,
int32_t* pNumOfRows, const SSDataBlock* dataBlock);
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock);
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock,
SFilterInfo* pFilterInfo);
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0
if (pInfo->sampleRatio == 1) {
return true;
@ -280,19 +282,13 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
return TSDB_CODE_SUCCESS;
}
static bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols,
static bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
int32_t numOfRows) {
if (pColsAgg == NULL || pFilterNode == NULL) {
if (pColsAgg == NULL || pFilterInfo == NULL) {
return true;
}
SFilterInfo* filter = NULL;
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
filterFreeInfo(filter);
bool keep = filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
return keep;
}
@ -386,7 +382,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
bool loadSMA = false;
*status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL ||
if (pOperator->exprSupp.pFilterInfo != NULL ||
overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
}
@ -424,11 +420,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
// try to filter data block according to sma info
if (pTableScanInfo->pFilterNode != NULL && (!loadSMA)) {
if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
if (success) {
size_t size = taosArrayGetSize(pBlock->pDataBlock);
bool keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, size, pBlockInfo->rows);
bool keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
if (!keep) {
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
@ -468,9 +464,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
// restore the previous value
pCost->totalRows -= pBlock->info.rows;
if (pTableScanInfo->pFilterNode != NULL) {
if (pOperator->exprSupp.pFilterInfo != NULL) {
int64_t st = taosGetTimestampUs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;
@ -861,12 +857,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->pResBlock = createResDataBlock(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->pFilterNode = pScanNode->node.pConditions;
if (pInfo->pFilterNode != NULL) {
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->scanFlag = MAIN_SCAN;
@ -1295,7 +1288,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
return NULL;
}
doFilter(pInfo->pCondition, pResult, NULL, NULL);
doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
if (pResult->info.rows == 0) {
continue;
}
@ -1639,7 +1632,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
}
if (filter) {
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
}
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
@ -2091,7 +2084,7 @@ FETCH_NEXT_BLOCK:
}
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
@ -2456,9 +2449,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
}
code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0;
@ -2599,13 +2596,13 @@ static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
return TSDB_CODE_SUCCESS;
}
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
if (pInfo->pCondition == NULL) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo) {
if (pFilterInfo == NULL) {
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
}
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL);
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
doFilter(pDataBlock, pFilterInfo, NULL);
return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
}
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
@ -2810,7 +2807,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaReaderClear(&smrSuperTable);
metaReaderClear(&smrChildTable);
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
blockDataDestroy(dataBlock);
@ -2850,7 +2847,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaReaderClear(&smrSuperTable);
if (numOfRows >= pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
if (pInfo->pRes->info.rows > 0) {
@ -2860,7 +2857,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
}
if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock);
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0;
}
@ -2875,13 +2872,13 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock) {
void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock,
SFilterInfo* pFilterInfo) {
dataBlock->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false);
doFilterResult(pInfo);
doFilterResult(pInfo->pRes, pFilterInfo);
blockDataCleanup(dataBlock);
}
@ -3635,7 +3632,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
blockDataCleanup(p);
numOfRows = 0;
@ -3651,7 +3648,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
blockDataCleanup(p);
numOfRows = 0;
@ -3668,6 +3665,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -3811,7 +3809,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
blockDataCleanup(p);
numOfRows = 0;
@ -3827,7 +3825,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
blockDataCleanup(p);
numOfRows = 0;
@ -3858,8 +3856,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) {
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator);
@ -3994,7 +3991,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
// todo log the filter info
doFilterResult(pInfo);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes;
@ -4092,9 +4089,15 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->sysInfo = pScanPhyNode->sysInfo;
pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pCondition = pScanNode->node.pConditions;
code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
tNameAssign(&pInfo->name, &pScanNode->tableName);
const char* name = tNameGetTableName(&pInfo->name);
@ -4102,7 +4105,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 ||
strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = *(SReadHandle*)readHandle;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
} else {
tsem_init(&pInfo->ready, 0, 0);
pInfo->epSet = pScanPhyNode->mgmtEpSet;
@ -4118,7 +4120,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL);
return pOperator;
_error:
@ -4126,7 +4127,7 @@ _error:
destroySysScanOperator(pInfo);
}
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
pTaskInfo->code = code;
return NULL;
}
@ -4280,7 +4281,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
pCost->totalRows += pBlock->info.rows;
*status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL ||
if (pOperator->exprSupp.pFilterInfo != NULL ||
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
}
@ -4366,9 +4367,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
T_LONG_JMP(pTaskInfo->env, code);
}
if (pTableScanInfo->pFilterNode != NULL) {
if (pOperator->exprSupp.pFilterInfo!= NULL) {
int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el;
@ -4741,7 +4742,13 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->tableListInfo = pTableListInfo;
pInfo->scanFlag = MAIN_SCAN;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "executorimpl.h"
#include "tdatablock.h"
@ -42,12 +43,14 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
initResultSizeInfo(&pOperator->resultInfo, 1024);
code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
pInfo->pCondition = pSortNode->node.pConditions;
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
pOperator->name = "SortOperator";
@ -215,7 +218,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return NULL;
}
doFilter(pInfo->pCondition, pBlock, &pInfo->matchInfo, NULL);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
if (blockDataGetNumOfRows(pBlock) == 0) {
continue;
}

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "os.h"
#include "query.h"
#include "taosdef.h"
@ -1499,7 +1500,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
doStreamFillImpl(pOperator);
doFilter(pInfo->pCondition, pInfo->pRes, &pInfo->matchInfo, NULL);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
if (pInfo->pRes->info.rows > 0) {
@ -1677,7 +1678,12 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
int32_t numOfOutputCols = 0;
int32_t code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
pInfo->pCondition = pPhyFillNode->node.pConditions;
code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols);
if (code != TSDB_CODE_SUCCESS) {
goto _error;

View File

@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "executorimpl.h"
#include "function.h"
#include "functionMgt.h"
@ -1227,7 +1228,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1265,7 +1266,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBlock, NULL, NULL);
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1747,7 +1748,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pInfo->interval = interval;
pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = as;
pInfo->pCondition = pPhyNode->window.node.pConditions;
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
if (pPhyNode->window.pExprs != NULL) {
@ -1759,6 +1759,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
}
}
code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (isStream) {
ASSERT(num > 0);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
@ -1883,7 +1888,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -1926,7 +1931,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL);
doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) {
@ -2600,17 +2605,22 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes;
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
pInfo->pCondition = pStateNode->window.node.pConditions;
if (pInfo->stateKey.pData == NULL) {
goto _error;
}
int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -2698,7 +2708,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false;
pInfo->pCondition = pSessionNode->window.node.pConditions;
code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
@ -4877,7 +4890,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
doFilter(pMiaInfo->pCondition, pRes, NULL, NULL);
doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (pRes->info.rows >= pOperator->resultInfo.capacity) {
break;
}
@ -4940,7 +4953,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp;
miaInfo->pCondition = pNode->window.node.pConditions;
int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
miaInfo->curTs = INT64_MIN;
iaInfo->win = pTaskInfo->window;
iaInfo->inputOrder = TSDB_ORDER_ASC;
@ -4954,7 +4971,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}

View File

@ -3994,9 +3994,12 @@ int32_t filterSetDataFromColId(SFilterInfo *info, void *param) {
}
int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) {
int32_t code = 0;
SFilterInfo *info = NULL;
if (pNode == NULL) {
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
if (pNode == NULL || pInfo == NULL) {
fltError("invalid param");
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
@ -4034,9 +4037,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options)
_return:
filterFreeInfo(*pInfo);
*pInfo = NULL;
FLT_RET(code);
}