diff --git a/include/libs/scalar/filter.h b/include/libs/scalar/filter.h index 44064f0a9f..1f1d9dea93 100644 --- a/include/libs/scalar/filter.h +++ b/include/libs/scalar/filter.h @@ -44,7 +44,7 @@ extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar); extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo); extern void filterFreeInfo(SFilterInfo *info); -extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows); +extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pColsAgg, int32_t numOfCols, int32_t numOfRows); /* condition split interface */ int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d17fe7633f..1908e2a92d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1011,16 +1011,6 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { // } //} -// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, -// SqlFunctionCtx *pCtx, int32_t numOfRows) { -// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; -// -// if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) { -// return true; -// } -// -// return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows); -// } #if 0 static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) { STimeWindow w = {0}; @@ -1215,7 +1205,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc } // current block has been discard due to filter applied -// if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) { +// if (!doFilterByBlockSMA(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) { // pCost->skipBlocks += 1; // qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, // pBlockInfo->window.ekey, pBlockInfo->rows); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 03c08a6bf5..a2e0b68623 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,10 +13,11 @@ * along with this program. If not, see . */ +#include "os.h" #include "executorimpl.h" +#include "filter.h" #include "function.h" #include "functionMgt.h" -#include "os.h" #include "querynodes.h" #include "systable.h" #include "tname.h" @@ -227,6 +228,57 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* return TSDB_CODE_SUCCESS; } +static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols, + int32_t numOfRows) { + if (pColsAgg == NULL || pFilterNode == NULL) { + return true; + } + + SFilterInfo* filter = NULL; + + // todo move to the initialization function + int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); + bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows); + + filterFreeInfo(filter); + return keep; +} + +static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { + bool allColumnsHaveAgg = true; + SColumnDataAgg** pColAgg = NULL; + + int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + + if (!allColumnsHaveAgg) { + return false; + } + + // if (allColumnsHaveAgg == true) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + + // todo create this buffer during creating operator + if (pBlock->pBlockAgg == NULL) { + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); + if (pBlock->pBlockAgg == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + } + } + + for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) { + SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); + if (!pColMatchInfo->output) { + continue; + } + pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; + } + + return true; +} + static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -236,6 +288,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca pCost->totalBlocks += 1; pCost->totalRows += pBlock->info.rows; + bool loadSMA = false; *status = pInfo->dataBlockLoadFlag; if (pTableScanInfo->pFilterNode != NULL || @@ -259,41 +312,34 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { pCost->loadBlockStatis += 1; - - bool allColumnsHaveAgg = true; - SColumnDataAgg** pColAgg = NULL; - - int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); - } - - if (allColumnsHaveAgg == true) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - - // todo create this buffer during creating operator - if (pBlock->pBlockAgg == NULL) { - pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); - } - - for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) { - SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i); - if (!pColMatchInfo->output) { - continue; - } - pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i]; - } - - return TSDB_CODE_SUCCESS; - } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + loadSMA = true; // mark the operator of load sma; + bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); + if (!success) { // failed to load the block sma data, data block statistics does not exist, load data block instead *status = FUNC_DATA_REQUIRED_DATA_LOAD; } } ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); - // todo filter data block according to the block sma data firstly + // try to filter data block according to sma info + if (pTableScanInfo->pFilterNode != NULL) { + if (!loadSMA) { + doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo); + } + bool keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, taosArrayGetSize(pBlock->pDataBlock), + pBlockInfo->rows); + if (!keep) { + qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->filterOutBlocks += 1; + (*status) = FUNC_DATA_REQUIRED_FILTEROUT; + + return TSDB_CODE_SUCCESS; + } + } + + // try to filter datablock according to current results doDynamicPruneDataBlock(pOperator, pBlockInfo, status); if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), @@ -303,16 +349,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return TSDB_CODE_SUCCESS; } -#if 0 - if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, - pBlockInfo->window.ekey, pBlockInfo->rows); - (*status) = FUNC_DATA_REQUIRED_FILTEROUT; - return TSDB_CODE_SUCCESS; - } -#endif - pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; @@ -2722,7 +2758,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc // todo filter data block according to the block sma data firstly #if 0 - if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { + if (!doFilterByBlockSMA(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->filterOutBlocks += 1; qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 04328fda9c..a6a47da2e5 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3245,7 +3245,7 @@ _return: return code; } -bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) { +bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t numOfCols, int32_t numOfRows) { if (FILTER_EMPTY_RES(info)) { return false; } @@ -3261,7 +3261,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t int32_t index = -1; SFilterRangeCtx *ctx = info->colRange[k]; for(int32_t i = 0; i < numOfCols; ++i) { - if (pDataStatis[i].colId == ctx->colId) { + if (pDataStatis[i]->colId == ctx->colId) { index = i; break; } @@ -3277,13 +3277,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t break; } - if (pDataStatis[index].numOfNull <= 0) { + if (pDataStatis[index]->numOfNull <= 0) { if (ctx->isnull && !ctx->notnull && !ctx->isrange) { ret = false; break; } - } else if (pDataStatis[index].numOfNull > 0) { - if (pDataStatis[index].numOfNull == numOfRows) { + } else if (pDataStatis[index]->numOfNull > 0) { + if (pDataStatis[index]->numOfNull == numOfRows) { if ((ctx->notnull || ctx->isrange) && (!ctx->isnull)) { ret = false; break; @@ -3297,7 +3297,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t } } - SColumnDataAgg* pDataBlockst = &pDataStatis[index]; + SColumnDataAgg* pDataBlockst = pDataStatis[index]; SFilterRangeNode *r = ctx->rs; float minv = 0;