diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 1e2ccf8705..8ab3b898ca 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -23,8 +23,6 @@ extern "C" { #include "catalog.h" #include "planner.h" -extern tsem_t schdRspSem; - typedef struct SQueryProfileSummary { int64_t startTs; // Object created and added into the message queue int64_t endTs; // the timestamp when the task is completed @@ -101,8 +99,6 @@ void schedulerFreeJob(int64_t* job, int32_t errCode); void schedulerDestroy(void); -void schdExecCallback(SExecResult* pResult, void* param, int32_t code); - #ifdef __cplusplus } #endif diff --git a/source/libs/scheduler/src/schDbg.c b/source/libs/scheduler/src/schDbg.c index d6c2b638b8..deaaaec7c1 100644 --- a/source/libs/scheduler/src/schDbg.c +++ b/source/libs/scheduler/src/schDbg.c @@ -16,17 +16,5 @@ #include "query.h" #include "schInt.h" -tsem_t schdRspSem; SSchDebug gSCHDebug = {0}; -void schdExecCallback(SExecResult* pResult, void* param, int32_t code) { - if (code) { - pResult->code = code; - } - - *(SExecResult*)param = *pResult; - - taosMemoryFree(pResult); - - tsem_post(&schdRspSem); -} diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 8c2b65e125..1f6e7fbd63 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -50,6 +50,10 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) { int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks); for (int32_t i = 0; i < taskNum; ++i) { SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i); + if (NULL == pTask) { + SCH_JOB_DLOG("fail to get the %dth task", i); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } sum += pTask->plan->execNodeStat.tableNum; } @@ -214,6 +218,10 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { for (int32_t i = 0; i < taskNum; ++i) { pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i); + if (NULL == pTask) { + SCH_JOB_ELOG("fail to get the %dth task", i); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode); if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) { @@ -243,6 +251,11 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) { if (i < (taskNum - 1)) { SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList); + if (NULL == pLastTask) { + SCH_JOB_ELOG("fail to get the last task, num:%d", taosArrayGetSize(ctrl->taskList)); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + if (remainNum < pLastTask->plan->execNodeStat.tableNum) { SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d, smallestInList:%d", ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 278768981a..9ad1c1ff30 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -47,6 +47,7 @@ void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) { return; _return: + SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode)); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index fc92be8214..53ffc3bc6e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -108,14 +108,26 @@ int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) { for (int32_t i = pJob->levelNum - 1; i >= 0; --i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); + if (NULL == pLevel) { + qError("failed to get level %d", i); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } for (int32_t m = 0; m < pLevel->taskNum; ++m) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); + if (NULL == pTask) { + qError("failed to get task %d, total: %d", m, pLevel->taskNum); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SQuerySubDesc subDesc = {0}; subDesc.tid = pTask->taskId; strcpy(subDesc.status, jobTaskStatusStr(pTask->status)); - taosArrayPush(pSub, &subDesc); + if (NULL == taosArrayPush(pSub, &subDesc)) { + qError("taosArrayPush task %d failed, error: %x, ", m, terrno); + SCH_ERR_JRET(terrno); + } } } @@ -141,7 +153,7 @@ int32_t schedulerUpdatePolicy(int32_t policy) { qDebug("schedule policy updated to %d", schMgmt.cfg.schPolicy); break; default: - return TSDB_CODE_TSC_INVALID_INPUT; + SCH_RET(TSDB_CODE_TSC_INVALID_INPUT); } return TSDB_CODE_SUCCESS; @@ -159,14 +171,14 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) { SSchJob *pJob = schAcquireJob(*jobId); if (NULL == pJob) { - qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); + qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); return; } SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode)); - schHandleJobDrop(pJob, errCode); + (void)schHandleJobDrop(pJob, errCode); // ignore any error - schReleaseJob(*jobId); + (void)schReleaseJob(*jobId); // ignore error *jobId = 0; } @@ -182,7 +194,7 @@ void schedulerDestroy(void) { if (refId == 0) { break; } - taosRemoveRef(schMgmt.jobRef, pJob->refId); + (void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error pJob = taosIterateRef(schMgmt.jobRef, refId); }