enh(query): add more load conditions check before load data block from file.
This commit is contained in:
parent
078c48a50a
commit
774e4ad62c
|
@ -179,7 +179,7 @@ typedef struct SQueryCostInfo {
|
|||
uint32_t totalBlocks;
|
||||
uint32_t loadBlocks;
|
||||
uint32_t loadBlockStatis;
|
||||
uint32_t discardBlocks;
|
||||
uint32_t skipBlocks;
|
||||
uint64_t elapsedTime;
|
||||
uint64_t firstStageMergeTime;
|
||||
uint64_t winInfoSize;
|
||||
|
|
|
@ -3144,7 +3144,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
if ((*status) == BLK_DATA_NO_NEEDED || (*status) == BLK_DATA_DISCARD) {
|
||||
qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
|
||||
pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
pCost->discardBlocks += 1;
|
||||
pCost->skipBlocks += 1;
|
||||
} else if ((*status) == BLK_DATA_STATIS_NEEDED) {
|
||||
// this function never returns error?
|
||||
pCost->loadBlockStatis += 1;
|
||||
|
@ -3184,7 +3184,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockStatis[i].min),
|
||||
(char*)&(pBlock->pBlockStatis[i].max));
|
||||
if (!load) { // current block has been discard due to filter applied
|
||||
pCost->discardBlocks += 1;
|
||||
pCost->skipBlocks += 1;
|
||||
qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
|
||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
(*status) = BLK_DATA_DISCARD;
|
||||
|
@ -3196,7 +3196,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
|
||||
// current block has been discard due to filter applied
|
||||
if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
|
||||
pCost->discardBlocks += 1;
|
||||
pCost->skipBlocks += 1;
|
||||
qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
|
||||
pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
(*status) = BLK_DATA_DISCARD;
|
||||
|
|
|
@ -72,7 +72,6 @@ char *metaTbCursorNext(SMTbCursor *pTbCur);
|
|||
|
||||
// tsdb
|
||||
typedef struct STsdb STsdb;
|
||||
typedef struct SDataStatis SDataStatis;
|
||||
typedef struct STsdbQueryCond STsdbQueryCond;
|
||||
typedef void *tsdbReaderT;
|
||||
|
||||
|
@ -92,7 +91,7 @@ int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, con
|
|||
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
|
||||
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
||||
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
||||
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis);
|
||||
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis);
|
||||
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
|
||||
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
||||
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
||||
|
@ -159,16 +158,6 @@ struct SVnodeCfg {
|
|||
int8_t hashMethod;
|
||||
};
|
||||
|
||||
struct SDataStatis {
|
||||
int16_t colId;
|
||||
int16_t maxIndex;
|
||||
int16_t minIndex;
|
||||
int16_t numOfNull;
|
||||
int64_t sum;
|
||||
int64_t max;
|
||||
int64_t min;
|
||||
};
|
||||
|
||||
struct STsdbQueryCond {
|
||||
STimeWindow twindow;
|
||||
int32_t order; // desc|asc order to iterate the data block
|
||||
|
|
|
@ -438,7 +438,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
|
|||
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
|
||||
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
|
||||
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
|
||||
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
|
||||
void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock);
|
||||
|
||||
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
|
||||
void *pBuf = *ppBuf;
|
||||
|
|
|
@ -99,10 +99,10 @@ typedef struct SIOCostSummary {
|
|||
|
||||
typedef struct STsdbReadHandle {
|
||||
STsdb* pTsdb;
|
||||
SQueryFilePos cur; // current position
|
||||
SQueryFilePos cur; // current position
|
||||
int16_t order;
|
||||
STimeWindow window; // the primary query time window that applies to all queries
|
||||
SDataStatis* statis; // query level statistics, only one table block statistics info exists at any time
|
||||
STimeWindow window; // the primary query time window that applies to all queries
|
||||
SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
|
||||
int32_t numOfBlocks;
|
||||
SArray* pColumns; // column list, SColumnInfoData array list
|
||||
bool locateStart;
|
||||
|
@ -377,7 +377,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
|
||||
if (pCond->numOfCols > 0) {
|
||||
// allocate buffer in order to load data blocks from file
|
||||
pReadHandle->statis = taosMemoryCalloc(pCond->numOfCols, sizeof(SDataStatis));
|
||||
pReadHandle->statis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
|
||||
if (pReadHandle->statis == NULL) {
|
||||
goto _end;
|
||||
}
|
||||
|
@ -477,7 +477,7 @@ void tsdbResetQueryHandle(tsdbReaderT queryHandle, STsdbQueryCond* pCond) {
|
|||
}
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
memset(pTsdbReadHandle->statis, 0, sizeof(SDataStatis));
|
||||
memset(pTsdbReadHandle->statis, 0, sizeof(SColumnDataAgg));
|
||||
|
||||
tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
|
||||
tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
|
||||
|
@ -505,7 +505,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, STsdbQueryCond* pC
|
|||
}
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
memset(pTsdbReadHandle->statis, 0, sizeof(SDataStatis));
|
||||
memset(pTsdbReadHandle->statis, 0, sizeof(SColumnDataAgg));
|
||||
|
||||
tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
|
||||
tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
|
||||
|
@ -3222,7 +3222,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
|
|||
/*
|
||||
* return null for mixed data block, if not a complete file data block, the statistics value will always return NULL
|
||||
*/
|
||||
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStatis** pBlockStatis) {
|
||||
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SColumnDataAgg** pBlockStatis) {
|
||||
STsdbReadHandle* pHandle = (STsdbReadHandle*)pTsdbReadHandle;
|
||||
|
||||
SQueryFilePos* c = &pHandle->cur;
|
||||
|
@ -3252,7 +3252,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
|
|||
int16_t* colIds = pHandle->defaultLoadColumn->pData;
|
||||
|
||||
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
|
||||
memset(pHandle->statis, 0, numOfCols * sizeof(SDataStatis));
|
||||
memset(pHandle->statis, 0, numOfCols * sizeof(SColumnDataAgg));
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
pHandle->statis[i].colId = colIds[i];
|
||||
}
|
||||
|
@ -3260,7 +3260,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT* pTsdbReadHandle, SDataStati
|
|||
tsdbGetBlockStatis(&pHandle->rhelper, pHandle->statis, (int)numOfCols, pBlockInfo->compBlock);
|
||||
|
||||
// always load the first primary timestamp column data
|
||||
SDataStatis* pPrimaryColStatis = &pHandle->statis[0];
|
||||
SColumnDataAgg* pPrimaryColStatis = &pHandle->statis[0];
|
||||
assert(pPrimaryColStatis->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
|
||||
|
||||
pPrimaryColStatis->numOfNull = 0;
|
||||
|
|
|
@ -433,7 +433,7 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock) {
|
||||
void tsdbGetBlockStatis(SReadH *pReadh, SColumnDataAgg *pStatis, int numOfCols, SBlock *pBlock) {
|
||||
#ifdef TD_REFACTOR_3
|
||||
SBlockData *pBlockData = pReadh->pBlkData;
|
||||
|
||||
|
|
|
@ -133,7 +133,8 @@ typedef struct STaskCostInfo {
|
|||
uint32_t totalBlocks;
|
||||
uint32_t loadBlocks;
|
||||
uint32_t loadBlockStatis;
|
||||
uint32_t discardBlocks;
|
||||
uint32_t skipBlocks;
|
||||
uint32_t filterOutBlocks;
|
||||
uint64_t elapsedTime;
|
||||
uint64_t firstStageMergeTime;
|
||||
uint64_t winInfoSize;
|
||||
|
|
|
@ -2447,7 +2447,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
|||
if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) {
|
||||
//qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
|
||||
// pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
pCost->discardBlocks += 1;
|
||||
pCost->skipBlocks += 1;
|
||||
} else if ((*status) == BLK_DATA_SMA_LOAD) {
|
||||
// this function never returns error?
|
||||
pCost->loadBlockStatis += 1;
|
||||
|
@ -2487,7 +2487,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
|||
// load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min),
|
||||
// (char*)&(pBlock->pBlockAgg[i].max));
|
||||
if (!load) { // current block has been discard due to filter applied
|
||||
pCost->discardBlocks += 1;
|
||||
pCost->skipBlocks += 1;
|
||||
//qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId,
|
||||
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
(*status) = BLK_DATA_FILTEROUT;
|
||||
|
@ -2499,7 +2499,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
|||
|
||||
// current block has been discard due to filter applied
|
||||
// if (!doFilterByBlockStatistics(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) {
|
||||
// pCost->discardBlocks += 1;
|
||||
// pCost->skipBlocks += 1;
|
||||
// qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey,
|
||||
// pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
// (*status) = BLK_DATA_FILTEROUT;
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "tglobal.h"
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
#include "os.h"
|
||||
#include "querynodes.h"
|
||||
#include "tname.h"
|
||||
|
@ -71,31 +72,80 @@ int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo,
|
|||
STaskCostInfo* pCost = &pTaskInfo->cost;
|
||||
|
||||
pCost->totalBlocks += 1;
|
||||
pCost->loadBlocks += 1;
|
||||
|
||||
pCost->totalRows += pBlock->info.rows;
|
||||
pCost->totalCheckedRows += pBlock->info.rows;
|
||||
|
||||
*status = pInfo->dataBlockLoadFlag;
|
||||
|
||||
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
||||
if (pCols == NULL) {
|
||||
return terrno;
|
||||
if (pTableScanInfo->pFilterNode != NULL) {
|
||||
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
}
|
||||
|
||||
int32_t numOfCols = pBlock->info.numOfCols;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
|
||||
if (!pColMatchInfo->output) {
|
||||
continue;
|
||||
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
||||
|
||||
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
|
||||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
|
||||
pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
pCost->filterOutBlocks += 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
|
||||
pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
pCost->skipBlocks += 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||
pCost->loadBlockStatis += 1;
|
||||
tsdbRetrieveDataBlockStatisInfo(pTableScanInfo->dataReader, &pBlock->pBlockAgg);
|
||||
|
||||
// failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||
if (pBlock->pBlockAgg == NULL) {
|
||||
pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
||||
pCost->totalCheckedRows += pBlock->info.rows;
|
||||
pCost->loadBlocks += 1;
|
||||
}
|
||||
|
||||
ASSERT(pColMatchInfo->colId == p->info.colId);
|
||||
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (*status == FUNC_DATA_REQUIRED_DATA_LOAD) {
|
||||
// todo filter data block according to the block sma data firstly
|
||||
#if 0
|
||||
if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
|
||||
pCost->filterOutBlocks += 1;
|
||||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
|
||||
pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
#endif
|
||||
|
||||
pCost->totalCheckedRows += pBlock->info.rows;
|
||||
pCost->loadBlocks += 1;
|
||||
|
||||
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
||||
if (pCols == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
// relocated the column data into the correct slotId
|
||||
int32_t numOfCols = pBlock->info.numOfCols;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pCols, i);
|
||||
SColMatchInfo* pColMatchInfo = taosArrayGet(pTableScanInfo->pColMatchInfo, i);
|
||||
if (!pColMatchInfo->output) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ASSERT(pColMatchInfo->colId == p->info.colId);
|
||||
taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p);
|
||||
}
|
||||
}
|
||||
|
||||
doFilter(pTableScanInfo->pFilterNode, pBlock);
|
||||
if (pBlock->info.rows == 0) {
|
||||
pCost->filterOutBlocks += 1;
|
||||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey,
|
||||
pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -114,7 +164,6 @@ static void setupEnvForReverseScan(STableScanInfo* pTableScanInfo, SqlFunctionCt
|
|||
|
||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
STableGroupInfo* pTableGroupInfo = &pOperator->pTaskInfo->tableqinfoGroupInfo;
|
||||
|
@ -141,15 +190,15 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
// }
|
||||
|
||||
// this function never returns error?
|
||||
uint32_t status = BLK_DATA_DATA_LOAD;
|
||||
uint32_t status = FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
||||
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pOperator->pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
// current block is ignored according to filter result by block statistics data, continue load the next block
|
||||
if (status == BLK_DATA_FILTEROUT || pBlock->info.rows == 0) {
|
||||
// current block is filter out according to filter condition, continue load the next block
|
||||
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
|
|||
return false;
|
||||
}
|
||||
if (NULL == pNode->pParent ||
|
||||
(QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode->pParent))) {
|
||||
(QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) && QUERY_NODE_LOGIC_PLAN_AGG != nodeType(pNode->pParent))) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
|
|
Loading…
Reference in New Issue