enh(query): support filter by SMA.

This commit is contained in:
Haojun Liao 2022-08-05 17:40:40 +08:00
parent 2b59b4f2dd
commit 1063064a59
4 changed files with 84 additions and 58 deletions

View File

@ -44,7 +44,7 @@ extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict
extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar); extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo); extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info); extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows); extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pColsAgg, int32_t numOfCols, int32_t numOfRows);
/* condition split interface */ /* condition split interface */
int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond, int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond,

View File

@ -1011,16 +1011,6 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
// } // }
//} //}
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
//
// if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) {
// return true;
// }
//
// return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows);
// }
#if 0 #if 0
static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) { static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) {
STimeWindow w = {0}; STimeWindow w = {0};
@ -1215,7 +1205,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
} }
// current block has been discard due to filter applied // current block has been discard due to filter applied
// if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) { // if (!doFilterByBlockSMA(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
// pCost->skipBlocks += 1; // pCost->skipBlocks += 1;
// qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, // qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
// pBlockInfo->window.ekey, pBlockInfo->rows); // pBlockInfo->window.ekey, pBlockInfo->rows);

View File

@ -13,10 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "filter.h"
#include "function.h" #include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "os.h"
#include "querynodes.h" #include "querynodes.h"
#include "systable.h" #include "systable.h"
#include "tname.h" #include "tname.h"
@ -227,6 +228,57 @@ static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE bool doFilterByBlockSMA(const SNode* pFilterNode, SColumnDataAgg** pColsAgg, int32_t numOfCols,
int32_t numOfRows) {
if (pColsAgg == NULL || pFilterNode == NULL) {
return true;
}
SFilterInfo* filter = NULL;
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0);
bool keep = filterRangeExecute(filter, pColsAgg, numOfCols, numOfRows);
filterFreeInfo(filter);
return keep;
}
static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
bool allColumnsHaveAgg = true;
SColumnDataAgg** pColAgg = NULL;
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
if (!allColumnsHaveAgg) {
return false;
}
// if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
// todo create this buffer during creating operator
if (pBlock->pBlockAgg == NULL) {
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pBlock->pBlockAgg == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
}
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
}
return true;
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) { uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -236,6 +288,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
pCost->totalBlocks += 1; pCost->totalBlocks += 1;
pCost->totalRows += pBlock->info.rows; pCost->totalRows += pBlock->info.rows;
bool loadSMA = false;
*status = pInfo->dataBlockLoadFlag; *status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL || if (pTableScanInfo->pFilterNode != NULL ||
@ -259,41 +312,34 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1; pCost->loadBlockStatis += 1;
loadSMA = true; // mark the operator of load sma;
bool allColumnsHaveAgg = true; bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
SColumnDataAgg** pColAgg = NULL; if (!success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
}
if (allColumnsHaveAgg == true) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
// todo create this buffer during creating operator
if (pBlock->pBlockAgg == NULL) {
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
}
for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->pColMatchInfo); ++i) {
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
if (!pColMatchInfo->output) {
continue;
}
pBlock->pBlockAgg[pColMatchInfo->targetSlotId] = pColAgg[i];
}
return TSDB_CODE_SUCCESS;
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
*status = FUNC_DATA_REQUIRED_DATA_LOAD; *status = FUNC_DATA_REQUIRED_DATA_LOAD;
} }
} }
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
// todo filter data block according to the block sma data firstly // try to filter data block according to sma info
if (pTableScanInfo->pFilterNode != NULL) {
if (!loadSMA) {
doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
}
bool keep = doFilterByBlockSMA(pTableScanInfo->pFilterNode, pBlock->pBlockAgg, taosArrayGetSize(pBlock->pDataBlock),
pBlockInfo->rows);
if (!keep) {
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
return TSDB_CODE_SUCCESS;
}
}
// try to filter datablock according to current results
doDynamicPruneDataBlock(pOperator, pBlockInfo, status); doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
@ -303,16 +349,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows);
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
return TSDB_CODE_SUCCESS;
}
#endif
pCost->totalCheckedRows += pBlock->info.rows; pCost->totalCheckedRows += pBlock->info.rows;
pCost->loadBlocks += 1; pCost->loadBlocks += 1;
@ -2722,7 +2758,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
// todo filter data block according to the block sma data firstly // todo filter data block according to the block sma data firstly
#if 0 #if 0
if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { if (!doFilterByBlockSMA(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
pCost->filterOutBlocks += 1; pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
pBlockInfo->window.ekey, pBlockInfo->rows); pBlockInfo->window.ekey, pBlockInfo->rows);

View File

@ -3245,7 +3245,7 @@ _return:
return code; return code;
} }
bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) { bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg **pDataStatis, int32_t numOfCols, int32_t numOfRows) {
if (FILTER_EMPTY_RES(info)) { if (FILTER_EMPTY_RES(info)) {
return false; return false;
} }
@ -3261,7 +3261,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
int32_t index = -1; int32_t index = -1;
SFilterRangeCtx *ctx = info->colRange[k]; SFilterRangeCtx *ctx = info->colRange[k];
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
if (pDataStatis[i].colId == ctx->colId) { if (pDataStatis[i]->colId == ctx->colId) {
index = i; index = i;
break; break;
} }
@ -3277,13 +3277,13 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
break; break;
} }
if (pDataStatis[index].numOfNull <= 0) { if (pDataStatis[index]->numOfNull <= 0) {
if (ctx->isnull && !ctx->notnull && !ctx->isrange) { if (ctx->isnull && !ctx->notnull && !ctx->isrange) {
ret = false; ret = false;
break; break;
} }
} else if (pDataStatis[index].numOfNull > 0) { } else if (pDataStatis[index]->numOfNull > 0) {
if (pDataStatis[index].numOfNull == numOfRows) { if (pDataStatis[index]->numOfNull == numOfRows) {
if ((ctx->notnull || ctx->isrange) && (!ctx->isnull)) { if ((ctx->notnull || ctx->isrange) && (!ctx->isnull)) {
ret = false; ret = false;
break; break;
@ -3297,7 +3297,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
} }
} }
SColumnDataAgg* pDataBlockst = &pDataStatis[index]; SColumnDataAgg* pDataBlockst = pDataStatis[index];
SFilterRangeNode *r = ctx->rs; SFilterRangeNode *r = ctx->rs;
float minv = 0; float minv = 0;