diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a10f02c96..11c9be4d06 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -412,7 +412,7 @@ typedef enum ENodeType { // physical plan node QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, - QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, + QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, // INACTIVE QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, @@ -426,7 +426,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_SORT, QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, - QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, + QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, // INACTIVE QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL, diff --git a/source/libs/executor/test/queryPlanTests.cpp b/source/libs/executor/test/queryPlanTests.cpp index 52edd59b75..f03d347a78 100755 --- a/source/libs/executor/test/queryPlanTests.cpp +++ b/source/libs/executor/test/queryPlanTests.cpp @@ -46,7 +46,7 @@ namespace { -#define QPT_MAX_LOOP 1000000 +#define QPT_MAX_LOOP 1000 #define QPT_MAX_LEVEL_SUBPLAN_NUM 10 #define QPT_MAX_SUBPLAN_LEVEL 5 #define QPT_MAX_SUBPLAN_GROUP 5 @@ -256,7 +256,7 @@ SNode* qptCreateSubplanNode(int32_t nodeType); SQPTPlan qptPlans[] = { {QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, QPT_PLAN_PHYSIC, "tagScan", qptCreateTagScanPhysiNode}, {QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, QPT_PLAN_PHYSIC, "tableScan", qptCreateTableScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", qptCreateTableSeqScanPhysiNode}, + {QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", NULL /*qptCreateTableSeqScanPhysiNode*/ }, {QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, QPT_PLAN_PHYSIC, "tableMergeScan", qptCreateTableMergeScanPhysiNode}, {QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QPT_PLAN_PHYSIC, "streamScan", qptCreateStreamScanPhysiNode}, {QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QPT_PLAN_PHYSIC, "sysTableScan", qptCreateSysTableScanPhysiNode}, @@ -2975,6 +2975,159 @@ void qptHandleTestEnd() { } +void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOperaotr) { + switch (nodeType(pNode)) { + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: + qptCtx.result.code = createTagScanOperatorInfo(pReadHandle, (STagScanPhysiNode*)pNode, NULL, NULL, NULL, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: + qptCtx.result.code = createTableScanOperatorInfo((STableScanPhysiNode*)pNode, pReadHandle, NULL, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: + //qptCtx.result.code = createTableSeqScanOperatorInfo((STableScanPhysiNode*)pNode, pReadHandle, NULL, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: + qptCtx.result.code = createTableMergeScanOperatorInfo((STableScanPhysiNode*)pNode, pReadHandle, NULL, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: + qptCtx.result.code = createStreamScanOperatorInfo(pReadHandle, (STableScanPhysiNode*)pNode, NULL, NULL, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + qptCtx.result.code = createSysTableScanOperatorInfo(pReadHandle, (SSystemTableScanPhysiNode*)pNode, NULL, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN: + qptCtx.result.code = createDataBlockInfoScanOperator(pReadHandle, (SBlockDistScanPhysiNode*)pNode, NULL, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN: + qptCtx.result.code = createCacherowsScanOperator((SLastRowScanPhysiNode*)pNode, pReadHandle, NULL, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_PROJECT: + qptCtx.result.code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: + qptCtx.result.code = createMergeJoinOperatorInfo(NULL, 0, (SSortMergeJoinPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { + SAggPhysiNode* pAggNode = (SAggPhysiNode*)pNode; + if (pAggNode->pGroupKeys != NULL) { + qptCtx.result.code = createGroupOperatorInfo(NULL, pAggNode, pTaskInfo, ppOperaotr); + } else { + qptCtx.result.code = createAggregateOperatorInfo(NULL, pAggNode, pTaskInfo, ppOperaotr); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: + qptCtx.result.code = createExchangeOperatorInfo(NULL, (SExchangePhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE: + qptCtx.result.code = createMultiwayMergeOperatorInfo(NULL, 0, (SMergePhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_SORT: + qptCtx.result.code = createSortOperatorInfo(NULL, (SSortPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT: + qptCtx.result.code = createGroupSortOperatorInfo(NULL, (SGroupSortPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL: + qptCtx.result.code = createIntervalOperatorInfo(NULL, (SIntervalPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: + //qptCtx.result.code = createMergeIntervalOperatorInfo(NULL, (SMergeIntervalPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL: + qptCtx.result.code = createMergeAlignedIntervalOperatorInfo(NULL, (SMergeAlignedIntervalPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL: + qptCtx.result.code = createStreamIntervalOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: + qptCtx.result.code = createStreamFinalIntervalOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, 0, pReadHandle, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_FILL: + qptCtx.result.code = createFillOperatorInfo(NULL, (SFillPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL: + qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: + qptCtx.result.code = createSessionAggOperatorInfo(NULL, (SSessionWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: + qptCtx.result.code = createStreamSessionAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + qptCtx.result.code = createStreamFinalSessionAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, 0, pReadHandle, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: + qptCtx.result.code = createStreamFinalSessionAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, 0, pReadHandle, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: + qptCtx.result.code = createStatewindowOperatorInfo(NULL, (SStateWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: + qptCtx.result.code = createStreamStateAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_PARTITION: + qptCtx.result.code = createPartitionOperatorInfo(NULL, (SPartitionPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION: + qptCtx.result.code = createStreamPartitionOperatorInfo(NULL, (SStreamPartitionPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: + qptCtx.result.code = createIndefinitOutputOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: + qptCtx.result.code = createTimeSliceOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_INSERT: + break; + case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + case QUERY_NODE_PHYSICAL_PLAN_DELETE: { + DataSinkHandle handle = NULL; + qptCtx.result.code = dsCreateDataSinker(NULL, (SDataSinkNode*)pNode, &handle, NULL, NULL); + dsDestroyDataSinker(handle); + break; + } + case QUERY_NODE_PHYSICAL_SUBPLAN: + break; + case QUERY_NODE_PHYSICAL_PLAN: { + SSchJob job = {0}; + qptCtx.result.code = schValidateAndBuildJob((SQueryPlan*)pNode, &job); + schFreeJobImpl(&job); + break; + } + case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: + qptCtx.result.code = createTableCountScanOperatorInfo(pReadHandle, (STableCountScanPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT: + qptCtx.result.code = createEventwindowOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT: + qptCtx.result.code = createStreamEventAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN: + qptCtx.result.code = createHashJoinOperatorInfo(NULL, 0, (SHashJoinPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE: + qptCtx.result.code = createGroupCacheOperatorInfo(NULL, 0, (SGroupCachePhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL: + qptCtx.result.code = createDynQueryCtrlOperatorInfo(NULL, 0, (SDynQueryCtrlPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT: + qptCtx.result.code = createCountwindowOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, ppOperaotr); + break; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT: + qptCtx.result.code = createStreamCountAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr); + break; + default: + assert(0); + } +} + void qptRunSingleOpTest() { SNode* pNode = NULL; SReadHandle readHandle = {0}; @@ -2992,8 +3145,8 @@ void qptRunSingleOpTest() { qptPrintBeginInfo(); qptCtx.startTsUs = taosGetTimestampUs(); - //qptCtx.result.code = createTagScanOperatorInfo(&readHandle, (STagScanPhysiNode*)pNode, NULL, NULL, NULL, pTaskInfo, &pOperator); - //qptCtx.result.code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pNode, pTaskInfo, &pOperator); + + qptExecPlan(&readHandle, pNode, pTaskInfo, &pOperator); doDestroyTask(pTaskInfo); destroyOperator(pOperator); @@ -3173,27 +3326,6 @@ TEST(singleNodeTest, randPlan) { } #endif - - - - - -#if 0 -TEST(correctSingleNodeTest, tagScan) { - char* caseName = "correctSingleNodeTest:tagScan"; - - qptInitTestCtx(true, true, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, 0, NULL); - - for (qptCtx.loopIdx = 0; qptCtx.loopIdx < QPT_MAX_LOOP; ++qptCtx.loopIdx) { - qptRunPlanTest(caseName); - } - - qptPrintStatInfo(caseName); -} -#endif - - - #endif diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 8a156e8a06..52c7c64887 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -330,8 +330,8 @@ extern SSchedulerMgmt schMgmt; #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) #define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) -#define SCH_IS_DATA_BIND_TASK(task) \ - (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) +#define SCH_IS_DATA_BIND_PLAN(_plan) (((_plan)->subplanType == SUBPLAN_TYPE_SCAN) || ((_plan)->subplanType == SUBPLAN_TYPE_MODIFY)) +#define SCH_IS_DATA_BIND_TASK(task) SCH_IS_DATA_BIND_PLAN((task)->plan) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task)) #define SCH_IS_LOCAL_EXEC_TASK(_job, _task) \ @@ -641,6 +641,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask); int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); +int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum); int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index b888b826bb..1b6e086472 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -200,7 +200,12 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n); if (NULL == child) { SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(child)) { + SCH_JOB_ELOG("invalid subplan type for the %dth child, level:%d, subplanNodeType:%d", n, i, nodeType(child)); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES); @@ -242,6 +247,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } + if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(parent)) { + SCH_JOB_ELOG("invalid subplan type for the %dth parent, level:%d, subplanNodeType:%d", n, i, nodeType(parent)); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); if (NULL == parentTask || NULL == *parentTask) { SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); @@ -384,20 +394,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); - if (NULL == plan) { - SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", n, taskNum, i); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(plan)) { - SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", i, nodeType(plan)); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } - - if (plan->subplanType < SUBPLAN_TYPE_MERGE || plan->subplanType > SUBPLAN_TYPE_COMPUTE) { - SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", plan->subplanType, i, n); - SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); - } + SCH_ERR_RET(schValidateSubplan(pJob, plan, pLevel->level, n, taskNum)); SCH_SET_JOB_TYPE(pJob, plan->subplanType); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index e6b68051f9..fe24633c12 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -831,7 +831,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (SCH_IS_DATA_BIND_TASK(pTask)) { SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps); - SCH_ERR_RET(TSDB_CODE_MND_INVALID_SCHEMA_VER); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask)); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index b68f665200..4840108638 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -360,3 +360,50 @@ void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) *pTask = *task; } + +int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum) { + if (NULL == pSubplan) { + SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", idx, taskNum, level); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pSubplan)) { + SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", level, nodeType(pSubplan)); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (pSubplan->subplanType < SUBPLAN_TYPE_MERGE || pSubplan->subplanType > SUBPLAN_TYPE_COMPUTE) { + SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", pSubplan->subplanType, level, idx); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (pSubplan->level != level) { + SCH_JOB_ELOG("plan level %d mis-match with current level %d", pSubplan->level, level); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (SCH_IS_DATA_BIND_PLAN(pSubplan)) { + if (pSubplan->execNode.epSet.numOfEps <= 0) { + SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + if (pSubplan->execNode.epSet.inUse >= pSubplan->execNode.epSet.numOfEps) { + SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse, pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + } + + if (NULL == pSubplan->pNode) { + SCH_JOB_ELOG("empty plan root node, level:%d, subplan idx:%d", level, idx); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + if (NULL == pSubplan->pDataSink) { + SCH_JOB_ELOG("empty plan dataSink, level:%d, subplan idx:%d", level, idx); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + return TSDB_CODE_SUCCESS; +} + +