enh: add operator

This commit is contained in:
dapan1121 2024-10-21 19:24:52 +08:00
parent cde86c2e32
commit 6a9fd0c95e
6 changed files with 222 additions and 44 deletions

View File

@ -412,7 +412,7 @@ typedef enum ENodeType {
// physical plan node
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN = 1100,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN,
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, // INACTIVE
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN,
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN,
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN,
@ -426,7 +426,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_SORT,
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT,
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, // INACTIVE
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,

View File

@ -46,7 +46,7 @@
namespace {
#define QPT_MAX_LOOP 1000000
#define QPT_MAX_LOOP 1000
#define QPT_MAX_LEVEL_SUBPLAN_NUM 10
#define QPT_MAX_SUBPLAN_LEVEL 5
#define QPT_MAX_SUBPLAN_GROUP 5
@ -256,7 +256,7 @@ SNode* qptCreateSubplanNode(int32_t nodeType);
SQPTPlan qptPlans[] = {
{QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, QPT_PLAN_PHYSIC, "tagScan", qptCreateTagScanPhysiNode},
{QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, QPT_PLAN_PHYSIC, "tableScan", qptCreateTableScanPhysiNode},
{QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", qptCreateTableSeqScanPhysiNode},
{QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", NULL /*qptCreateTableSeqScanPhysiNode*/ },
{QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, QPT_PLAN_PHYSIC, "tableMergeScan", qptCreateTableMergeScanPhysiNode},
{QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QPT_PLAN_PHYSIC, "streamScan", qptCreateStreamScanPhysiNode},
{QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QPT_PLAN_PHYSIC, "sysTableScan", qptCreateSysTableScanPhysiNode},
@ -2975,6 +2975,159 @@ void qptHandleTestEnd() {
}
void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** ppOperaotr) {
switch (nodeType(pNode)) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
qptCtx.result.code = createTagScanOperatorInfo(pReadHandle, (STagScanPhysiNode*)pNode, NULL, NULL, NULL, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
qptCtx.result.code = createTableScanOperatorInfo((STableScanPhysiNode*)pNode, pReadHandle, NULL, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
//qptCtx.result.code = createTableSeqScanOperatorInfo((STableScanPhysiNode*)pNode, pReadHandle, NULL, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN:
qptCtx.result.code = createTableMergeScanOperatorInfo((STableScanPhysiNode*)pNode, pReadHandle, NULL, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
qptCtx.result.code = createStreamScanOperatorInfo(pReadHandle, (STableScanPhysiNode*)pNode, NULL, NULL, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
qptCtx.result.code = createSysTableScanOperatorInfo(pReadHandle, (SSystemTableScanPhysiNode*)pNode, NULL, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
qptCtx.result.code = createDataBlockInfoScanOperator(pReadHandle, (SBlockDistScanPhysiNode*)pNode, NULL, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
qptCtx.result.code = createCacherowsScanOperator((SLastRowScanPhysiNode*)pNode, pReadHandle, NULL, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
qptCtx.result.code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
qptCtx.result.code = createMergeJoinOperatorInfo(NULL, 0, (SSortMergeJoinPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {
SAggPhysiNode* pAggNode = (SAggPhysiNode*)pNode;
if (pAggNode->pGroupKeys != NULL) {
qptCtx.result.code = createGroupOperatorInfo(NULL, pAggNode, pTaskInfo, ppOperaotr);
} else {
qptCtx.result.code = createAggregateOperatorInfo(NULL, pAggNode, pTaskInfo, ppOperaotr);
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
qptCtx.result.code = createExchangeOperatorInfo(NULL, (SExchangePhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE:
qptCtx.result.code = createMultiwayMergeOperatorInfo(NULL, 0, (SMergePhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_SORT:
qptCtx.result.code = createSortOperatorInfo(NULL, (SSortPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT:
qptCtx.result.code = createGroupSortOperatorInfo(NULL, (SGroupSortPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL:
qptCtx.result.code = createIntervalOperatorInfo(NULL, (SIntervalPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL:
//qptCtx.result.code = createMergeIntervalOperatorInfo(NULL, (SMergeIntervalPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL:
qptCtx.result.code = createMergeAlignedIntervalOperatorInfo(NULL, (SMergeAlignedIntervalPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
qptCtx.result.code = createStreamIntervalOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
qptCtx.result.code = createStreamFinalIntervalOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, 0, pReadHandle, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_FILL:
qptCtx.result.code = createFillOperatorInfo(NULL, (SFillPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
qptCtx.result.code = createStreamFillOperatorInfo(NULL, (SStreamFillPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION:
qptCtx.result.code = createSessionAggOperatorInfo(NULL, (SSessionWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
qptCtx.result.code = createStreamSessionAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
qptCtx.result.code = createStreamFinalSessionAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, 0, pReadHandle, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
qptCtx.result.code = createStreamFinalSessionAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, 0, pReadHandle, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
qptCtx.result.code = createStatewindowOperatorInfo(NULL, (SStateWinodwPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
qptCtx.result.code = createStreamStateAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
qptCtx.result.code = createPartitionOperatorInfo(NULL, (SPartitionPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
qptCtx.result.code = createStreamPartitionOperatorInfo(NULL, (SStreamPartitionPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
qptCtx.result.code = createIndefinitOutputOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
qptCtx.result.code = createTimeSliceOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT:
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
DataSinkHandle handle = NULL;
qptCtx.result.code = dsCreateDataSinker(NULL, (SDataSinkNode*)pNode, &handle, NULL, NULL);
dsDestroyDataSinker(handle);
break;
}
case QUERY_NODE_PHYSICAL_SUBPLAN:
break;
case QUERY_NODE_PHYSICAL_PLAN: {
SSchJob job = {0};
qptCtx.result.code = schValidateAndBuildJob((SQueryPlan*)pNode, &job);
schFreeJobImpl(&job);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
qptCtx.result.code = createTableCountScanOperatorInfo(pReadHandle, (STableCountScanPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
qptCtx.result.code = createEventwindowOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
qptCtx.result.code = createStreamEventAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN:
qptCtx.result.code = createHashJoinOperatorInfo(NULL, 0, (SHashJoinPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:
qptCtx.result.code = createGroupCacheOperatorInfo(NULL, 0, (SGroupCachePhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL:
qptCtx.result.code = createDynQueryCtrlOperatorInfo(NULL, 0, (SDynQueryCtrlPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT:
qptCtx.result.code = createCountwindowOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, ppOperaotr);
break;
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
qptCtx.result.code = createStreamCountAggOperatorInfo(NULL, (SPhysiNode*)pNode, pTaskInfo, pReadHandle, ppOperaotr);
break;
default:
assert(0);
}
}
void qptRunSingleOpTest() {
SNode* pNode = NULL;
SReadHandle readHandle = {0};
@ -2992,8 +3145,8 @@ void qptRunSingleOpTest() {
qptPrintBeginInfo();
qptCtx.startTsUs = taosGetTimestampUs();
//qptCtx.result.code = createTagScanOperatorInfo(&readHandle, (STagScanPhysiNode*)pNode, NULL, NULL, NULL, pTaskInfo, &pOperator);
//qptCtx.result.code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pNode, pTaskInfo, &pOperator);
qptExecPlan(&readHandle, pNode, pTaskInfo, &pOperator);
doDestroyTask(pTaskInfo);
destroyOperator(pOperator);
@ -3173,27 +3326,6 @@ TEST(singleNodeTest, randPlan) {
}
#endif
#if 0
TEST(correctSingleNodeTest, tagScan) {
char* caseName = "correctSingleNodeTest:tagScan";
qptInitTestCtx(true, true, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, 0, NULL);
for (qptCtx.loopIdx = 0; qptCtx.loopIdx < QPT_MAX_LOOP; ++qptCtx.loopIdx) {
qptRunPlanTest(caseName);
}
qptPrintStatInfo(caseName);
}
#endif
#endif

View File

@ -330,8 +330,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1)
#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN)
#define SCH_IS_DATA_BIND_TASK(task) \
(((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_DATA_BIND_PLAN(_plan) (((_plan)->subplanType == SUBPLAN_TYPE_SCAN) || ((_plan)->subplanType == SUBPLAN_TYPE_MODIFY))
#define SCH_IS_DATA_BIND_TASK(task) SCH_IS_DATA_BIND_PLAN((task)->plan)
#define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum)
#define SCH_IS_DATA_MERGE_TASK(task) (!SCH_IS_DATA_BIND_TASK(task))
#define SCH_IS_LOCAL_EXEC_TASK(_job, _task) \
@ -641,6 +641,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);

View File

@ -200,7 +200,12 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
if (NULL == child) {
SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(child)) {
SCH_JOB_ELOG("invalid subplan type for the %dth child, level:%d, subplanNodeType:%d", n, i, nodeType(child));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
@ -242,6 +247,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(parent)) {
SCH_JOB_ELOG("invalid subplan type for the %dth parent, level:%d, subplanNodeType:%d", n, i, nodeType(parent));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
@ -384,20 +394,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
if (NULL == plan) {
SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", n, taskNum, i);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(plan)) {
SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", i, nodeType(plan));
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (plan->subplanType < SUBPLAN_TYPE_MERGE || plan->subplanType > SUBPLAN_TYPE_COMPUTE) {
SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", plan->subplanType, i, n);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schValidateSubplan(pJob, plan, pLevel->level, n, taskNum));
SCH_SET_JOB_TYPE(pJob, plan->subplanType);

View File

@ -831,7 +831,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if (SCH_IS_DATA_BIND_TASK(pTask)) {
SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_MND_INVALID_SCHEMA_VER);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));

View File

@ -360,3 +360,50 @@ void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask)
*pTask = *task;
}
int32_t schValidateSubplan(SSchJob *pJob, SSubplan* pSubplan, int32_t level, int32_t idx, int32_t taskNum) {
if (NULL == pSubplan) {
SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d, level: %d", idx, taskNum, level);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (QUERY_NODE_PHYSICAL_SUBPLAN != nodeType(pSubplan)) {
SCH_JOB_ELOG("invalid subplan type, level:%d, subplanNodeType:%d", level, nodeType(pSubplan));
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (pSubplan->subplanType < SUBPLAN_TYPE_MERGE || pSubplan->subplanType > SUBPLAN_TYPE_COMPUTE) {
SCH_JOB_ELOG("invalid subplanType %d, level:%d, subplan idx:%d", pSubplan->subplanType, level, idx);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (pSubplan->level != level) {
SCH_JOB_ELOG("plan level %d mis-match with current level %d", pSubplan->level, level);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (SCH_IS_DATA_BIND_PLAN(pSubplan)) {
if (pSubplan->execNode.epSet.numOfEps <= 0) {
SCH_JOB_ELOG("no execNode specifed for data src plan %d, numOfEps:%d", pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (pSubplan->execNode.epSet.inUse >= pSubplan->execNode.epSet.numOfEps) {
SCH_JOB_ELOG("invalid epset inUse %d for data src plan %d, numOfEps:%d", pSubplan->execNode.epSet.inUse, pSubplan->subplanType, pSubplan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
}
if (NULL == pSubplan->pNode) {
SCH_JOB_ELOG("empty plan root node, level:%d, subplan idx:%d", level, idx);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (NULL == pSubplan->pDataSink) {
SCH_JOB_ELOG("empty plan dataSink, level:%d, subplan idx:%d", level, idx);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
return TSDB_CODE_SUCCESS;
}