diff --git a/source/libs/executor/test/queryPlanTests.cpp b/source/libs/executor/test/queryPlanTests.cpp index 6faeaed2cf..52edd59b75 100755 --- a/source/libs/executor/test/queryPlanTests.cpp +++ b/source/libs/executor/test/queryPlanTests.cpp @@ -46,8 +46,10 @@ namespace { -#define QPT_MAX_LOOP 10000 -#define QPT_MAX_SUBPLAN_LEVEL 1000 +#define QPT_MAX_LOOP 1000000 +#define QPT_MAX_LEVEL_SUBPLAN_NUM 10 +#define QPT_MAX_SUBPLAN_LEVEL 5 +#define QPT_MAX_SUBPLAN_GROUP 5 #define QPT_MAX_WHEN_THEN_NUM 10 #define QPT_MAX_NODE_LEVEL 5 #define QPT_MAX_STRING_LEN 1048576 @@ -66,9 +68,17 @@ typedef enum { QPT_NODE_EXPR, QPT_NODE_FUNCTION, QPT_NODE_VALUE, + QPT_NODE_SUBPLAN, QPT_NODE_MAX_VALUE } QPT_NODE_TYPE; +enum { + QPT_PLAN_PHYSIC = 1, + QPT_PLAN_SINK, + QPT_PLAN_SUBPLAN, + QPT_PLAN_PLAN +}; + typedef SNode* (*planBuildFunc)(int32_t); @@ -130,6 +140,7 @@ typedef struct { typedef struct { bool correctExpected; uint64_t schedulerId; + char userName[TSDB_USER_LEN]; SQPTPlanParam plan; SQPTDbParam db; SQPTVnodeParam vnode; @@ -146,8 +157,7 @@ typedef struct { EOrder currTsOrder; int16_t nextBlockId; int32_t primaryTsSlotId; - SSubplanId nextSubplanId; - int32_t currSubplanLevel; + int32_t nextSubplanId; SExecTaskInfo* pCurrTask; } SQPTBuildPlanCtx; @@ -185,6 +195,7 @@ typedef struct { typedef struct { int32_t type; + int32_t classify; char* name; planBuildFunc buildFunc; } SQPTPlan; @@ -236,57 +247,61 @@ SNode* qptCreateDataDispatchPhysiNode(int32_t nodeType); SNode* qptCreateDataInsertPhysiNode(int32_t nodeType); SNode* qptCreateDataQueryInsertPhysiNode(int32_t nodeType); SNode* qptCreateDataDeletePhysiNode(int32_t nodeType); +SNode* qptCreatePhysicalPlanNode(int32_t nodeIdx); +void qptCreatePhysiNodesTree(SPhysiNode** ppRes, SPhysiNode* pParent, int32_t level); +SNode* qptCreateQueryPlanNode(int32_t nodeType); +SNode* qptCreateSubplanNode(int32_t nodeType); SQPTPlan qptPlans[] = { - {QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, "tagScan", qptCreateTagScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, "tableScan", qptCreateTableScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, "tableSeqScan", qptCreateTableSeqScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, "tableMergeScan", qptCreateTableMergeScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, "streamScan", qptCreateStreamScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, "sysTableScan", qptCreateSysTableScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, "blockDistScan", qptCreateBlockDistScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, "lastRowScan", qptCreateLastRowScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_PROJECT, "project", qptCreateProjectPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, "mergeJoin", qptCreateMergeJoinPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, "hashAgg", qptCreateHashAggPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, "exchange", qptCreateExchangePhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_MERGE, "merge", qptCreateMergePhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_SORT, "sort", qptCreateSortPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, "groupSort", qptCreateGroupSortPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, "interval", qptCreateIntervalPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, "mergeInterval", qptCreateMergeIntervalPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, "mergeAlignedInterval", qptCreateMergeAlignedIntervalPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, "streamInterval", qptCreateStreamIntervalPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, "streamFinalInterval", qptCreateStreamFinalIntervalPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, "streamSemiInterval", qptCreateStreamSemiIntervalPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_FILL, "fill", qptCreateFillPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, "streamFill", qptCreateStreamFillPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, "sessionWindow", qptCreateSessionPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, "streamSession", qptCreateStreamSessionPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION, "streamSemiSession", qptCreateStreamSemiSessionPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION, "streamFinalSession", qptCreateStreamFinalSessionPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, "stateWindow", qptCreateStateWindowPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, "streamState", qptCreateStreamStatePhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_PARTITION, "partition", qptCreatePartitionPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, "streamPartition", qptCreateStreamPartitionPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, "indefRowsFunc", qptCreateIndefRowsFuncPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, "interpFunc", qptCreateInterpFuncPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_DISPATCH, "dataDispatch", qptCreateDataDispatchPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_INSERT, "dataInseret", qptCreateDataInsertPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, "dataQueryInsert", qptCreateDataQueryInsertPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_DELETE, "dataDelete", qptCreateDataDeletePhysiNode}, - {QUERY_NODE_PHYSICAL_SUBPLAN, "subplan", NULL}, - {QUERY_NODE_PHYSICAL_PLAN, "plan", NULL}, - {QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, "tableCountScan", qptCreateTableCountScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT, "eventWindow", qptCreateMergeEventPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, "streamEventWindow", qptCreateStreamEventPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, "hashJoin", qptCreateHashJoinPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, "groupCache", qptCreateGroupCachePhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, "dynQueryCtrl", qptCreateDynQueryCtrlPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, "countWindow", qptCreateCountWindowPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, "streamCountWindow", qptCreateStreamCountWindowPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL, "streamMidInterval", qptCreateStreamMidIntervalPhysiNode} + {QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, QPT_PLAN_PHYSIC, "tagScan", qptCreateTagScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, QPT_PLAN_PHYSIC, "tableScan", qptCreateTableScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", qptCreateTableSeqScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, QPT_PLAN_PHYSIC, "tableMergeScan", qptCreateTableMergeScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QPT_PLAN_PHYSIC, "streamScan", qptCreateStreamScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QPT_PLAN_PHYSIC, "sysTableScan", qptCreateSysTableScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, QPT_PLAN_PHYSIC, "blockDistScan", qptCreateBlockDistScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, QPT_PLAN_PHYSIC, "lastRowScan", qptCreateLastRowScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_PROJECT, QPT_PLAN_PHYSIC, "project", qptCreateProjectPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, QPT_PLAN_PHYSIC, "mergeJoin", qptCreateMergeJoinPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, QPT_PLAN_PHYSIC, "hashAgg", qptCreateHashAggPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QPT_PLAN_PHYSIC, "exchange", qptCreateExchangePhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_MERGE, QPT_PLAN_PHYSIC, "merge", qptCreateMergePhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_SORT, QPT_PLAN_PHYSIC, "sort", qptCreateSortPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, QPT_PLAN_PHYSIC, "groupSort", qptCreateGroupSortPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, QPT_PLAN_PHYSIC, "interval", qptCreateIntervalPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, QPT_PLAN_PHYSIC, "mergeInterval", qptCreateMergeIntervalPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, QPT_PLAN_PHYSIC, "mergeAlignedInterval", qptCreateMergeAlignedIntervalPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QPT_PLAN_PHYSIC, "streamInterval", qptCreateStreamIntervalPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, QPT_PLAN_PHYSIC, "streamFinalInterval", qptCreateStreamFinalIntervalPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL, QPT_PLAN_PHYSIC, "streamSemiInterval", qptCreateStreamSemiIntervalPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_FILL, QPT_PLAN_PHYSIC, "fill", qptCreateFillPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, QPT_PLAN_PHYSIC, "streamFill", qptCreateStreamFillPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, QPT_PLAN_PHYSIC, "sessionWindow", qptCreateSessionPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, QPT_PLAN_PHYSIC, "streamSession", qptCreateStreamSessionPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION, QPT_PLAN_PHYSIC, "streamSemiSession", qptCreateStreamSemiSessionPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION, QPT_PLAN_PHYSIC, "streamFinalSession", qptCreateStreamFinalSessionPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, QPT_PLAN_PHYSIC, "stateWindow", qptCreateStateWindowPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, QPT_PLAN_PHYSIC, "streamState", qptCreateStreamStatePhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_PARTITION, QPT_PLAN_PHYSIC, "partition", qptCreatePartitionPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, QPT_PLAN_PHYSIC, "streamPartition", qptCreateStreamPartitionPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, QPT_PLAN_PHYSIC, "indefRowsFunc", qptCreateIndefRowsFuncPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, QPT_PLAN_PHYSIC, "interpFunc", qptCreateInterpFuncPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QPT_PLAN_SINK, "dataDispatch", qptCreateDataDispatchPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_INSERT, QPT_PLAN_SINK, "dataInseret", qptCreateDataInsertPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, QPT_PLAN_SINK, "dataQueryInsert", qptCreateDataQueryInsertPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_DELETE, QPT_PLAN_SINK, "dataDelete", qptCreateDataDeletePhysiNode}, + {QUERY_NODE_PHYSICAL_SUBPLAN, QPT_PLAN_SUBPLAN, "subplan", qptCreateSubplanNode}, + {QUERY_NODE_PHYSICAL_PLAN, QPT_PLAN_PLAN, "plan", qptCreateQueryPlanNode}, + {QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, QPT_PLAN_PHYSIC, "tableCountScan", qptCreateTableCountScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT, QPT_PLAN_PHYSIC, "eventWindow", qptCreateMergeEventPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, QPT_PLAN_PHYSIC, "streamEventWindow", qptCreateStreamEventPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN, QPT_PLAN_PHYSIC, "hashJoin", qptCreateHashJoinPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, QPT_PLAN_PHYSIC, "groupCache", qptCreateGroupCachePhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, QPT_PLAN_PHYSIC, "dynQueryCtrl", qptCreateDynQueryCtrlPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT, QPT_PLAN_PHYSIC, "countWindow", qptCreateCountWindowPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, QPT_PLAN_PHYSIC, "streamCountWindow", qptCreateStreamCountWindowPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL, QPT_PLAN_PHYSIC, "streamMidInterval", qptCreateStreamMidIntervalPhysiNode} }; @@ -307,6 +322,7 @@ SQPTCtx qptCtx = {0}; SQPTCtrl qptCtrl = {1, 0, 0, 0, 0}; bool qptErrorRerun = false; bool qptInRerun = false; +int32_t qptSink[] = {QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT, QUERY_NODE_PHYSICAL_PLAN_DELETE}; SNode* qptMakeExprNode(SNode** ppNode); @@ -727,10 +743,28 @@ void qptGetRandTimeWindow(STimeWindow* pWindow) { pWindow->ekey = taosRand(); } -void qptGetSubplanId(SSubplanId* pId) { - pId->queryId = qptCtx.buildCtx.nextSubplanId.queryId; - pId->groupId = qptCtx.buildCtx.nextSubplanId.groupId++; - pId->subplanId = qptCtx.buildCtx.nextSubplanId.subplanId++; +int32_t qptGetSubplanNum(SNodeList* pList) { + if (QPT_NCORRECT_LOW_PROB()) { + return taosRand(); + } + + int32_t subplanNum = 0; + SNode* pNode = NULL; + FOREACH(pNode, pList) { + if (NULL == pNode || QUERY_NODE_NODE_LIST != nodeType(pNode)) { + continue; + } + + SNodeListNode* pNodeListNode = (SNodeListNode*)pNode; + + if (NULL == pNodeListNode->pNodeList) { + continue; + } + + subplanNum += pNodeListNode->pNodeList->length; + } + + return subplanNum; } int32_t qptNodesListAppend(SNodeList* pList, SNode* pNode) { @@ -1226,6 +1260,11 @@ SNode* qptMakeNodeListNode(QPT_NODE_TYPE nodeType, SNode** ppNode) { return qptMakeRandNode(ppNode); } + SNode* pTmp = NULL; + if (NULL == ppNode) { + ppNode = &pTmp; + } + SNodeListNode* pList = NULL; nodesMakeNode(QUERY_NODE_NODE_LIST, (SNode**)&pList); @@ -2700,7 +2739,7 @@ SNode* qptCreateDataDeletePhysiNode(int32_t nodeType) { pDeleter->tableFName[0] = QPT_RAND_BOOL_V ? 'a' : 0; } - SQPTCol* pCol = nodesListGetNode(qptCtx.param.tbl.pColList, 0); + SQPTCol* pCol = (SQPTCol*)nodesListGetNode(qptCtx.param.tbl.pColList, 0); if (QPT_CORRECT_HIGH_PROB() && pCol) { strcpy(pDeleter->tsColName, pCol->name); } else { @@ -2712,34 +2751,58 @@ SNode* qptCreateDataDeletePhysiNode(int32_t nodeType) { return (SNode*)pDeleter; } +void qptBuildSinkIdx(int32_t* pSinkIdx) { + +} + +void qptCreateSubplanDataSink(SDataSinkNode** ppOutput) { + static int32_t sinkIdx[sizeof(qptSink) / sizeof(qptSink[0])] = {-1}; + int32_t nodeIdx = 0; + + if (sinkIdx[0] < 0) { + qptBuildSinkIdx(sinkIdx); + } + + nodeIdx = taosRand() % (sizeof(sinkIdx)/sizeof(sinkIdx[0])); + + *ppOutput = (SDataSinkNode*)qptCreatePhysicalPlanNode(nodeIdx); +} + SNode* qptCreateSubplanNode(int32_t nodeType) { SSubplan* pSubplan = NULL; assert(0 == nodesMakeNode((ENodeType)nodeType, (SNode**)&pSubplan)); - qptGetSubplanId(&pSubplan->id); + pSubplan->id.queryId = qptCtx.param.plan.queryId; + pSubplan->id.groupId = taosRand() % QPT_MAX_SUBPLAN_GROUP; + pSubplan->id.subplanId = qptCtx.buildCtx.nextSubplanId++; - pSubplan->subplanType = QPT_CORRECT_HIGH_PROB() ? (taosRand() % SUBPLAN_TYPE_COMPUTE + 1) : (ESubplanType)taosRand(); + pSubplan->subplanType = QPT_CORRECT_HIGH_PROB() ? (ESubplanType)(taosRand() % SUBPLAN_TYPE_COMPUTE + 1) : (ESubplanType)taosRand(); pSubplan->msgType = qptGetRandSubplanMsgType(); - pSubplan->level = qptCtx.buildCtx.; - pDeleter->tableId = QPT_CORRECT_HIGH_PROB() ? qptCtx.param.tbl.uid : taosRand(); - pDeleter->tableType = QPT_CORRECT_HIGH_PROB() ? (QPT_RAND_BOOL_V ? TSDB_CHILD_TABLE : TSDB_NORMAL_TABLE) : (taosRand() % TSDB_TABLE_MAX); - if (QPT_CORRECT_HIGH_PROB()) { - sprintf(pDeleter->tableFName, "1.%s.%s", qptCtx.param.db.dbName, qptCtx.param.tbl.tblName); - } else { - pDeleter->tableFName[0] = QPT_RAND_BOOL_V ? 'a' : 0; - } + pSubplan->level = taosRand() % QPT_MAX_SUBPLAN_LEVEL; + sprintf(pSubplan->dbFName, "1.%s", qptCtx.param.db.dbName); + strcpy(pSubplan->user, qptCtx.param.userName); + pSubplan->execNode.nodeId = qptCtx.param.vnode.vgId; + memcpy(&pSubplan->execNode.epSet, &qptCtx.param.vnode.epSet, sizeof(pSubplan->execNode.epSet)); + pSubplan->execNodeStat.tableNum = taosRand(); - SQPTCol* pCol = nodesListGetNode(qptCtx.param.tbl.pColList, 0); - if (QPT_CORRECT_HIGH_PROB() && pCol) { - strcpy(pDeleter->tsColName, pCol->name); - } else { - pDeleter->tsColName[0] = QPT_RAND_BOOL_V ? 't' : 0; - } + qptCreatePhysiNodesTree(&pSubplan->pNode, NULL, 0); - qptGetRandTimeWindow(&pDeleter->deleteTimeRange); + qptCreateSubplanDataSink(&pSubplan->pDataSink); + + qptInitMakeNodeCtx(QPT_CORRECT_HIGH_PROB() ? false : true, QPT_CORRECT_HIGH_PROB() ? true : false, QPT_RAND_BOOL_V, 0, NULL); + qptMakeExprNode(&pSubplan->pTagCond); + + qptInitMakeNodeCtx(QPT_CORRECT_HIGH_PROB() ? false : true, QPT_CORRECT_HIGH_PROB() ? true : false, QPT_RAND_BOOL_V, 0, NULL); + qptMakeExprNode(&pSubplan->pTagIndexCond); + + pSubplan->showRewrite = QPT_RAND_BOOL_V; + pSubplan->isView = QPT_RAND_BOOL_V; + pSubplan->isAudit = QPT_RAND_BOOL_V; + pSubplan->dynamicRowThreshold = QPT_RAND_BOOL_V; + pSubplan->rowsThreshold = taosRand(); - return (SNode*)pDeleter; + return (SNode*)pSubplan; } @@ -2751,8 +2814,108 @@ SNode* qptCreatePhysicalPlanNode(int32_t nodeIdx) { return NULL; } -void qptCreateQueryPlan(SNode** ppPlan) { +SNode* qptCreateRealPhysicalPlanNode() { + int32_t nodeIdx = 0; + do { + nodeIdx = taosRand() % (sizeof(qptPlans) / sizeof(qptPlans[0])); + if (QPT_PLAN_PHYSIC != qptPlans[nodeIdx].classify) { + continue; + } + + return qptCreatePhysicalPlanNode(nodeIdx); + } while (true); +} + +void qptCreatePhysiNodesTree(SPhysiNode** ppRes, SPhysiNode* pParent, int32_t level) { + SPhysiNode* pNew = NULL; + if (level < QPT_MAX_SUBPLAN_LEVEL && (NULL == pParent || QPT_RAND_BOOL_V)) { + pNew = (SPhysiNode*)qptCreateRealPhysicalPlanNode(); + pNew->pParent = pParent; + int32_t childrenNum = taosRand() % QPT_MAX_LEVEL_SUBPLAN_NUM; + for (int32_t i = 0; i < childrenNum; ++i) { + qptCreatePhysiNodesTree(NULL, pNew, level + 1); + } + } else if (QPT_RAND_BOOL_V) { + return; + } + + if (pParent) { + qptNodesListMakeStrictAppend(&pParent->pChildren, (SNode*)pNew); + } else { + *ppRes = pNew; + } +} + +void qptAppendParentsSubplan(SNodeList* pParents, SNodeList* pNew) { + SNode* pNode = NULL; + FOREACH(pNode, pNew) { + if (NULL == pNode || QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pNode)) { + continue; + } + + qptNodesListMakeStrictAppend(&pParents, pNode); + } +} + +void qptSetSubplansRelation(SNodeList* pParents, SNodeList* pNew) { + int32_t parentIdx = 0; + SNode* pNode = NULL; + SSubplan* pChild = NULL; + SSubplan* pParent = NULL; + FOREACH(pNode, pNew) { + if (QPT_CORRECT_HIGH_PROB()) { + pChild = (SSubplan*)pNode; + parentIdx = taosRand() % pParents->length; + pParent = (SSubplan*)nodesListGetNode(pParents, parentIdx); + qptNodesListMakeStrictAppend(&pParent->pChildren, pNode); + qptNodesListMakeStrictAppend(&pChild->pParents, (SNode*)pParent); + } + } +} + +void qptBuildSubplansRelation(SNodeList* pList) { + SNode* pNode = NULL; + SNodeList* pParents = NULL; + FOREACH(pNode, pList) { + if (NULL == pNode || QUERY_NODE_NODE_LIST != nodeType(pNode)) { + continue; + } + + SNodeListNode* pNodeList = (SNodeListNode*)pNode; + + if (NULL == pParents) { + qptAppendParentsSubplan(pParents, pNodeList->pNodeList); + continue; + } + + qptSetSubplansRelation(pParents, pNodeList->pNodeList); + qptAppendParentsSubplan(pParents, pNodeList->pNodeList); + } +} + +SNode* qptCreateQueryPlanNode(int32_t nodeType) { + SQueryPlan* pPlan = NULL; + assert(0 == nodesMakeNode((ENodeType)nodeType, (SNode**)&pPlan)); + + int32_t subplanNum = 0, subplanLevelNum = taosRand() % QPT_MAX_SUBPLAN_LEVEL; + pPlan->queryId = QPT_CORRECT_HIGH_PROB() ? qptCtx.param.plan.queryId : taosRand(); + + for (int32_t l = 0; l < subplanLevelNum; ++l) { + qptInitMakeNodeCtx(QPT_CORRECT_HIGH_PROB() ? false : true, QPT_RAND_BOOL_V, QPT_RAND_BOOL_V, 0, NULL); + qptNodesListMakeStrictAppend(&pPlan->pSubplans, qptMakeNodeListNode(QPT_NODE_SUBPLAN, NULL)); + } + + pPlan->numOfSubplans = qptGetSubplanNum(pPlan->pSubplans); + qptBuildSubplansRelation(pPlan->pSubplans); + + pPlan->explainInfo.mode = (EExplainMode)(taosRand() % EXPLAIN_MODE_ANALYZE + 1); + pPlan->explainInfo.verbose = QPT_RAND_BOOL_V; + pPlan->explainInfo.ratio = taosRand(); + + pPlan->pPostPlan = QPT_RAND_BOOL_V ? NULL : (void*)0x1; + + return (SNode*)pPlan; } @@ -2913,6 +3076,7 @@ void qptInitTableCols(SNodeList** ppList, int32_t colNum, EColumnType colType) { void qptInitTestCtx(bool correctExpected, bool singleNode, int32_t nodeType, int32_t nodeIdx, int32_t paramNum, SQPTNodeParam* nodeParam) { qptCtx.param.correctExpected = correctExpected; qptCtx.param.schedulerId = taosRand(); + strcpy(qptCtx.param.userName, "user1"); qptCtx.param.plan.singlePhysiNode = singleNode; if (singleNode) { @@ -2967,10 +3131,7 @@ void qptInitTestCtx(bool correctExpected, bool singleNode, int32_t nodeType, int } qptCtx.buildCtx.nextBlockId++; - qptCtx.buildCtx.nextSubplanId.queryId = qptCtx.param.plan.queryId; - qptCtx.buildCtx.nextSubplanId.groupId++; - qptCtx.buildCtx.nextSubplanId.subplanId++; - qptCtx.buildCtx.currSubplanLevel = 0; + qptCtx.buildCtx.nextSubplanId++; } void qptDestroyTestCtx() { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 77b39b8f0d..acb5c41d38 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1575,10 +1575,14 @@ void nodesDestroyNode(SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: { SHashJoinPhysiNode* pPhyNode = (SHashJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); + nodesDestroyNode(pPhyNode->pWindowOffset); + nodesDestroyNode(pPhyNode->pJLimit); nodesDestroyList(pPhyNode->pOnLeft); nodesDestroyList(pPhyNode->pOnRight); nodesDestroyNode(pPhyNode->leftPrimExpr); nodesDestroyNode(pPhyNode->rightPrimExpr); + nodesDestroyNode(pPhyNode->pLeftOnCond); + nodesDestroyNode(pPhyNode->pRightOnCond); nodesDestroyNode(pPhyNode->pFullOnCond); nodesDestroyList(pPhyNode->pTargets); @@ -1586,8 +1590,6 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyNode(pPhyNode->pColEqCond); nodesDestroyNode(pPhyNode->pTagEqCond); - nodesDestroyNode(pPhyNode->pLeftOnCond); - nodesDestroyNode(pPhyNode->pRightOnCond); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 03145da939..b888b826bb 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -363,6 +363,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + if (QUERY_NODE_NODE_LIST != nodeType(plans)) { + SCH_JOB_ELOG("invalid level plan, level:%d, planNodeType:%d", i, nodeType(plans)); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + taskNum = (int32_t)LIST_LENGTH(plans->pNodeList); if (taskNum <= 0) { SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i); @@ -380,10 +385,20 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); if (NULL == plan) { - SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d", n, taskNum); + SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", n, taskNum, i); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } + if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(plan)) { + SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", i, nodeType(plan)); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (plan->subplanType < SUBPLAN_TYPE_MERGE || plan->subplanType > SUBPLAN_TYPE_COMPUTE) { + SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", plan->subplanType, i, n); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + SCH_SET_JOB_TYPE(pJob, plan->subplanType); SSchTask task = {0};