From 82b517dd2ae3a5c550a4b1cdd6ad29b3352d6873 Mon Sep 17 00:00:00 2001
From: dapan1121
Date: Mon, 15 Jul 2024 19:28:04 +0800
Subject: [PATCH] enh: add scheduler return code processing

---
Review notes (commentary area; not part of the commit message):
  * schBuildTaskRalation, schFreeJobImpl: the NULL check that follows the
    fetch of a task tests pTask (it previously re-tested pLevel, so a NULL
    task was dereferenced / handed to schFreeTask).
  * schNotifyUserExecRes, schNotifyUserFetchRes: atomic_load_32() receives
    &pJob->errCode, like every other atomic access to that field here.
  * sizeof(...) values logged with %d are cast to int32_t.
  * NOTE(review): schInitJob treats allocatorRefId <= 0 as a failure; confirm
    nodesMakeAllocatorWeakRef() cannot legitimately return 0.
  * NOTE(review): indentation/alignment of this patch was reconstructed; if
    context does not match, apply with whitespace tolerance.

 source/libs/scheduler/inc/schInt.h  |   2 +-
 source/libs/scheduler/src/schJob.c  | 143 ++++++++++++++++++++++++----
 source/libs/scheduler/src/schUtil.c |  10 +-
 3 files changed, 129 insertions(+), 26 deletions(-)

diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h
index c7f6c20b7d..92459e06f5 100644
--- a/source/libs/scheduler/inc/schInt.h
+++ b/source/libs/scheduler/inc/schInt.h
@@ -606,7 +606,7 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask);
 void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
 int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
 int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
-int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
+void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
 int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
 int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
 void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);
diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c
index 9ad1c1ff30..8601c943d9 100644
--- a/source/libs/scheduler/src/schJob.c
+++ b/source/libs/scheduler/src/schJob.c
@@ -167,9 +167,18 @@ _return:
 int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
   for (int32_t i = 0; i < pJob->levelNum; ++i) {
     SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
-    
+    if (NULL == pLevel) {
+      SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, pJob->levelNum);
+      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+    }
+
     for (int32_t m = 0; m < pLevel->taskNum; ++m) {
       SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
+      if (NULL == pTask) {
+        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum: %d", m, pLevel->level, pLevel->taskNum);
+        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+      }
+
       SSubplan *pPlan = pTask->plan;
       int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
       int32_t parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;
@@ -189,6 +198,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
 
       for (int32_t n = 0; n < childNum; ++n) {
         SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
+        if (NULL == child) {
+          SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
+          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+        }
+
         SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
         if (NULL == childTask || NULL == *childTask) {
           SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
@@ -223,6 +237,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
 
       for (int32_t n = 0; n < parentNum; ++n) {
         SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
+        if (NULL == parent) {
+          SCH_JOB_ELOG("fail to get the %dth parent subplan, parentNum: %d", n, parentNum);
+          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+        }
+
         SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
         if (NULL == parentTask || NULL == *parentTask) {
           SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
@@ -242,6 +261,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
   }
 
   SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
+  if (NULL == pLevel) {
+    SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", taosArrayGetSize(pJob->levels));
+    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+  }
+
   if (SCH_IS_QUERY_JOB(pJob)) {
     if (pLevel->taskNum > 1) {
       SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
@@ -249,6 +273,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
     }
 
     SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
+    if (NULL == pTask) {
+      SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", taosArrayGetSize(pLevel->subTasks));
+      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+    }
+
     if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
       pJob->attr.needFetch = true;
     }
@@ -262,7 +291,9 @@ int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
     return TSDB_CODE_SUCCESS;
   }
 
-  taosArrayPush(pJob->dataSrcTasks, &pTask);
+  if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
+    SCH_ERR_RET(terrno);
+  }
 
   return TSDB_CODE_SUCCESS;
 }
@@ -319,6 +350,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
     }
 
     pLevel = taosArrayGet(pJob->levels, i);
+    if (NULL == pLevel) {
+      SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
+      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+    }
+
     pLevel->level = i;
 
     plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
@@ -343,6 +379,10 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
 
     for (int32_t n = 0; n < taskNum; ++n) {
       SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
+      if (NULL == plan) {
+        SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d", n, taskNum);
+        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+      }
 
       SCH_SET_JOB_TYPE(pJob, plan->subplanType);
 
@@ -422,10 +462,13 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
 
   if (NULL == *pData) {
     SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
-    if (rsp) {
-      rsp->completed = 1;
+    if (NULL == rsp) {
+      SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", (int32_t)sizeof(SRetrieveTableRsp), terrno);
+      SCH_ERR_JRET(terrno);
     }
 
+    rsp->completed = 1;
+
     *pData = rsp;
     SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
   }
@@ -441,8 +484,14 @@ _return:
 
 int32_t schNotifyUserExecRes(SSchJob *pJob) {
   SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
-  if (pRes) {
-    schDumpJobExecRes(pJob, pRes);
+  if (NULL == pRes) {
+    qError("malloc execResult %d failed, error: %x", (int32_t)sizeof(SExecResult), terrno);
+    SCH_RET(terrno);
+  }
+
+  int32_t code = schDumpJobExecRes(pJob, pRes);
+  if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(&pJob->errCode)) {
+    atomic_store_32(&pJob->errCode, code);
   }
 
   SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
@@ -455,7 +504,10 @@ int32_t schNotifyUserExecRes(SSchJob *pJob) {
 int32_t schNotifyUserFetchRes(SSchJob *pJob) {
   void *pRes = NULL;
 
-  schDumpJobFetchRes(pJob, &pRes);
+  int32_t code = schDumpJobFetchRes(pJob, &pRes);
+  if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(&pJob->errCode)) {
+    atomic_store_32(&pJob->errCode, code);
+  }
 
   SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
   (*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
@@ -479,13 +531,13 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
 
   if (SCH_JOB_IN_SYNC_OP(pJob)) {
     SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
-    tsem_post(&pJob->rspSem);
+    (void)tsem_post(&pJob->rspSem);  // ignore error
   } else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
     SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
-    schNotifyUserExecRes(pJob);
+    (void)schNotifyUserExecRes(pJob);  // ignore error
   } else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
     SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
-    schNotifyUserFetchRes(pJob);
+    (void)schNotifyUserFetchRes(pJob);  // ignore error
   } else {
     SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
     SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
@@ -520,7 +572,8 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
     return TSDB_CODE_SCH_IGNORE_ERROR;
   }
 
-  schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
+  (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);  // ignore error
+
   return TSDB_CODE_SCH_IGNORE_ERROR;
 }
 
@@ -531,7 +584,8 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
     return TSDB_CODE_SCH_IGNORE_ERROR;
   }
 
-  schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
+  (void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);  // ignore error
+
   return TSDB_CODE_SCH_IGNORE_ERROR;
 }
 
@@ -573,9 +627,18 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
     atomic_sub_fetch_32(&pJob->levelIdx, 1);
 
     pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
+    if (NULL == pLevel) {
+      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, taosArrayGetSize(pJob->levels));
+      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+    }
+
     for (int32_t i = 0; i < pLevel->taskNum; ++i) {
       SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
-      
+      if (NULL == pTask) {
+        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", i, pLevel->level, pLevel->taskNum);
+        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+      }
+
       if (pTask->children && taosArrayGetSize(pTask->children) > 0) {
         continue;
       }
@@ -603,7 +666,11 @@ int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
       }
     }
 
-    taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo));
+    if (NULL == taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo))) {
+      SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
+      SCH_ERR_RET(terrno);
+    }
+
     taosArrayDestroy(rsp->tbVerInfo);
 
     pJob->execRes.msgType = TDMT_SCH_QUERY;
@@ -630,6 +697,11 @@ int32_t schLaunchJob(SSchJob *pJob) {
     SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
   } else {
     SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
+    if (NULL == level) {
+      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, taosArrayGetSize(pJob->levels));
+      SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
+    }
+
     SCH_ERR_RET(schLaunchLevelTasks(pJob, level));
   }
 
@@ -662,10 +734,19 @@ void schFreeJobImpl(void *job) {
   int32_t numOfLevels = taosArrayGetSize(pJob->levels);
   for (int32_t i = 0; i < numOfLevels; ++i) {
     SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
+    if (NULL == pLevel) {
+      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
+      continue;
+    }
 
     int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
     for (int32_t j = 0; j < numOfTasks; ++j) {
       SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
+      if (NULL == pTask) {
+        SCH_JOB_ELOG("fail to get the %dth task, taskNum:%d", j, numOfTasks);
+        continue;
+      }
+
       schFreeTask(pJob, pTask);
     }
 
@@ -688,12 +769,12 @@ void schFreeJobImpl(void *job) {
   destroyQueryExecRes(&pJob->execRes);
 
   qDestroyQueryPlan(pJob->pDag);
-  nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
+  (void)nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);  // ignore error
 
   taosMemoryFreeClear(pJob->userRes.execRes);
   taosMemoryFreeClear(pJob->fetchRes);
   taosMemoryFreeClear(pJob->sql);
-  tsem_destroy(&pJob->rspSem);
+  (void)tsem_destroy(&pJob->rspSem);  // ignore error
   taosMemoryFree(pJob);
 
   int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
@@ -712,7 +793,7 @@ int32_t schJobFetchRows(SSchJob *pJob) {
 
     if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
       SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
-      tsem_wait(&pJob->rspSem);
+      (void)tsem_wait(&pJob->rspSem);  // ignore error
       SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
     }
   } else {
@@ -740,9 +821,18 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
   pJob->conn = *pReq->pConn;
   if (pReq->sql) {
     pJob->sql = taosStrdup(pReq->sql);
+    if (NULL == pJob->sql) {
+      qError("QID:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql);
+      SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
+    }
   }
   pJob->pDag = pReq->pDag;
   pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
+  if (pJob->allocatorRefId <= 0) {
+    qError("QID:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId);
+    SCH_ERR_JRET(terrno);
+  }
+
   pJob->chkKillFp = pReq->chkKillFp;
   pJob->chkKillParam = pReq->chkKillParam;
   pJob->userRes.execFp = pReq->execFp;
@@ -753,6 +843,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
     qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
   } else {
     pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
+    if (NULL == pJob->nodeList) {
+      qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, taosArrayGetSize(pReq->pNodeList));
+      SCH_ERR_JRET(terrno);
+    }
   }
 
   pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
@@ -801,7 +895,7 @@ _return:
   } else if (pJob->refId < 0) {
     schFreeJobImpl(pJob);
   } else {
-    taosRemoveRef(schMgmt.jobRef, pJob->refId);
+    (void)taosRemoveRef(schMgmt.jobRef, pJob->refId);  // ignore error
   }
 
   SCH_RET(code);
@@ -815,7 +909,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
 
   if (pReq->syncReq) {
     SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
-    tsem_wait(&pJob->rspSem);
+    (void)tsem_wait(&pJob->rspSem);  // ignore error
   }
 
   SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
@@ -846,7 +940,7 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
     }
     SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
 
-    schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
+    SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
   }
 
   return TSDB_CODE_SUCCESS;
@@ -867,6 +961,10 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
   int32_t numOfLevels = taosArrayGetSize(pJob->levels);
   for (int32_t i = 0; i < numOfLevels; ++i) {
     SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
+    if (NULL == pLevel) {
+      SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
+      SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
+    }
 
     pLevel->taskExecDoneNum = 0;
     pLevel->taskLaunchedNum = 0;
@@ -874,6 +972,11 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
     int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
     for (int32_t j = 0; j < numOfTasks; ++j) {
       SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
+      if (NULL == pTask) {
+        SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks);
+        SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
+      }
+
       SCH_LOCK_TASK(pTask);
       code = schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode);
       if (TSDB_CODE_SUCCESS != code) {
diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c
index 82b2e021af..689c98d395 100644
--- a/source/libs/scheduler/src/schUtil.c
+++ b/source/libs/scheduler/src/schUtil.c
@@ -316,18 +316,18 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
   }
 }
 
-int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
+void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
+  *pTask = NULL;
+
   int32_t s = taosHashGetSize(pTaskList);
   if (s <= 0) {
-    return TSDB_CODE_SUCCESS;
+    return;
   }
 
   SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
   if (NULL == task || NULL == (*task)) {
-    return TSDB_CODE_SUCCESS;
+    return;
   }
 
   *pTask = *task;
-
-  return TSDB_CODE_SUCCESS;
 }