From e52628ab57371c62db0d003ff526a3cac2d7adef Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 23 Oct 2024 19:13:15 +0800 Subject: [PATCH] fix: scheduler validate plan issues --- source/client/inc/clientInt.h | 2 + source/client/src/clientImpl.c | 13 ++- source/libs/executor/test/queryPlanTests.cpp | 87 +++++++++++++++++--- source/libs/scheduler/inc/schInt.h | 3 + source/libs/scheduler/src/schJob.c | 36 +++++--- source/libs/scheduler/src/scheduler.c | 7 ++ 6 files changed, 125 insertions(+), 23 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 8d45e8b4a8..114bc00125 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -58,6 +58,8 @@ enum { #define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA) #define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_BATCH_META) +#define TSC_MAX_SUBPLAN_CAPACITY_NUM 1000 + typedef struct SAppInstInfo SAppInstInfo; typedef struct { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2c67cafdf5..c55764aaf4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1251,6 +1251,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) { int32_t code = 0; + int32_t subplanNum = 0; if (pQuery->pRoot) { pRequest->stmtType = pQuery->pRoot->type; @@ -1292,7 +1293,7 @@ void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void SQueryPlan* pDag = NULL; code = getPlan(pRequest, pQuery, &pDag, pMnodeList); if (TSDB_CODE_SUCCESS == code) { - pRequest->body.subplanNum = pDag->numOfSubplans; + subplanNum = pDag->numOfSubplans; if (!pRequest->validateOnly) { SArray* pNodeList = NULL; code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList); @@ -1301,6 +1302,10 @@ void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void } taosArrayDestroy(pNodeList); } + + if (TSDB_CODE_SUCCESS == code) { + pRequest->body.subplanNum = subplanNum; + } } taosArrayDestroy(pMnodeList); break; @@ -1343,6 +1348,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat pRequest->type = pQuery->msgType; SArray* pMnodeList = NULL; SQueryPlan* pDag = NULL; + int32_t subplanNum = 0; int64_t st = taosGetTimestampUs(); if (!pRequest->parseOnly) { @@ -1369,7 +1375,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), pRequest->requestId); } else { - pRequest->body.subplanNum = pDag->numOfSubplans; + subplanNum = pDag->numOfSubplans; TSWAP(pRequest->pPostPlan, pDag->pPostPlan); } } @@ -1406,6 +1412,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat if (TSDB_CODE_SUCCESS == code) { code = schedulerExecJob(&req, &pRequest->body.queryJob); } + if (TSDB_CODE_SUCCESS == code) { + pRequest->body.subplanNum = subplanNum; + } taosArrayDestroy(pNodeList); } else { qDestroyQueryPlan(pDag); diff --git a/source/libs/executor/test/queryPlanTests.cpp b/source/libs/executor/test/queryPlanTests.cpp index 176af60944..c367391f8e 100755 --- a/source/libs/executor/test/queryPlanTests.cpp +++ b/source/libs/executor/test/queryPlanTests.cpp @@ -47,9 +47,9 @@ namespace { -#define QPT_MAX_LOOP 1000 +#define QPT_MAX_LOOP 100000 #define QPT_MAX_LEVEL_SUBPLAN_NUM 10 -#define QPT_MAX_SUBPLAN_LEVEL 5 +#define QPT_MAX_SUBPLAN_LEVEL 2 #define QPT_MAX_SUBPLAN_GROUP 5 #define QPT_MAX_WHEN_THEN_NUM 10 #define QPT_MAX_NODE_LEVEL 5 @@ -172,7 +172,10 @@ typedef struct { } SQPTMakeNodeCtx; typedef struct { + int64_t startTsUs; int32_t code; + int64_t succeedTimes; + int64_t failedTimes; } SQPTExecResult; typedef struct { @@ -183,7 +186,6 @@ typedef struct { SQPTMakeNodeCtx makeCtx; SQPTMakeNodeCtx makeCtxBak; SQPTExecResult result; - int64_t startTsUs; } SQPTCtx; typedef struct { @@ -257,7 +259,7 @@ SNode* qptCreateSubplanNode(int32_t nodeType); SQPTPlan qptPlans[] = { {QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, QPT_PLAN_PHYSIC, "tagScan", qptCreateTagScanPhysiNode}, {QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, QPT_PLAN_PHYSIC, "tableScan", qptCreateTableScanPhysiNode}, - {QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", NULL /*qptCreateTableSeqScanPhysiNode*/ }, + {QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, QPT_PLAN_PHYSIC, "tableSeqScan", qptCreateTableSeqScanPhysiNode}, {QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, QPT_PLAN_PHYSIC, "tableMergeScan", qptCreateTableMergeScanPhysiNode}, {QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, QPT_PLAN_PHYSIC, "streamScan", qptCreateStreamScanPhysiNode}, {QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, QPT_PLAN_PHYSIC, "sysTableScan", qptCreateSysTableScanPhysiNode}, @@ -329,6 +331,16 @@ int32_t qptSink[] = {QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN SNode* qptMakeExprNode(SNode** ppNode); void qptMakeNodeList(QPT_NODE_TYPE nodeType, SNodeList** ppList); +int32_t qptGetSpecificPlanIndex(int32_t type) { + int32_t planNum = sizeof(qptPlans) / sizeof(qptPlans[0]); + for (int32_t i = 0; i < planNum; ++i) { + if (qptPlans[i].type == type) { + return i; + } + } + + return -1; +} int32_t qptGetColumnRandLen(int32_t colType) { switch (colType) { @@ -460,7 +472,8 @@ void qptPrintEndInfo() { } void qptPrintStatInfo() { - + printf("\n\tAll %d times TEST [%s] END, result - succeed:%" PRId64 " failed:%" PRId64 "\n", qptCtx.loopIdx + 1, qptCtx.caseName, + qptCtx.result.succeedTimes, qptCtx.result.failedTimes); } @@ -1661,6 +1674,19 @@ SNode* qptMakeOrderByExprNode(SNode** ppNode) { return *ppNode; } +SNode* qptMakeSubplanNode(SNode** ppNode) { + if (QPT_NCORRECT_LOW_PROB()) { + return qptMakeRandNode(ppNode); + } + + *ppNode = (SNode*)qptCreateSubplanNode(QUERY_NODE_PHYSICAL_SUBPLAN); + + return *ppNode; +} + + + + SPhysiNode* qptCreatePhysiNode(int32_t nodeType) { SPhysiNode* pPhysiNode = NULL; assert(0 == nodesMakeNode((ENodeType)nodeType, (SNode**)&pPhysiNode)); @@ -1928,6 +1954,18 @@ void qptMakeOrerByExprList(SNodeList** ppList) { } +void qptMakeSubplanList(SNodeList** ppList) { + qptSaveMakeNodeCtx(); + + int32_t planNum = taosRand() % QPT_MAX_LEVEL_SUBPLAN_NUM + (QPT_CORRECT_HIGH_PROB() ? 1 : 0); + for (int32_t i = 0; i < planNum; ++i) { + SNode* pNode = NULL; + qptRestoreMakeNodeCtx(); + qptMakeSubplanNode(&pNode); + qptNodesListMakeStrictAppend(ppList, pNode); + } +} + void qptMakeSpecTypeNodeList(QPT_NODE_TYPE nodeType, SNodeList** ppList) { switch (nodeType) { case QPT_NODE_COLUMN: @@ -1938,6 +1976,8 @@ void qptMakeSpecTypeNodeList(QPT_NODE_TYPE nodeType, SNodeList** ppList) { return qptMakeExprList(ppList); case QPT_NODE_VALUE: return qptMakeValueList(ppList); + case QPT_NODE_SUBPLAN: + return qptMakeSubplanList(ppList); default: break; } @@ -2934,6 +2974,8 @@ void qptResetForReRun() { qptCtx.buildCtx.pCurr = NULL; qptCtx.buildCtx.pCurrTask = NULL; + + qptCtx.result.code = 0; } void qptSingleTestDone(bool* contLoop) { @@ -3126,6 +3168,13 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf default: assert(0); } + + if (qptCtx.result.code) { + qptCtx.result.failedTimes++; + } else { + qptCtx.result.succeedTimes++; + } + } void qptRunSingleOpTest() { @@ -3144,7 +3193,7 @@ void qptRunSingleOpTest() { qptPrintBeginInfo(); - qptCtx.startTsUs = taosGetTimestampUs(); + qptCtx.result.startTsUs = taosGetTimestampUs(); qptExecPlan(&readHandle, pNode, pTaskInfo, &pOperator); @@ -3170,7 +3219,7 @@ void qptRunSubplanTest() { qptPrintBeginInfo(); - qptCtx.startTsUs = taosGetTimestampUs(); + qptCtx.result.startTsUs = taosGetTimestampUs(); //qptCtx.result.code = createTagScanOperatorInfo(&readHandle, (STagScanPhysiNode*)pNode, NULL, NULL, NULL, NULL, &pOperator); //qptCtx.result.code = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pNode, NULL, &pOperator); @@ -3307,9 +3356,9 @@ void qptDestroyTestCtx() { } // namespace #if 1 -#if 1 -TEST(singleNodeTest, randPlan) { - char* caseType = "singleNodeTest:randPlan"; +#if 0 +TEST(singleRandNodeTest, loopPlans) { + char* caseType = "singleRandNodeTest:loopPlans"; for (qptCtx.loopIdx = 0; qptCtx.loopIdx < QPT_MAX_LOOP; ++qptCtx.loopIdx) { for (int32_t i = 0; i < sizeof(qptPlans)/sizeof(qptPlans[0]); ++i) { @@ -3325,6 +3374,24 @@ TEST(singleNodeTest, randPlan) { qptPrintStatInfo(); } #endif +#if 1 +TEST(singleRandNodeTest, specificPlan) { + char* caseType = "singleRandNodeTest:specificPlan"; + + int32_t idx = qptGetSpecificPlanIndex(QUERY_NODE_PHYSICAL_PLAN); + for (qptCtx.loopIdx = 0; qptCtx.loopIdx < QPT_MAX_LOOP; ++qptCtx.loopIdx) { + sprintf(qptCtx.caseName, "%s:%s", caseType, qptPlans[idx].name); + qptInitTestCtx(false, true, qptPlans[idx].type, idx, 0, NULL); + + qptRunPlanTest(); + + qptDestroyTestCtx(); + } + + qptPrintStatInfo(); +} +#endif + #endif diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 52c7c64887..96b9d2da8d 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -62,6 +62,7 @@ typedef enum { #define SCH_DEFAULT_MAX_RETRY_NUM 6 #define SCH_MIN_AYSNC_EXEC_NUM 3 #define SCH_DEFAULT_RETRY_TOTAL_ROUND 3 +#define SCH_DEFAULT_TASK_CAPACITY_NUM 1000 typedef struct SSchDebug { bool lockEnable; @@ -318,6 +319,8 @@ typedef struct SSchTaskCtx { extern SSchedulerMgmt schMgmt; +#define SCH_GET_TASK_CAPACITY(_n) ((_n) > SCH_DEFAULT_TASK_CAPACITY_NUM ? SCH_DEFAULT_TASK_CAPACITY_NUM : (_n)) + #define SCH_TASK_TIMEOUT(_task) \ ((taosGetTimestampUs() - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId)) > (_task)->timeoutUsec) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index d1c970b9fe..c52f779b75 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -317,7 +317,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - pJob->dataSrcTasks = taosArrayInit(pDag->numOfSubplans, POINTER_BYTES); + pJob->dataSrcTasks = taosArrayInit(SCH_GET_TASK_CAPACITY(pDag->numOfSubplans), POINTER_BYTES); if (NULL == pJob->dataSrcTasks) { SCH_ERR_RET(terrno); } @@ -329,7 +329,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { } SHashObj *planToTask = taosHashInit( - pDag->numOfSubplans, + SCH_GET_TASK_CAPACITY(pDag->numOfSubplans), taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == planToTask) { @@ -349,6 +349,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SSchLevel level = {0}; SNodeListNode *plans = NULL; int32_t taskNum = 0; + int32_t totalTaskNum = 0; SSchLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_INIT; @@ -362,7 +363,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { pLevel = taosArrayGet(pJob->levels, i); if (NULL == pLevel) { SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } pLevel->level = i; @@ -384,6 +385,12 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + totalTaskNum += taskNum; + if (totalTaskNum > pDag->numOfSubplans) { + SCH_JOB_ELOG("current totalTaskNum %d is bigger than numOfSubplans %d, level:%d", totalTaskNum, pDag->numOfSubplans, i); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + pLevel->taskNum = taskNum; pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask)); @@ -410,14 +417,16 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_JRET(schAppendJobDataSrc(pJob, pTask)); - if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES)) { - SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); - SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + code = taosHashPut(planToTask, &plan, POINTER_BYTES, &pTask, POINTER_BYTES); + if (0 != code) { + SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d, error:%s", n, tstrerror(code)); + SCH_ERR_JRET(code); } - if (0 != taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { - SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d", n); - SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + code = taosHashPut(pJob->taskList, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); + if (0 != code) { + SCH_TASK_ELOG("taosHashPut to taskList failed, taskIdx:%d, error:%s", n, tstrerror(code)); + SCH_ERR_JRET(code); } ++pJob->taskNum; @@ -426,6 +435,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { SCH_JOB_DLOG("level %d initialized, taskNum:%d", i, taskNum); } + if (totalTaskNum != pDag->numOfSubplans) { + SCH_JOB_ELOG("totalTaskNum %d mis-match with numOfSubplans %d", totalTaskNum, pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask)); _return: @@ -876,10 +890,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { } } - pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, + pJob->taskList = taosHashInit(SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans), taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == pJob->taskList) { - SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans); + SCH_JOB_ELOG("taosHashInit %d taskList failed", SCH_GET_TASK_CAPACITY(pReq->pDag->numOfSubplans)); SCH_ERR_JRET(terrno); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 01d3204dc6..db9ecd6025 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -233,6 +233,13 @@ int32_t schedulerValidatePlan(SQueryPlan* pPlan) { SCH_ERR_RET(terrno); } + pJob->taskList = taosHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, + HASH_ENTRY_LOCK); + if (NULL == pJob->taskList) { + SCH_JOB_ELOG("taosHashInit %d taskList failed", 100); + SCH_ERR_JRET(terrno); + } + SCH_ERR_JRET(schValidateAndBuildJob(pPlan, pJob)); if (SCH_IS_EXPLAIN_JOB(pJob)) {