enh: add scheduler return code processing

This commit is contained in:
dapan1121 2024-07-15 19:28:04 +08:00
parent abe49ff61a
commit 82b517dd2a
3 changed files with 129 additions and 26 deletions

View File

@ -606,7 +606,7 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schNotifyTaskInHashList(SSchJob *pJob, SHashObj *list, ETaskNotifyType type, SSchTask *pTask);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask);
void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode);

View File

@ -167,9 +167,18 @@ _return:
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t i = 0; i < pJob->levelNum; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, pJob->levelNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
if (NULL == pTask) {
SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum: %d", m, pLevel->level, pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSubplan *pPlan = pTask->plan;
int32_t childNum = pPlan->pChildren ? (int32_t)LIST_LENGTH(pPlan->pChildren) : 0;
int32_t parentNum = pPlan->pParents ? (int32_t)LIST_LENGTH(pPlan->pParents) : 0;
@ -189,6 +198,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t n = 0; n < childNum; ++n) {
SSubplan *child = (SSubplan *)nodesListGetNode(pPlan->pChildren, n);
if (NULL == child) {
SCH_JOB_ELOG("fail to get the %dth child subplan, childNum: %d", n, childNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
if (NULL == childTask || NULL == *childTask) {
SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
@ -223,6 +237,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t n = 0; n < parentNum; ++n) {
SSubplan *parent = (SSubplan *)nodesListGetNode(pPlan->pParents, n);
if (NULL == parent) {
SCH_JOB_ELOG("fail to get the %dth parent subplan, parentNum: %d", n, parentNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SSchTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
if (NULL == parentTask || NULL == *parentTask) {
SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
@ -242,6 +261,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
SSchLevel *pLevel = taosArrayGet(pJob->levels, 0);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get level 0 level, levelNum:%d", taosArrayGetSize(pJob->levels));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (SCH_IS_QUERY_JOB(pJob)) {
if (pLevel->taskNum > 1) {
SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum);
@ -249,6 +273,11 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
}
SSchTask *pTask = taosArrayGet(pLevel->subTasks, 0);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the first task in level 0, taskNum:%d", taosArrayGetSize(pLevel->subTasks));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (SUBPLAN_TYPE_MODIFY != pTask->plan->subplanType || EXPLAIN_MODE_DISABLE != pJob->attr.explainMode) {
pJob->attr.needFetch = true;
}
@ -262,7 +291,9 @@ int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
taosArrayPush(pJob->dataSrcTasks, &pTask);
if (NULL == taosArrayPush(pJob->dataSrcTasks, &pTask)) {
SCH_ERR_RET(terrno);
}
return TSDB_CODE_SUCCESS;
}
@ -319,6 +350,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
}
pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum: %d", i, levelNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pLevel->level = i;
plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
@ -343,6 +379,10 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
if (NULL == plan) {
SCH_JOB_ELOG("fail to get the %dth subplan, taskNum: %d", n, taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_SET_JOB_TYPE(pJob, plan->subplanType);
@ -422,10 +462,13 @@ int32_t schDumpJobFetchRes(SSchJob *pJob, void **pData) {
if (NULL == *pData) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
if (rsp) {
rsp->completed = 1;
if (NULL == rsp) {
SCH_JOB_ELOG("malloc SRetrieveTableRsp %d failed, code:%x", sizeof(SRetrieveTableRsp), terrno);
SCH_ERR_JRET(terrno);
}
rsp->completed = 1;
*pData = rsp;
SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
}
@ -441,8 +484,14 @@ _return:
int32_t schNotifyUserExecRes(SSchJob *pJob) {
SExecResult *pRes = taosMemoryCalloc(1, sizeof(SExecResult));
if (pRes) {
schDumpJobExecRes(pJob, pRes);
if (NULL == pRes) {
qError("malloc execResult %d failed, error: %x", sizeof(SExecResult), terrno);
SCH_RET(terrno);
}
int32_t code = schDumpJobExecRes(pJob, pRes);
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(pJob->errCode)) {
atomic_store_32(&pJob->errCode, code);
}
SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
@ -455,7 +504,10 @@ int32_t schNotifyUserExecRes(SSchJob *pJob) {
int32_t schNotifyUserFetchRes(SSchJob *pJob) {
void *pRes = NULL;
schDumpJobFetchRes(pJob, &pRes);
int32_t code = schDumpJobFetchRes(pJob, &pRes);
if (TSDB_CODE_SUCCESS != code && TSDB_CODE_SUCCESS == atomic_load_32(pJob->errCode)) {
atomic_store_32(&pJob->errCode, code);
}
SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
(*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
@ -479,13 +531,13 @@ void schPostJobRes(SSchJob *pJob, SCH_OP_TYPE op) {
if (SCH_JOB_IN_SYNC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
tsem_post(&pJob->rspSem);
(void)tsem_post(&pJob->rspSem); // ignore error
} else if (SCH_JOB_IN_ASYNC_EXEC_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schNotifyUserExecRes(pJob);
(void)schNotifyUserExecRes(pJob); // ignore error
} else if (SCH_JOB_IN_ASYNC_FETCH_OP(pJob)) {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
schNotifyUserFetchRes(pJob);
(void)schNotifyUserFetchRes(pJob); // ignore error
} else {
SCH_UNLOCK(SCH_WRITE, &pJob->opStatus.lock);
SCH_JOB_ELOG("job not in any operation, status:%s", jobTaskStatusStr(pJob->status));
@ -520,7 +572,8 @@ int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode);
(void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, &errCode); // ignore error
return TSDB_CODE_SCH_IGNORE_ERROR;
}
@ -531,7 +584,8 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) {
return TSDB_CODE_SCH_IGNORE_ERROR;
}
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode);
(void)schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, &errCode); // ignore error
return TSDB_CODE_SCH_IGNORE_ERROR;
}
@ -573,9 +627,18 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
atomic_sub_fetch_32(&pJob->levelIdx, 1);
pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, taosArrayGetSize(pJob->levels));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, i);
if (NULL == pTask) {
SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", i, pLevel->level, pLevel->taskNum);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (pTask->children && taosArrayGetSize(pTask->children) > 0) {
continue;
}
@ -603,7 +666,11 @@ int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
}
}
taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo));
if (NULL == taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo))) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_ERR_RET(terrno);
}
taosArrayDestroy(rsp->tbVerInfo);
pJob->execRes.msgType = TDMT_SCH_QUERY;
@ -630,6 +697,11 @@ int32_t schLaunchJob(SSchJob *pJob) {
SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
} else {
SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx);
if (NULL == level) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", pJob->levelIdx, taosArrayGetSize(pJob->levels));
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_ERR_RET(schLaunchLevelTasks(pJob, level));
}
@ -662,10 +734,19 @@ void schFreeJobImpl(void *job) {
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for (int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
continue;
}
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for (int32_t j = 0; j < numOfTasks; ++j) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth task, taskNum:%d", j, numOfTasks);
continue;
}
schFreeTask(pJob, pTask);
}
@ -688,12 +769,12 @@ void schFreeJobImpl(void *job) {
destroyQueryExecRes(&pJob->execRes);
qDestroyQueryPlan(pJob->pDag);
nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
(void)nodesReleaseAllocatorWeakRef(pJob->allocatorRefId); // ignore error
taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes);
taosMemoryFreeClear(pJob->sql);
tsem_destroy(&pJob->rspSem);
(void)tsem_destroy(&pJob->rspSem); // ignore error
taosMemoryFree(pJob);
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
@ -712,7 +793,7 @@ int32_t schJobFetchRows(SSchJob *pJob) {
if (schChkCurrentOp(pJob, SCH_OP_FETCH, true)) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
(void)tsem_wait(&pJob->rspSem); // ignore error
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
}
} else {
@ -740,9 +821,18 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->conn = *pReq->pConn;
if (pReq->sql) {
pJob->sql = taosStrdup(pReq->sql);
if (NULL == pJob->sql) {
qError("QID:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql);
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
pJob->pDag = pReq->pDag;
pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
if (pJob->allocatorRefId <= 0) {
qError("QID:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId);
SCH_ERR_JRET(terrno);
}
pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp;
@ -753,6 +843,10 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
} else {
pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
if (NULL == pJob->nodeList) {
qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, taosArrayGetSize(pReq->pNodeList));
SCH_ERR_JRET(terrno);
}
}
pJob->taskList = taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false,
@ -801,7 +895,7 @@ _return:
} else if (pJob->refId < 0) {
schFreeJobImpl(pJob);
} else {
taosRemoveRef(schMgmt.jobRef, pJob->refId);
(void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error
}
SCH_RET(code);
@ -815,7 +909,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
if (pReq->syncReq) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem);
(void)tsem_wait(&pJob->rspSem); // ignore error
}
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%" PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
@ -846,7 +940,7 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
SCH_ERR_RET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
}
return TSDB_CODE_SUCCESS;
@ -867,6 +961,10 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
for (int32_t i = 0; i < numOfLevels; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
SCH_JOB_ELOG("fail to get the %dth level, levelNum:%d", i, numOfLevels);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
pLevel->taskExecDoneNum = 0;
pLevel->taskLaunchedNum = 0;
@ -874,6 +972,11 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
for (int32_t j = 0; j < numOfTasks; ++j) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
if (NULL == pTask) {
SCH_JOB_ELOG("fail to get the %dth task in level %d, taskNum:%d", j, i, numOfTasks);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SCH_LOCK_TASK(pTask);
code = schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode);
if (TSDB_CODE_SUCCESS != code) {

View File

@ -316,18 +316,18 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
}
}
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
void schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) {
*pTask = NULL;
int32_t s = taosHashGetSize(pTaskList);
if (s <= 0) {
return TSDB_CODE_SUCCESS;
return;
}
SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId));
if (NULL == task || NULL == (*task)) {
return TSDB_CODE_SUCCESS;
return;
}
*pTask = *task;
return TSDB_CODE_SUCCESS;
}