Merge branch '3.0' into feature/TD-11463-3.0
This commit is contained in:
commit
c841f892a9
|
@ -139,10 +139,10 @@ bool fmIsSpecialDataRequiredFunc(int32_t funcId);
|
||||||
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
|
bool fmIsDynamicScanOptimizedFunc(int32_t funcId);
|
||||||
|
|
||||||
typedef enum EFuncDataRequired {
|
typedef enum EFuncDataRequired {
|
||||||
FUNC_DATA_REQUIRED_ALL_NEEDED = 1,
|
FUNC_DATA_REQUIRED_DATA_LOAD = 1,
|
||||||
FUNC_DATA_REQUIRED_STATIS_NEEDED,
|
FUNC_DATA_REQUIRED_STATIS_LOAD,
|
||||||
FUNC_DATA_REQUIRED_NO_NEEDED,
|
FUNC_DATA_REQUIRED_NOT_LOAD,
|
||||||
FUNC_DATA_REQUIRED_DISCARD
|
FUNC_DATA_REQUIRED_FILTEROUT,
|
||||||
} EFuncDataRequired;
|
} EFuncDataRequired;
|
||||||
|
|
||||||
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
|
||||||
|
|
|
@ -42,10 +42,6 @@
|
||||||
|
|
||||||
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
|
#define curTimeWindowIndex(_winres) ((_winres)->curIndex)
|
||||||
|
|
||||||
struct SColumnFilterElem;
|
|
||||||
|
|
||||||
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
|
|
||||||
|
|
||||||
typedef struct SGroupResInfo {
|
typedef struct SGroupResInfo {
|
||||||
int32_t totalGroup;
|
int32_t totalGroup;
|
||||||
int32_t currentGroup;
|
int32_t currentGroup;
|
||||||
|
|
|
@ -38,8 +38,6 @@ extern "C" {
|
||||||
#include "tpagedbuf.h"
|
#include "tpagedbuf.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
|
|
||||||
struct SColumnFilterElem;
|
|
||||||
|
|
||||||
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
|
||||||
|
|
||||||
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
|
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
|
||||||
|
@ -225,8 +223,6 @@ typedef struct STaskRuntimeEnv {
|
||||||
void* qinfo;
|
void* qinfo;
|
||||||
uint8_t scanFlag; // denotes reversed scan of data or not
|
uint8_t scanFlag; // denotes reversed scan of data or not
|
||||||
void* pTsdbReadHandle;
|
void* pTsdbReadHandle;
|
||||||
|
|
||||||
int32_t prevGroupId; // previous executed group id
|
|
||||||
bool enableGroupData;
|
bool enableGroupData;
|
||||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||||
|
@ -241,8 +237,6 @@ typedef struct STaskRuntimeEnv {
|
||||||
|
|
||||||
char* tagVal; // tag value of current data block
|
char* tagVal; // tag value of current data block
|
||||||
struct SScalarFunctionSupport* scalarSup;
|
struct SScalarFunctionSupport* scalarSup;
|
||||||
|
|
||||||
SSDataBlock* outputBuf;
|
|
||||||
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
|
||||||
struct SOperatorInfo* proot;
|
struct SOperatorInfo* proot;
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
|
@ -250,7 +244,6 @@ typedef struct STaskRuntimeEnv {
|
||||||
|
|
||||||
STableQueryInfo* current;
|
STableQueryInfo* current;
|
||||||
SResultInfo resultInfo;
|
SResultInfo resultInfo;
|
||||||
SHashObj* pTableRetrieveTsMap;
|
|
||||||
struct SUdfInfo* pUdfInfo;
|
struct SUdfInfo* pUdfInfo;
|
||||||
} STaskRuntimeEnv;
|
} STaskRuntimeEnv;
|
||||||
|
|
||||||
|
@ -350,6 +343,7 @@ typedef struct STableScanInfo {
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
int32_t prevGroupId; // previous table group id
|
int32_t prevGroupId; // previous table group id
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
|
int32_t dataBlockLoadFlag;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STagScanInfo {
|
typedef struct STagScanInfo {
|
||||||
|
@ -616,7 +610,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime,
|
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
|
||||||
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
|
||||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
|
|
|
@ -271,7 +271,7 @@ static int compareRowData(const void* a, const void* b, const void* userData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setup the output buffer for each operator
|
// setup the output buffer for each operator
|
||||||
SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
|
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
||||||
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
|
int32_t numOfCols = LIST_LENGTH(pNode->pSlots);
|
||||||
|
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||||
|
@ -6518,8 +6518,6 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
|
||||||
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
|
SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||||
// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
|
|
||||||
|
|
||||||
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
|
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
|
||||||
assert(numOfGroup == 0 || numOfGroup == 1);
|
assert(numOfGroup == 0 || numOfGroup == 1);
|
||||||
|
|
||||||
|
@ -6739,21 +6737,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t type = nodeType(pPhyNode);
|
int32_t type = nodeType(pPhyNode);
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;
|
||||||
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode* ) pPhyNode;
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||||
if (pDataReader == NULL) {
|
if (pDataReader == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
|
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pTableScanNode->dataRequired, pScanPhyNode->count,
|
||||||
pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, pTaskInfo);
|
pScanPhyNode->reverse, pColList, pResBlock, pScanPhyNode->node.pConditions, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
||||||
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
|
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||||
|
@ -6761,7 +6760,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
||||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pScanPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||||
|
@ -6770,7 +6769,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pSysScanPhyNode->scan.node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
struct SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
|
||||||
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
|
SArray* colList = extractScanColumnId(pScanNode->pScanCols);
|
||||||
|
@ -6799,14 +6798,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
|
SProjectPhysiNode* pProjPhyNode = (SProjectPhysiNode*) pPhyNode;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &num);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
SLimit limit = {.limit = pProjPhyNode->limit, .offset = pProjPhyNode->offset};
|
||||||
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
|
SLimit slimit = {.limit = pProjPhyNode->slimit, .offset = pProjPhyNode->soffset};
|
||||||
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
|
pOptr = createProjectOperatorInfo(ops[0], pExprInfo, num, pResBlock, &limit, &slimit, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_AGG == type) {
|
||||||
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
|
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num);
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
|
||||||
SExprInfo* pScalarExprInfo = NULL;
|
SExprInfo* pScalarExprInfo = NULL;
|
||||||
int32_t numOfScalarExpr = 0;
|
int32_t numOfScalarExpr = 0;
|
||||||
|
@ -6824,7 +6823,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num);
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
|
||||||
SInterval interval = {
|
SInterval interval = {
|
||||||
.interval = pIntervalPhyNode->interval,
|
.interval = pIntervalPhyNode->interval,
|
||||||
|
@ -6840,7 +6839,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
||||||
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
|
SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets);
|
||||||
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
|
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
|
||||||
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
|
pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo);
|
||||||
|
@ -6848,12 +6847,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num);
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
|
pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
|
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*) pPhyNode;
|
||||||
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
|
||||||
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
|
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo, NULL);
|
||||||
|
@ -6861,11 +6860,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
|
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pPhyNode;
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num);
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo);
|
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_JOIN == type) {
|
||||||
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode;
|
SJoinPhysiNode* pJoinNode = (SJoinPhysiNode*) pPhyNode;
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
|
SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &num);
|
||||||
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
|
pOptr = createJoinOperatorInfo(ops, size, pExprInfo, num, pResBlock, pJoinNode->pOnConditions, pTaskInfo);
|
||||||
|
|
|
@ -64,16 +64,19 @@ static void setupQueryRangeForReverseScan(STableScanInfo* pTableScanInfo) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
STableScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
STaskCostInfo* pCost = &pTaskInfo->cost;
|
STaskCostInfo* pCost = &pTaskInfo->cost;
|
||||||
|
|
||||||
pCost->totalBlocks += 1;
|
pCost->totalBlocks += 1;
|
||||||
pCost->totalRows += pBlock->info.rows;
|
|
||||||
|
|
||||||
pCost->totalCheckedRows += pBlock->info.rows;
|
|
||||||
pCost->loadBlocks += 1;
|
pCost->loadBlocks += 1;
|
||||||
|
|
||||||
*status = BLK_DATA_DATA_LOAD;
|
pCost->totalRows += pBlock->info.rows;
|
||||||
|
pCost->totalCheckedRows += pBlock->info.rows;
|
||||||
|
|
||||||
|
*status = pInfo->dataBlockLoadFlag;
|
||||||
|
|
||||||
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
SArray* pCols = tsdbRetrieveDataBlock(pTableScanInfo->dataReader, NULL);
|
||||||
if (pCols == NULL) {
|
if (pCols == NULL) {
|
||||||
|
@ -139,7 +142,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
|
||||||
// this function never returns error?
|
// this function never returns error?
|
||||||
uint32_t status = BLK_DATA_DATA_LOAD;
|
uint32_t status = BLK_DATA_DATA_LOAD;
|
||||||
int32_t code = loadDataBlock(pTaskInfo, pTableScanInfo, pBlock, &status);
|
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pOperator->pTaskInfo->env, code);
|
longjmp(pOperator->pTaskInfo->env, code);
|
||||||
|
@ -217,7 +220,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
|
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
|
||||||
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
|
int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
|
||||||
SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
SNode* pCondition, SExecTaskInfo* pTaskInfo) {
|
||||||
assert(repeatTime > 0);
|
assert(repeatTime > 0);
|
||||||
|
@ -232,6 +235,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->dataBlockLoadFlag= dataLoadFlag;
|
||||||
pInfo->pResBlock = pResBlock;
|
pInfo->pResBlock = pResBlock;
|
||||||
pInfo->pFilterNode = pCondition;
|
pInfo->pFilterNode = pCondition;
|
||||||
pInfo->dataReader = pTsdbReadHandle;
|
pInfo->dataReader = pTsdbReadHandle;
|
||||||
|
|
|
@ -58,9 +58,9 @@ void functionFinalize(SqlFunctionCtx *pCtx) {
|
||||||
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||||
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
|
if (QUERY_NODE_COLUMN == nodeType(pParam) && PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pParam)->colId) {
|
||||||
return FUNC_DATA_REQUIRED_NO_NEEDED;
|
return FUNC_DATA_REQUIRED_NOT_LOAD;
|
||||||
}
|
}
|
||||||
return FUNC_DATA_REQUIRED_STATIS_NEEDED;
|
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
|
|
|
@ -78,10 +78,10 @@ int32_t fmGetFuncResultType(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
|
||||||
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||||
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
if (pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
||||||
return FUNC_DATA_REQUIRED_ALL_NEEDED;
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
}
|
}
|
||||||
if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
|
if (NULL == funcMgtBuiltins[pFunc->funcId].dataRequiredFunc) {
|
||||||
return FUNC_DATA_REQUIRED_ALL_NEEDED;
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
}
|
}
|
||||||
return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
|
return funcMgtBuiltins[pFunc->funcId].dataRequiredFunc(pFunc, pTimeWindow);
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,7 +200,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
||||||
strcpy(pScan->tableName.tname, pRealTable->table.tableName);
|
strcpy(pScan->tableName.tname, pRealTable->table.tableName);
|
||||||
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
|
pScan->showRewrite = pCxt->pPlanCxt->showRewrite;
|
||||||
pScan->ratio = pRealTable->ratio;
|
pScan->ratio = pRealTable->ratio;
|
||||||
pScan->dataRequired = FUNC_DATA_REQUIRED_ALL_NEEDED;
|
pScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
|
||||||
// set columns to scan
|
// set columns to scan
|
||||||
SNodeList* pCols = NULL;
|
SNodeList* pCols = NULL;
|
||||||
|
|
|
@ -125,12 +125,12 @@ static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo
|
||||||
|
|
||||||
static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataRequired r) {
|
static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataRequired r) {
|
||||||
switch (l) {
|
switch (l) {
|
||||||
case FUNC_DATA_REQUIRED_ALL_NEEDED:
|
case FUNC_DATA_REQUIRED_DATA_LOAD:
|
||||||
return l;
|
return l;
|
||||||
case FUNC_DATA_REQUIRED_STATIS_NEEDED:
|
case FUNC_DATA_REQUIRED_STATIS_LOAD:
|
||||||
return FUNC_DATA_REQUIRED_ALL_NEEDED == r ? r : l;
|
return FUNC_DATA_REQUIRED_DATA_LOAD == r ? r : l;
|
||||||
case FUNC_DATA_REQUIRED_NO_NEEDED:
|
case FUNC_DATA_REQUIRED_NOT_LOAD:
|
||||||
return FUNC_DATA_REQUIRED_DISCARD == r ? l : r;
|
return FUNC_DATA_REQUIRED_FILTEROUT == r ? l : r;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -139,9 +139,9 @@ static EFuncDataRequired osdPromoteDataRequired(EFuncDataRequired l , EFuncDataR
|
||||||
|
|
||||||
static int32_t osdGetDataRequired(SNodeList* pFuncs) {
|
static int32_t osdGetDataRequired(SNodeList* pFuncs) {
|
||||||
if (NULL == pFuncs) {
|
if (NULL == pFuncs) {
|
||||||
return FUNC_DATA_REQUIRED_ALL_NEEDED;
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
}
|
}
|
||||||
EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_DISCARD;
|
EFuncDataRequired dataRequired = FUNC_DATA_REQUIRED_FILTEROUT;
|
||||||
SNode* pFunc = NULL;
|
SNode* pFunc = NULL;
|
||||||
FOREACH(pFunc, pFuncs) {
|
FOREACH(pFunc, pFuncs) {
|
||||||
dataRequired = osdPromoteDataRequired(dataRequired, fmFuncDataRequired((SFunctionNode*)pFunc, NULL));
|
dataRequired = osdPromoteDataRequired(dataRequired, fmFuncDataRequired((SFunctionNode*)pFunc, NULL));
|
||||||
|
|
Loading…
Reference in New Issue