From eed7b6f01220df1e2842dfa569e7e1b64b879671 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 17 Dec 2021 13:41:16 +0800 Subject: [PATCH] scheduler init version --- include/libs/scheduler/scheduler.h | 2 + include/util/taoserror.h | 5 ++ source/libs/scheduler/inc/schedulerInt.h | 12 ++-- source/libs/scheduler/src/scheduler.c | 72 +++++++++++++++++++----- source/util/src/terror.c | 2 + 5 files changed, 76 insertions(+), 17 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index d73e388c20..40c85520d4 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -67,6 +67,8 @@ int32_t scheduleFetchRows(void *pJob, void *data); */ int32_t scheduleCancelJob(void *pJob); +void scheduleFreeJob(void *pJob); + #ifdef __cplusplus } #endif diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 689d2676d1..5abb678a07 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -506,6 +506,11 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error #define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error +//scheduler +#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error + + + #ifdef __cplusplus } #endif diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 8e30ce1403..4620a816da 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -38,12 +38,13 @@ enum { }; typedef struct SSchedulerMgmt { + uint64_t taskId; SHashObj *Jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; typedef struct SQueryTask { uint64_t taskId; // task id - char *pSubplan; // operator tree + char *msg; // operator tree int8_t status; // task status SQueryProfileSummary summary; // task execution summary } SQueryTask; @@ -68,9 +69,12 @@ typedef struct SQueryJob { } SQueryJob; -#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) -#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0) -#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0) +#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) + +#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) +#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) +#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) +#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #ifdef __cplusplus diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 8d2e1ed916..7d387cbc66 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -51,16 +51,17 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ } int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { + int32_t code = 0; int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); if (levelNum <= 0) { qError("invalid level num:%d", levelNum); - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); if (NULL == job->levels) { qError("taosArrayInit %d failed", levelNum); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } job->levelNum = levelNum; @@ -73,28 +74,66 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SArray *levelPlans = NULL; int32_t levelPlanNum = 0; + level.status = SCH_STATUS_NOT_START; + for (int32_t i = 0; i < levelNum; ++i) { levelPlans = taosArrayGetP(dag->pSubplans, i); if (NULL == levelPlans) { qError("no level plans for level %d", i); - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); if (levelPlanNum <= 0) { qError("invalid level plans number:%d, level:%d", levelPlanNum, i); - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + level.taskNum = levelPlanNum; + level.subPlans = levelPlans; + + level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); + if (NULL == level.subTasks) { + qError("taosArrayInit %d failed", levelPlanNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } for (int32_t n = 0; n < levelPlanNum; ++n) { + SQueryTask *task = taosArrayGet(level.subTasks, n); + + task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + task->status = SCH_STATUS_NOT_START; + } + if (NULL == taosArrayPush(job->levels, &level)) { + qError("taosArrayPush failed"); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } return TSDB_CODE_SUCCESS; + +_return: + if (level.subTasks) { + taosArrayDestroy(level.subTasks); + } + + SCH_RET(code); } +int32_t schJobExecute(SQueryJob *job) { + switch (job->status) { + case SCH_STATUS_NOT_START: + + break; + + default: + SCH_JOB_ERR_LOG("invalid job status:%d", job->status); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } +} + int32_t schedulerInit(SSchedulerCfg *cfg) { schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); @@ -108,32 +147,39 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + int32_t code = 0; SQueryJob *job = calloc(1, sizeof(SQueryJob)); if (NULL == job) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - schValidateAndBuildJob(pDag, job); - - - + SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); + SCH_ERR_JRET(schJobExecute(job)); *(SQueryJob **)pJob = job; + + return TSDB_CODE_SUCCESS; +_return: - - + *(SQueryJob **)pJob = NULL; + scheduleFreeJob(job); + + SCH_RET(code); } int32_t scheduleFetchRows(void *pJob, void *data); int32_t scheduleCancelJob(void *pJob); +void scheduleFreeJob(void *pJob) { + +} + void schedulerDestroy(void) { if (schMgmt.Jobs) { taosHashCleanup(schMgmt.Jobs); //TBD diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 5518ec2a31..e3a5c9e034 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -501,6 +501,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready" TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") +//scheduler +TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error") #ifdef TAOS_ERROR_C