From 99d602b809f40b0654367366a2ab524e1a54772e Mon Sep 17 00:00:00 2001
From: dapan1121
Date: Fri, 7 Jan 2022 16:12:15 +0800
Subject: [PATCH] scheduler: add debug info

---
 include/libs/scheduler/scheduler.h       |   2 +-
 source/libs/scheduler/inc/schedulerInt.h |  48 +++----
 source/libs/scheduler/src/scheduler.c    | 155 ++++++++++++-----------
 3 files changed, 109 insertions(+), 96 deletions(-)

diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h
index b2ba7acebf..6d7c5bdd01 100644
--- a/include/libs/scheduler/scheduler.h
+++ b/include/libs/scheduler/scheduler.h
@@ -24,7 +24,7 @@ extern "C" {
 #include "catalog.h"
 
 typedef struct SSchedulerCfg {
-  int32_t maxJobNum;
+  uint32_t maxJobNum;
 } SSchedulerCfg;
 
 typedef struct SQueryProfileSummary {
diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h
index a7ec39bfde..100e9fb2cf 100644
--- a/source/libs/scheduler/inc/schedulerInt.h
+++ b/source/libs/scheduler/inc/schedulerInt.h
@@ -37,10 +37,10 @@ enum {
 };
 
 typedef struct SSchedulerMgmt {
-  uint64_t      taskId;
-  uint64_t      sId;
+  uint64_t      taskId;      // sequential taskId
+  uint64_t      sId;         // schedulerId
   SSchedulerCfg cfg;
-  SHashObj     *jobs;  // key: queryId, value: SQueryJob*
+  SHashObj     *jobs;        // key: queryId, value: SQueryJob*
 } SSchedulerMgmt;
 
 typedef struct SSchCallbackParam {
@@ -83,30 +83,29 @@ typedef struct SSchJobAttr {
 
 typedef struct SSchJob {
   uint64_t    queryId;
-  int32_t     levelNum;
-  int32_t     levelIdx;
-  int8_t      status;
   SSchJobAttr attr;
-  SEpSet      dataSrcEps;
-  SEpAddr     resEp;
+
+  int32_t     levelNum;
   void       *transport;
   SArray     *nodeList;   // qnode/vnode list, element is SQueryNodeAddr
-  tsem_t      rspSem;
-  int32_t     userFetch;
-  int32_t     remoteFetch;
-  SSchTask   *fetchTask;
-
-  int32_t     errCode;
-  void       *res;
-  int32_t     resNumOfRows;
+  SArray     *levels;     // Element is SQueryLevel, starting from 0. SArray
+  SArray     *subPlans;   // subplan pointer copied from DAG, no need to free it in scheduler
+  int32_t     levelIdx;
+  SEpSet      dataSrcEps;
   SHashObj   *execTasks;  // executing tasks, key:taskid, value:SQueryTask*
   SHashObj   *succTasks;  // succeed tasks, key:taskid, value:SQueryTask*
  SHashObj   *failTasks;  // failed tasks, key:taskid, value:SQueryTask*
-  SArray     *levels;     // Element is SQueryLevel, starting from 0. SArray
-  SArray     *subPlans;   // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray
-
+  int8_t      status;
+  SEpAddr     resEp;
+  tsem_t      rspSem;
+  int32_t     userFetch;
+  int32_t     remoteFetch;
+  SSchTask   *fetchTask;
+  int32_t     errCode;
+  void       *res;
+  int32_t     resNumOfRows;
   SQueryProfileSummary summary;
 } SSchJob;
 
@@ -115,12 +114,17 @@ typedef struct SSchJob {
 #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
 #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
 
-#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
-#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
+#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
+#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
+
+#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
+#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
+
+#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
+#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
 
 #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
 #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
-#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
 #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
 
 #define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index df4f121773..0ce2d4af4a 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -112,87 +112,87 @@ static void cleanupTask(SSchTask* pTask) {
   taosArrayDestroy(pTask->candidateAddrs);
 }
 
-int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
+int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
   int32_t code = 0;
 
-  pJob->queryId = dag->queryId;
+  job->queryId = dag->queryId;
 
   if (dag->numOfSubplans <= 0) {
-    qError("invalid subplan num:%d", dag->numOfSubplans);
+    SCH_JOB_ELOG("invalid subplan num:%d", dag->numOfSubplans);
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }
 
   int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
   if (levelNum <= 0) {
-    qError("invalid level num:%d", levelNum);
+    SCH_JOB_ELOG("invalid level num:%d", levelNum);
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }
 
   SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
   if (NULL == planToTask) {
-    qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
+    SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
     SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }
 
-  pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
-  if (NULL == pJob->levels) {
-    qError("taosArrayInit %d failed", levelNum);
+  job->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
+  if (NULL == job->levels) {
+    SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }
 
-  //??
-  pJob->attr.needFetch = true;
+  job->attr.needFetch = true;
 
-  pJob->levelNum = levelNum;
-  pJob->levelIdx = levelNum - 1;
+  job->levelNum = levelNum;
+  job->levelIdx = levelNum - 1;
 
-  pJob->subPlans = dag->pSubplans;
+  job->subPlans = dag->pSubplans;
 
   SSchLevel level = {0};
-  SArray *levelPlans = NULL;
-  int32_t levelPlanNum = 0;
+  SArray *plans = NULL;
+  int32_t taskNum = 0;
   SSchLevel *pLevel = NULL;
 
   level.status = JOB_TASK_STATUS_NOT_START;
 
   for (int32_t i = 0; i < levelNum; ++i) {
-    if (NULL == taosArrayPush(pJob->levels, &level)) {
-      qError("taosArrayPush failed");
+    if (NULL == taosArrayPush(job->levels, &level)) {
+      SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
       SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
     }
 
-    pLevel = taosArrayGet(pJob->levels, i);
+    pLevel = taosArrayGet(job->levels, i);
     pLevel->level = i;
 
-    levelPlans = taosArrayGetP(dag->pSubplans, i);
-    if (NULL == levelPlans) {
-      qError("no level plans for level %d", i);
-      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
-    }
-
-    levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
-    if (levelPlanNum <= 0) {
-      qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
-      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
-    }
-
-    pLevel->taskNum = levelPlanNum;
-    pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
+    plans = taosArrayGetP(dag->pSubplans, i);
+    if (NULL == plans) {
+      SCH_JOB_ELOG("empty level plan, level:%d", i);
+      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
+    }
+
+    taskNum = (int32_t)taosArrayGetSize(plans);
+    if (taskNum <= 0) {
+      SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
+      SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
+    }
+
+    pLevel->taskNum = taskNum;
+
+    pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
     if (NULL == pLevel->subTasks) {
-      qError("taosArrayInit %d failed", levelPlanNum);
+      SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
       SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
     }
 
-    for (int32_t n = 0; n < levelPlanNum; ++n) {
-      SSubplan *plan = taosArrayGetP(levelPlans, n);
+    for (int32_t n = 0; n < taskNum; ++n) {
+      SSubplan *plan = taosArrayGetP(plans, n);
 
       if (plan->type == QUERY_TYPE_MODIFY) {
-        pJob->attr.needFetch = false;
+        job->attr.needFetch = false;
       } else {
-        pJob->attr.queryJob = true;
+        job->attr.queryJob = true;
       }
 
-      SSchTask task = initTask(pJob, plan, pLevel);
+      SSchTask task = initTask(job, plan, pLevel);
       void *p = taosArrayPush(pLevel->subTasks, &task);
       if (NULL == p) {
         qError("taosArrayPush failed");
@@ -206,7 +206,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
     }
   }
 
-  SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
+  SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));
 
   if (planToTask) {
     taosHashCleanup(planToTask);
@@ -384,7 +384,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
 
   SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
   if (!moved) {
-    SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status);
+    SCH_TASK_ELOG(" task may already moved, status:%d", task->status);
     return TSDB_CODE_SUCCESS;
   }
 
@@ -458,11 +458,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
   SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
 
   if (!needRetry) {
-    SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
+    SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
 
     SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
     if (!moved) {
-      SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
+      SCH_TASK_ELOG("task may already moved, status:%d", task->status);
     }
 
     if (SCH_TASK_NEED_WAIT_ALL(task)) {
@@ -825,7 +825,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
   SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
 
   if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) {
-    SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId);
+    SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, task->taskId);
     SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
   }
 
@@ -888,41 +888,16 @@ uint64_t schGenSchId(void) {
 }
 
-int32_t schedulerInit(SSchedulerCfg *cfg) {
-  if (schMgmt.jobs) {
-    qError("scheduler already init");
-    return TSDB_CODE_QRY_INVALID_INPUT;
-  }
-  if (cfg) {
-    schMgmt.cfg = *cfg;
-
-    if (schMgmt.cfg.maxJobNum <= 0) {
-      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
-    }
-  } else {
-    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
-  }
-
-  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
-  if (NULL == schMgmt.jobs) {
-    SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
-  }
-
-  schMgmt.sId = schGenSchId();
-
-  return TSDB_CODE_SUCCESS;
-}
-
-
-int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
+int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
   if (nodeList && taosArrayGetSize(nodeList) <= 0) {
-    qInfo("qnodeList is empty");
+    qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
   }
 
   int32_t code = 0;
   SSchJob *job = calloc(1, sizeof(SSchJob));
   if (NULL == job) {
+    qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
     SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }
 
@@ -975,18 +950,52 @@ int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag,
   return TSDB_CODE_SUCCESS;
 
 _return:
+  *(SSchJob **)pJob = NULL;
 
   scheduleFreeJob(job);
 
   SCH_RET(code);
 }
 
+
+int32_t schedulerInit(SSchedulerCfg *cfg) {
+  if (schMgmt.jobs) {
+    qError("scheduler already initialized");
+    return TSDB_CODE_QRY_INVALID_INPUT;
+  }
+
+  if (cfg) {
+    schMgmt.cfg = *cfg;
+
+    if (schMgmt.cfg.maxJobNum == 0) {
+      schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
+    }
+  } else {
+    schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
+  }
+
+  schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == schMgmt.jobs) {
+    qError("init scheduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
+    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
+    qError("generate schedulerId failed, errno:%d", errno);
+    SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
+  }
+
+  qInfo("scheduler %"PRIx64" initialized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);
+
+  return TSDB_CODE_SUCCESS;
+}
+
 int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
   if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }
 
-  SCH_ERR_RET(scheduleExecJobImpl(transport, nodeList, pDag, pJob, true));
+  SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));
 
   SSchJob *job = *(SSchJob **)pJob;
 
@@ -1001,7 +1010,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }
 
-  return scheduleExecJobImpl(transport, nodeList, pDag, pJob, false);
+  return schExecJobImpl(transport, nodeList, pDag, pJob, false);
 }
 
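
Usage notes (illustrative sketches, not part of the patch):

The SCH_JOB_ELOG/SCH_JOB_DLOG and SCH_TASK_ELOG/SCH_TASK_DLOG macros added in schedulerInt.h expand against variables named pJob and pTask taken from the enclosing scope (pJob->queryId, pTask->taskId), so a call site needs an SSchJob* and SSchTask* with exactly those names visible. A minimal sketch of such a call site follows; schCheckTaskStatus is a hypothetical helper, not a function from this patch:

    // Sketch only: the logging macros resolve pJob/pTask from the enclosing
    // function's parameters and prefix every message with the QID/TID pair.
    static int32_t schCheckTaskStatus(SSchJob *pJob, SSchTask *pTask) {
      if (JOB_TASK_STATUS_NOT_START != pTask->status) {
        SCH_TASK_ELOG("task already started, status:%d", pTask->status);
        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
      }

      SCH_TASK_DLOG("task ready to launch, status:%d", pTask->status);
      return TSDB_CODE_SUCCESS;
    }

For the reworked entry points, a minimal caller-side sketch of schedulerInit followed by a synchronous scheduleExecJob call; pRpc, pNodeList and pDag are placeholders for the transport handle, node list and query DAG supplied by the caller, and the maxJobNum value is arbitrary:

    // Sketch only: initialize the scheduler once, then submit a job synchronously.
    SSchedulerCfg cfg = {.maxJobNum = 100};
    if (schedulerInit(&cfg) != TSDB_CODE_SUCCESS) {
      // init failed, terrno holds the error code
      return terrno;
    }

    void        *pJob = NULL;
    SQueryResult res  = {0};
    int32_t      code = scheduleExecJob(pRpc, pNodeList, pDag, &pJob, &res);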