ehn(query): add the file data block load optimization check.
This commit is contained in:
parent
ad3675daed
commit
5029cd67b2
|
@ -139,10 +139,10 @@ bool fmIsSpecialDataRequiredFunc(int32_t funcId);
|
|||
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
|
||||
|
||||
typedef enum EFuncDataRequired {
|
||||
FUNC_DATA_REQUIRED_ALL_NEEDED = 1,
|
||||
FUNC_DATA_REQUIRED_STATIS_NEEDED,
|
||||
FUNC_DATA_REQUIRED_NO_NEEDED,
|
||||
FUNC_DATA_REQUIRED_DISCARD
|
||||
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
||||
FUNC_DATA_REQUIRED_STATIS_LOAD,
|
||||
FUNC_DATA_REQUIRED_NOT_LOAD,
|
||||
FUNC_DATA_REQUIRED_FILTEROUT,
|
||||
} EFuncDataRequired;
|
||||
|
||||
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||
|
|
|
@ -42,10 +42,6 @@
|
|||
|
||||
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
|
||||
|
||||
struct SColumnFilterElem;
|
||||
|
||||
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
|
||||
|
||||
typedef struct SGroupResInfo {
|
||||
int32_t totalGroup;
|
||||
int32_t currentGroup;
|
||||
|
|
|
@ -38,8 +38,6 @@ extern "C" {
|
|||
#include "tpagedbuf.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
struct SColumnFilterElem;
|
||||
|
||||
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
||||
|
||||
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
|
||||
|
@ -225,8 +223,6 @@ typedef struct STaskRuntimeEnv {
|
|||
void* qinfo;
|
||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||
void* pTsdbReadHandle;
|
||||
|
||||
int32_t prevGroupId; // previous executed group id
|
||||
bool enableGroupData;
|
||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||
|
@ -241,8 +237,6 @@ typedef struct STaskRuntimeEnv {
|
|||
|
||||
char* tagVal; // tag value of current data block
|
||||
struct SScalarFunctionSupport* scalarSup;
|
||||
|
||||
SSDataBlock* outputBuf;
|
||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||
struct SOperatorInfo* proot;
|
||||
SGroupResInfo groupResInfo;
|
||||
|
@ -250,7 +244,6 @@ typedef struct STaskRuntimeEnv {
|
|||
|
||||
STableQueryInfo* current;
|
||||
SResultInfo resultInfo;
|
||||
SHashObj* pTableRetrieveTsMap;
|
||||
struct SUdfInfo* pUdfInfo;
|
||||
} STaskRuntimeEnv;
|
||||
|
||||
|
@ -350,6 +343,7 @@ typedef struct STableScanInfo {
|
|||
int64_t elapsedTime;
|
||||
int32_t prevGroupId; // previous table group id
|
||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||
int32_t dataBlockLoadFlag;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STagScanInfo {
|
||||
|
@ -616,7 +610,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
|||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
||||
|
||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
|
||||
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||
|
|
|
@ -271,7 +271,7 @@ static int compareRowData(const void* a, const void* b, const void* userData) {
|
|||
}
|
||||
|
||||
// setup the output buffer for each operator
|
||||
SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
|
||||
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
||||
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
|
||||
|
||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
|
@ -6518,8 +6518,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
|
||||
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||
// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
|
||||
|
||||
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
|
||||
assert(numOfGroup == 0 || numOfGroup == 1);
|
||||
|
||||
|
@ -6747,13 +6745,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
|
||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||
|
||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
|
||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, dataLoadFlag, pScanPhyNode->count,
|
||||
pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
||||
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||
|
@ -6761,7 +6759,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||
|
@ -6770,7 +6768,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
return pOperator;
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
|
||||
|
||||
struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
||||
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
|
||||
|
@ -6799,14 +6797,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
|
||||
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
|
||||
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
||||
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
|
||||
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
|
||||
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
|
||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SExprInfo* pScalarExprInfo = NULL;
|
||||
int32_t numOfScalarExpr = 0;
|
||||
|
@ -6824,7 +6822,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SInterval interval = {
|
||||
.interval = pIntervalPhyNode->interval,
|
||||
|
@ -6840,7 +6838,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
|
||||
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
|
||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
|
||||
|
@ -6848,12 +6846,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
|
||||
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
|
||||
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
|
||||
|
@ -6861,11 +6859,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
|
||||
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode;
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||
|
||||
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
|
||||
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
|
||||
|
|
|
@ -64,16 +64,19 @@ static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
|
|||
#endif
|
||||
}
|
||||
|
||||
int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
||||
int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
STableScanInfo* pInfo = pOperator->info;
|
||||
|
||||
STaskCostInfo* pCost = &pTaskInfo->cost;
|
||||
|
||||
pCost->totalBlocks += 1;
|
||||
pCost->loadBlocks += 1;
|
||||
|
||||
pCost->totalRows += pBlock->info.rows;
|
||||
|
||||
pCost->totalCheckedRows += pBlock->info.rows;
|
||||
pCost->loadBlocks += 1;
|
||||
|
||||
*status = BLK_DATA_DATA_LOAD;
|
||||
*status = pInfo->dataBlockLoadFlag;
|
||||
|
||||
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
||||
if (pCols == NULL) {
|
||||
|
@ -217,7 +220,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
return p;
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
|
||||
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
|
||||
SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
||||
assert(repeatTime > 0);
|
||||
|
@ -232,6 +235,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pInfo->dataBlockLoadFlag= dataLoadFlag;
|
||||
pInfo->pResBlock = pResBlock;
|
||||
pInfo->pFilterNode = pCondition;
|
||||
pInfo->dataReader = pTsdbReadHandle;
|
||||
|
|
|
@ -58,9 +58,9 @@ void functionFinalize(SqlFunctionCtx *pCtx) {
|
|||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
|
||||
return FUNC_DATA_REQUIRED_NO_NEEDED;
|
||||
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||
}
|
||||
return FUNC_DATA_REQUIRED_STATIS_NEEDED;
|
||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||
}
|
||||
|
||||
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
|
|
|
@ -78,10 +78,10 @@ int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
|||
|
||||
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
||||
return FUNC_DATA_REQUIRED_ALL_NEEDED;
|
||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
}
|
||||
if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
|
||||
return FUNC_DATA_REQUIRED_ALL_NEEDED;
|
||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
}
|
||||
return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
|
||||
}
|
||||
|
|
|
@ -200,7 +200,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
strcpy(pScan->tableName.tname, pRealTable->table.tableName);
|
||||
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
|
||||
pScan->ratio = pRealTable->ratio;
|
||||
pScan->dataRequired = FUNC_DATA_REQUIRED_ALL_NEEDED;
|
||||
pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
|
||||
// set columns to scan
|
||||
SNodeList* pCols = NULL;
|
||||
|
|
|
@ -125,12 +125,12 @@ static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo
|
|||
|
||||
static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataRequired r) {
|
||||
switch (l) {
|
||||
case FUNC_DATA_REQUIRED_ALL_NEEDED:
|
||||
case FUNC_DATA_REQUIRED_DATA_LOAD:
|
||||
return l;
|
||||
case FUNC_DATA_REQUIRED_STATIS_NEEDED:
|
||||
return FUNC_DATA_REQUIRED_ALL_NEEDED == r ? r : l;
|
||||
case FUNC_DATA_REQUIRED_NO_NEEDED:
|
||||
return FUNC_DATA_REQUIRED_DISCARD == r ? l : r;
|
||||
case FUNC_DATA_REQUIRED_STATIS_LOAD:
|
||||
return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l;
|
||||
case FUNC_DATA_REQUIRED_NOT_LOAD:
|
||||
return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -139,9 +139,9 @@ static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataR
|
|||
|
||||
static int32_t osdGetDataRequired(SNodeList* pFuncs) {
|
||||
if (NULL == pFuncs) {
|
||||
return FUNC_DATA_REQUIRED_ALL_NEEDED;
|
||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
}
|
||||
EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_DISCARD;
|
||||
EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_FILTEROUT;
|
||||
SNode* pFunc = NULL;
|
||||
FOREACH(pFunc, pFuncs) {
|
||||
dataRequired = osdPromoteDataRequired(dataRequired, fmFuncDataRequired((SFunctionNode*)pFunc, NULL));
|
||||
|
|
Loading…
Reference in New Issue