refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-11-04 22:13:40 +08:00
parent 8ce2e12423
commit 8c4be7dc21
10 changed files with 142 additions and 130 deletions

View File

@ -305,7 +305,6 @@ typedef struct STableScanInfo {
SFileBlockLoadRecorder readRecorder; SFileBlockLoadRecorder readRecorder;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
SExprSupp pseudoSup; SExprSupp pseudoSup;
@ -342,7 +341,6 @@ typedef struct STableMergeScanInfo {
int64_t numOfRows; int64_t numOfRows;
SScanInfo scanInfo; SScanInfo scanInfo;
int32_t scanTimes; int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
SResultRowInfo* pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t* rowEntryInfoOffset; int32_t* rowEntryInfoOffset;
@ -460,7 +458,6 @@ typedef struct SStreamScanInfo {
SReadHandle readHandle; SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
SNode* pCondition;
SArray* pBlockLists; // multiple SSDatablock. SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
@ -565,22 +562,18 @@ typedef struct SIntervalAggOperatorInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation. SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
SNode* pCondition;
} SIntervalAggOperatorInfo; } SIntervalAggOperatorInfo;
typedef struct SMergeAlignedIntervalAggOperatorInfo { typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo* intervalAggOperatorInfo; SIntervalAggOperatorInfo* intervalAggOperatorInfo;
// bool hasGroupId;
uint64_t groupId; // current groupId uint64_t groupId; // current groupId
int64_t curTs; // current ts int64_t curTs; // current ts
SSDataBlock* prefetchedBlock; SSDataBlock* prefetchedBlock;
SNode* pCondition;
SResultRow* pResultRow; SResultRow* pResultRow;
} SMergeAlignedIntervalAggOperatorInfo; } SMergeAlignedIntervalAggOperatorInfo;
typedef struct SStreamIntervalOperatorInfo { typedef struct SStreamIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function SExprSupp scalarSupp; // supporter for perform scalar function
@ -606,26 +599,21 @@ typedef struct SStreamIntervalOperatorInfo {
} SStreamIntervalOperatorInfo; } SStreamIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo;
SOptrBasicInfo binfo; SAggSupporter aggSup;
SAggSupporter aggSup;
STableQueryInfo* current; STableQueryInfo* current;
uint64_t groupId; uint64_t groupId;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SExprSupp scalarExprSup; SExprSupp scalarExprSup;
SNode* pCondition;
} SAggOperatorInfo; } SAggOperatorInfo;
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SNode* pFilterNode; // filter info, which is push down by optimizer
SArray* pPseudoColInfo; SArray* pPseudoColInfo;
SLimitInfo limitInfo; SLimitInfo limitInfo;
bool mergeDataBlocks; bool mergeDataBlocks;
SSDataBlock* pFinalRes; SSDataBlock* pFinalRes;
SNode* pCondition;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SIndefOperatorInfo { typedef struct SIndefOperatorInfo {
@ -647,7 +635,6 @@ typedef struct SFillOperatorInfo {
void** p; void** p;
SSDataBlock* existNewGroupBlock; SSDataBlock* existNewGroupBlock;
STimeWindow win; STimeWindow win;
SNode* pCondition;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t primaryTsCol; int32_t primaryTsCol;
int32_t primarySrcSlotId; int32_t primarySrcSlotId;
@ -712,7 +699,6 @@ typedef struct SSessionAggOperatorInfo {
int64_t gap; // session window gap int64_t gap; // session window gap
int32_t tsSlotId; // primary timestamp slot id int32_t tsSlotId; // primary timestamp slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
const SNode* pCondition;
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct SResultWindowInfo { typedef struct SResultWindowInfo {
@ -786,7 +772,6 @@ typedef struct SStreamFillOperatorInfo {
SSDataBlock* pSrcDelBlock; SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex; int32_t srcDelRowIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
SNode* pCondition;
SColMatchInfo matchInfo; SColMatchInfo matchInfo;
int32_t primaryTsCol; int32_t primaryTsCol;
int32_t primarySrcSlotId; int32_t primarySrcSlotId;
@ -823,7 +808,6 @@ typedef struct SStateWindowOperatorInfo {
SStateKeys stateKey; SStateKeys stateKey;
int32_t tsSlotId; // primary timestamp column slot id int32_t tsSlotId; // primary timestamp column slot id
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
const SNode* pCondition;
} SStateWindowOperatorInfo; } SStateWindowOperatorInfo;
typedef struct SSortOperatorInfo { typedef struct SSortOperatorInfo {
@ -836,7 +820,6 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo; SLimitInfo limitInfo;
SNode* pCondition;
} SSortOperatorInfo; } SSortOperatorInfo;
typedef struct STagFilterOperatorInfo { typedef struct STagFilterOperatorInfo {
@ -901,7 +884,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr,
SSDataBlock* pBlock, int32_t rows, const char* idStr); SSDataBlock* pBlock, int32_t rows, const char* idStr);

View File

@ -1039,41 +1039,24 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep,
int32_t status); int32_t status);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo) { void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
if (pFilterNode == NULL || pBlock->info.rows == 0) { if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return; return;
} }
SFilterInfo* filter = pFilterInfo;
int64_t st = taosGetTimestampUs();
// pError("start filter");
// todo move to the initialization function
int32_t code = 0;
bool needFree = false;
if (filter == NULL) {
needFree = true;
code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
code = filterSetDataFromSlotId(filter, &param1); int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
SColumnInfoData* p = NULL; SColumnInfoData* p = NULL;
int32_t status = 0; int32_t status = 0;
// todo the keep seems never to be True?? // todo the keep seems never to be True??
bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status); bool keep = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (needFree) {
filterFreeInfo(filter);
}
extractQualifiedTupleByFilterResult(pBlock, p, keep, status); extractQualifiedTupleByFilterResult(pBlock, p, keep, status);
if (pColMatchInfo != NULL) { if (pColMatchInfo != NULL) {
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo->pList); ++i) { size_t size = taosArrayGetSize(pColMatchInfo->pList);
for (int32_t i = 0; i < size; ++i) {
SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i); SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
@ -2395,7 +2378,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
while (1) { while (1) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pAggInfo->groupResInfo)) { if (!hasRemainResults(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
@ -2728,7 +2711,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) {
break; break;
} }
doFilter(pInfo->pCondition, fillResult, &pInfo->matchInfo, NULL); doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo );
if (fillResult->info.rows > 0) { if (fillResult->info.rows > 0) {
break; break;
} }
@ -2947,9 +2930,13 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
goto _error; goto _error;
} }
code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
pInfo->groupId = UINT64_MAX; pInfo->groupId = UINT64_MAX;
pInfo->pCondition = pAggNode->node.pConditions;
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
pOperator->blocking = true; pOperator->blocking = true;
@ -3175,7 +3162,11 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false); pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
pInfo->pCondition = pPhyFillNode->node.pConditions; code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;

View File

@ -312,7 +312,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
while (1) { while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, NULL, NULL); doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (!hasRemainResults(&pInfo->groupResInfo)) { if (!hasRemainResults(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/scalar/filter.h>
#include "executorimpl.h" #include "executorimpl.h"
#include "function.h" #include "function.h"
#include "os.h" #include "os.h"
@ -108,6 +109,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->pCondAfterMerge = NULL; pInfo->pCondAfterMerge = NULL;
} }
code = filterInitFromNode(pInfo->pCondAfterMerge, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->inputOrder = TSDB_ORDER_ASC; pInfo->inputOrder = TSDB_ORDER_ASC;
if (pJoinNode->inputTsOrder == ORDER_ASC) { if (pJoinNode->inputTsOrder == ORDER_ASC) {
pInfo->inputOrder = TSDB_ORDER_ASC; pInfo->inputOrder = TSDB_ORDER_ASC;
@ -400,8 +406,8 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
if (numOfNewRows == 0) { if (numOfNewRows == 0) {
break; break;
} }
if (pJoinInfo->pCondAfterMerge != NULL) { if (pOperator->exprSupp.pFilterInfo != NULL) {
doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL); doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
} }
if (pRes->info.rows >= pOperator->resultInfo.threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break; break;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/scalar/filter.h>
#include "executorimpl.h" #include "executorimpl.h"
#include "functionMgt.h" #include "functionMgt.h"
@ -71,13 +72,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFinalRes = createOneDataBlock(pResBlock, false);
pInfo->pFilterNode = pProjPhyNode->node.pConditions; pInfo->mergeDataBlocks = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM)? false:pProjPhyNode->mergeDataBlock;
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pInfo->mergeDataBlocks = false;
} else {
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
}
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
@ -97,6 +92,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols);
code = filterInitFromNode((SNode*)pProjPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
@ -313,7 +313,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
// do apply filter // do apply filter
doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL); doFilter(pFinalRes, pOperator->exprSupp.pFilterInfo, NULL);
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
@ -323,7 +323,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} else { } else {
// do apply filter // do apply filter
if (pRes->info.rows > 0) { if (pRes->info.rows > 0) {
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (pRes->info.rows == 0) { if (pRes->info.rows == 0) {
continue; continue;
} }
@ -391,9 +391,12 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
} }
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
code = filterInitFromNode((SNode*)pPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pCondition = pPhyNode->node.pConditions;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator"; pOperator->name = "IndefinitOperator";
@ -516,7 +519,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
} }
} }
doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
size_t rows = pInfo->pRes->info.rows; size_t rows = pInfo->pRes->info.rows;
if (rows > 0 || pOperator->status == OP_EXEC_DONE) { if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
break; break;
@ -618,7 +621,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) {
} }
pRes->info.rows = 1; pRes->info.rows = 1;
doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
/*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator); /*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator);

View File

@ -103,8 +103,10 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
SMetaReader* smrChildTable, const char* dbname, const char* tableName, SMetaReader* smrChildTable, const char* dbname, const char* tableName,
int32_t* pNumOfRows, const SSDataBlock* dataBlock); int32_t* pNumOfRows, const SSDataBlock* dataBlock);
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock); static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock,
bool processBlockWithProbability(const SSampleExecInfo* pInfo) { SFilterInfo* pFilterInfo);
bool processBlockWithProbability(const SSampleExecInfo* pInfo) {
#if 0 #if 0
if (pInfo->sampleRatio == 1) { if (pInfo->sampleRatio == 1) {
return true; return true;
@ -291,19 +293,13 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, static FORCE_INLINE bool doFilterByBlockSMA(SFilterInfo* pFilterInfo, SColumnDataAgg** pColsAgg, int32_t numOfCols,
int32_t numOfRows) { int32_t numOfRows) {
if (pColsAgg == NULL || pFilterNode == NULL) { if (pColsAgg == NULL || pFilterInfo == NULL) {
return true; return true;
} }
SFilterInfo* filter = NULL; bool keep = filterRangeExecute(pFilterInfo, pColsAgg, numOfCols, numOfRows);
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
filterFreeInfo(filter);
return keep; return keep;
} }
@ -408,7 +404,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
bool loadSMA = false; bool loadSMA = false;
*status = pInfo->dataBlockLoadFlag; *status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL || if (pOperator->exprSupp.pFilterInfo != NULL ||
overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD; (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
} }
@ -454,11 +450,11 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
// try to filter data block according to sma info // try to filter data block according to sma info
if (pTableScanInfo->pFilterNode != NULL && (!loadSMA)) { if (pOperator->exprSupp.pFilterInfo != NULL && (!loadSMA)) {
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
if (success) { if (success) {
size_t size = taosArrayGetSize(pBlock->pDataBlock); size_t size = taosArrayGetSize(pBlock->pDataBlock);
bool keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, size, pBlockInfo->rows); bool keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
if (!keep) { if (!keep) {
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
@ -499,9 +495,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
// restore the previous value // restore the previous value
pCost->totalRows -= pBlock->info.rows; pCost->totalRows -= pBlock->info.rows;
if (pTableScanInfo->pFilterNode != NULL) { if (pOperator->exprSupp.pFilterInfo != NULL) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, pOperator->exprSupp.pFilterInfo); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el; pTableScanInfo->readRecorder.filterTime += el;
@ -880,13 +876,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
if (pInfo->pFilterNode != NULL) { goto _error;
code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} }
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
@ -1315,7 +1307,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
return NULL; return NULL;
} }
doFilter(pInfo->pCondition, pResult, NULL, NULL); doFilter(pResult, pInfo->pTableScanOp->exprSupp.pFilterInfo, NULL);
if (pResult->info.rows == 0) { if (pResult->info.rows == 0) {
continue; continue;
} }
@ -1659,7 +1651,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
} }
if (filter) { if (filter) {
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
} }
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
@ -2111,7 +2103,7 @@ FETCH_NEXT_BLOCK:
} }
} }
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) {
@ -2476,9 +2468,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr);
} }
code = filterInitFromNode((SNode*)pScanPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pRes = createResDataBlock(pDescNode); pInfo->pRes = createResDataBlock(pDescNode);
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
pInfo->pCondition = pScanPhyNode->node.pConditions;
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN};
pInfo->groupId = 0; pInfo->groupId = 0;
@ -2619,13 +2615,13 @@ static int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { static SSDataBlock* doFilterResult(SSDataBlock* pDataBlock, SFilterInfo* pFilterInfo) {
if (pInfo->pCondition == NULL) { if (pFilterInfo == NULL) {
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
} }
doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); doFilter(pDataBlock, pFilterInfo, NULL);
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; return pDataBlock->info.rows == 0 ? NULL : pDataBlock;
} }
static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) { static SSDataBlock* buildInfoSchemaTableMetaBlock(char* tableName) {
@ -2830,7 +2826,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaReaderClear(&smrSuperTable); metaReaderClear(&smrSuperTable);
metaReaderClear(&smrChildTable); metaReaderClear(&smrChildTable);
if (numOfRows > 0) { if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0; numOfRows = 0;
} }
blockDataDestroy(dataBlock); blockDataDestroy(dataBlock);
@ -2870,7 +2866,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaReaderClear(&smrSuperTable); metaReaderClear(&smrSuperTable);
if (numOfRows >= pOperator->resultInfo.capacity) { if (numOfRows >= pOperator->resultInfo.capacity) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0; numOfRows = 0;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
@ -2880,7 +2876,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
} }
if (numOfRows > 0) { if (numOfRows > 0) {
relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock); relocateAndFilterSysTagsScanResult(pInfo, numOfRows, dataBlock, pOperator->exprSupp.pFilterInfo);
numOfRows = 0; numOfRows = 0;
} }
@ -2895,13 +2891,13 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
static void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock) { void relocateAndFilterSysTagsScanResult(SSysTableScanInfo* pInfo, int32_t numOfRows, SSDataBlock* dataBlock,
SFilterInfo* pFilterInfo) {
dataBlock->info.rows = numOfRows; dataBlock->info.rows = numOfRows;
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, dataBlock->pDataBlock, false);
doFilterResult(pInfo); doFilterResult(pInfo->pRes, pFilterInfo);
blockDataCleanup(dataBlock); blockDataCleanup(dataBlock);
} }
@ -3655,7 +3651,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo); doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
@ -3671,7 +3667,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo); doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
@ -3831,7 +3827,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo); doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
@ -3847,7 +3843,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
pInfo->pRes->info.rows = numOfRows; pInfo->pRes->info.rows = numOfRows;
relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false);
doFilterResult(pInfo); doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
blockDataCleanup(p); blockDataCleanup(p);
numOfRows = 0; numOfRows = 0;
@ -3878,8 +3874,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
// the retrieve is executed on the mnode, so return tables that belongs to the information schema database. // the retrieve is executed on the mnode, so return tables that belongs to the information schema database.
if (pInfo->readHandle.mnd != NULL) { if (pInfo->readHandle.mnd != NULL) {
buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity); buildSysDbTableInfo(pInfo, pOperator->resultInfo.capacity);
doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
doFilterResult(pInfo);
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
@ -4014,7 +4009,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator); updateLoadRemoteInfo(&pInfo->loadInfo, pRsp->numOfRows, pRsp->compLen, startTs, pOperator);
// todo log the filter info // todo log the filter info
doFilterResult(pInfo); doFilterResult(pInfo->pRes, pOperator->exprSupp.pFilterInfo);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
@ -4309,7 +4304,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
pCost->totalRows += pBlock->info.rows; pCost->totalRows += pBlock->info.rows;
*status = pInfo->dataBlockLoadFlag; *status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL || if (pOperator->exprSupp.pFilterInfo != NULL ||
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD; (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
} }
@ -4397,9 +4392,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
} }
} }
if (pTableScanInfo->pFilterNode != NULL) { if (pOperator->exprSupp.pFilterInfo!= NULL) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo);
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += el; pTableScanInfo->readRecorder.filterTime += el;
@ -4845,7 +4840,13 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->tableListInfo = pTableListInfo; pInfo->tableListInfo = pTableListInfo;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/scalar/filter.h>
#include "executorimpl.h" #include "executorimpl.h"
#include "tdatablock.h" #include "tdatablock.h"
@ -42,12 +43,14 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
pInfo->pCondition = pSortNode->node.pConditions;
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
pOperator->name = "SortOperator"; pOperator->name = "SortOperator";
@ -215,7 +218,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
doFilter(pInfo->pCondition, pBlock, &pInfo->matchInfo, NULL); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
if (blockDataGetNumOfRows(pBlock) == 0) { if (blockDataGetNumOfRows(pBlock) == 0) {
continue; continue;
} }

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/scalar/filter.h>
#include "os.h" #include "os.h"
#include "query.h" #include "query.h"
#include "taosdef.h" #include "taosdef.h"
@ -1499,7 +1500,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
} }
doStreamFillImpl(pOperator); doStreamFillImpl(pOperator);
doFilter(pInfo->pCondition, pInfo->pRes, &pInfo->matchInfo, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
@ -1677,7 +1678,12 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
int32_t numOfOutputCols = 0; int32_t numOfOutputCols = 0;
int32_t code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, int32_t code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
pInfo->pCondition = pPhyFillNode->node.pConditions;
code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols); code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;

View File

@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/scalar/filter.h>
#include "executorimpl.h" #include "executorimpl.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
@ -1226,7 +1227,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) { while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
@ -1264,7 +1265,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
while (1) { while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBlock, NULL, NULL); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
@ -1746,7 +1747,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
pInfo->interval = interval; pInfo->interval = interval;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->twAggSup = as; pInfo->twAggSup = as;
pInfo->pCondition = pPhyNode->window.node.pConditions;
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock; pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
if (pPhyNode->window.pExprs != NULL) { if (pPhyNode->window.pExprs != NULL) {
@ -1758,6 +1758,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
} }
} }
code = filterInitFromNode((SNode*)pPhyNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (isStream) { if (isStream) {
ASSERT(num > 0); ASSERT(num > 0);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs); initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
@ -1882,7 +1887,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
while (1) { while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
@ -1925,7 +1930,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
while (1) { while (1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); doFilter(pBInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
bool hasRemain = hasRemainResults(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
@ -2599,17 +2604,22 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.type = pInfo->stateCol.type;
pInfo->stateKey.bytes = pInfo->stateCol.bytes; pInfo->stateKey.bytes = pInfo->stateCol.bytes;
pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes);
pInfo->pCondition = pStateNode->window.node.pConditions;
if (pInfo->stateKey.pData == NULL) { if (pInfo->stateKey.pData == NULL) {
goto _error; goto _error;
} }
int32_t code = filterInitFromNode((SNode*)pStateNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -2697,7 +2707,10 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;
pInfo->winSup.prevTs = INT64_MIN; pInfo->winSup.prevTs = INT64_MIN;
pInfo->reptScan = false; pInfo->reptScan = false;
pInfo->pCondition = pSessionNode->window.node.pConditions; code = filterInitFromNode((SNode*)pSessionNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->name = "SessionWindowAggOperator"; pOperator->name = "SessionWindowAggOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION;
@ -4876,7 +4889,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true); setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
doFilter(pMiaInfo->pCondition, pRes, NULL, NULL); doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
if (pRes->info.rows >= pOperator->resultInfo.capacity) { if (pRes->info.rows >= pOperator->resultInfo.capacity) {
break; break;
} }
@ -4939,7 +4952,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
miaInfo->pCondition = pNode->window.node.pConditions; int32_t code = filterInitFromNode((SNode*)pNode->window.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
miaInfo->curTs = INT64_MIN; miaInfo->curTs = INT64_MIN;
iaInfo->win = pTaskInfo->window; iaInfo->win = pTaskInfo->window;
iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->inputOrder = TSDB_ORDER_ASC;
@ -4953,7 +4970,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
int32_t num = 0; int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num);
int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }

View File

@ -3994,9 +3994,12 @@ int32_t filterSetDataFromColId(SFilterInfo *info, void *param) {
} }
int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) { int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) {
int32_t code = 0;
SFilterInfo *info = NULL; SFilterInfo *info = NULL;
if (pNode == NULL) {
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
if (pNode == NULL || pInfo == NULL) { if (pNode == NULL || pInfo == NULL) {
fltError("invalid param"); fltError("invalid param");
FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR); FLT_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
@ -4034,9 +4037,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options)
_return: _return:
filterFreeInfo(*pInfo); filterFreeInfo(*pInfo);
*pInfo = NULL; *pInfo = NULL;
FLT_RET(code); FLT_RET(code);
} }