enh: add scheduler return code processing

This commit is contained in:
dapan1121 2024-07-15 17:11:36 +08:00
parent 4951ed9fa7
commit abe49ff61a
5 changed files with 32 additions and 22 deletions

View File

@ -23,8 +23,6 @@ extern "C" {
#include "catalog.h"
#include "planner.h"
extern tsem_t schdRspSem;
typedef struct SQueryProfileSummary {
int64_t startTs; // Object created and added into the message queue
int64_t endTs; // the timestamp when the task is completed
@ -101,8 +99,6 @@ void schedulerFreeJob(int64_t* job, int32_t errCode);
void schedulerDestroy(void);
void schdExecCallback(SExecResult* pResult, void* param, int32_t code);
#ifdef __cplusplus
}
#endif

View File

@ -16,17 +16,5 @@
#include "query.h"
#include "schInt.h"
tsem_t schdRspSem;
SSchDebug gSCHDebug = {0};
void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
if (code) {
pResult->code = code;
}
*(SExecResult*)param = *pResult;
taosMemoryFree(pResult);
tsem_post(&schdRspSem);
}

View File

@ -50,6 +50,10 @@ int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
int32_t taskNum = taosArrayGetSize(pJob->dataSrcTasks);
for (int32_t i = 0; i < taskNum; ++i) {
SSchTask *pTask = *(SSchTask **)taosArrayGet(pJob->dataSrcTasks, i);
if (NULL == pTask) {
SCH_JOB_DLOG("fail to get the %dth task", i);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
sum += pTask->plan->execNodeStat.tableNum;
}
@ -214,6 +218,10 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
for (int32_t i = 0; i < taskNum; ++i) {
pTask = *(SSchTask **)taosArrayGet(ctrl->taskList, i);
if (NULL == pTask) {
SCH_JOB_ELOG("fail to get the %dth task", i);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SEp *ep = SCH_GET_CUR_EP(&pTask->plan->execNode);
if (pTask->plan->execNodeStat.tableNum > remainNum && ctrl->execTaskNum > 0) {
@ -243,6 +251,11 @@ int32_t schLaunchTasksInFlowCtrlListImpl(SSchJob *pJob, SSchFlowControl *ctrl) {
if (i < (taskNum - 1)) {
SSchTask *pLastTask = *(SSchTask **)taosArrayGetLast(ctrl->taskList);
if (NULL == pLastTask) {
SCH_JOB_ELOG("fail to get the last task, num:%d", taosArrayGetSize(ctrl->taskList));
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
if (remainNum < pLastTask->plan->execNodeStat.tableNum) {
SCH_TASK_DLOG("no more task to launch, fqdn:%s, port:%d, remainNum:%" PRId64 ", remainExecTaskNum:%d, smallestInList:%d",
ep->fqdn, ep->port, ctrl->tableNumSum, ctrl->execTaskNum, pLastTask->plan->execNodeStat.tableNum);

View File

@ -47,6 +47,7 @@ void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
return;
_return:
SCH_JOB_DLOG("job errCode updated to %s", tstrerror(errCode));
}

View File

@ -108,14 +108,26 @@ int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) {
for (int32_t i = pJob->levelNum - 1; i >= 0; --i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
if (NULL == pLevel) {
qError("failed to get level %d", i);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
for (int32_t m = 0; m < pLevel->taskNum; ++m) {
SSchTask *pTask = taosArrayGet(pLevel->subTasks, m);
if (NULL == pTask) {
qError("failed to get task %d, total: %d", m, pLevel->taskNum);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
SQuerySubDesc subDesc = {0};
subDesc.tid = pTask->taskId;
strcpy(subDesc.status, jobTaskStatusStr(pTask->status));
taosArrayPush(pSub, &subDesc);
if (NULL == taosArrayPush(pSub, &subDesc)) {
qError("taosArrayPush task %d failed, error: %x, ", m, terrno);
SCH_ERR_JRET(terrno);
}
}
}
@ -141,7 +153,7 @@ int32_t schedulerUpdatePolicy(int32_t policy) {
qDebug("schedule policy updated to %d", schMgmt.cfg.schPolicy);
break;
default:
return TSDB_CODE_TSC_INVALID_INPUT;
SCH_RET(TSDB_CODE_TSC_INVALID_INPUT);
}
return TSDB_CODE_SUCCESS;
@ -159,14 +171,14 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
SSchJob *pJob = schAcquireJob(*jobId);
if (NULL == pJob) {
qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
return;
}
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode));
schHandleJobDrop(pJob, errCode);
(void)schHandleJobDrop(pJob, errCode); // ignore any error
schReleaseJob(*jobId);
(void)schReleaseJob(*jobId); // ignore error
*jobId = 0;
}
@ -182,7 +194,7 @@ void schedulerDestroy(void) {
if (refId == 0) {
break;
}
taosRemoveRef(schMgmt.jobRef, pJob->refId);
(void)taosRemoveRef(schMgmt.jobRef, pJob->refId); // ignore error
pJob = taosIterateRef(schMgmt.jobRef, refId);
}