scheduler init version
This commit is contained in:
parent
ff7168f18b
commit
eed7b6f012
|
@ -67,6 +67,8 @@ int32_t scheduleFetchRows(void *pJob, void *data);
|
|||
*/
|
||||
int32_t scheduleCancelJob(void *pJob);
|
||||
|
||||
void scheduleFreeJob(void *pJob);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -506,6 +506,11 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error
|
||||
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error
|
||||
|
||||
//scheduler
|
||||
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error
|
||||
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -38,12 +38,13 @@ enum {
|
|||
};
|
||||
|
||||
typedef struct SSchedulerMgmt {
|
||||
uint64_t taskId;
|
||||
SHashObj *Jobs; // key: queryId, value: SQueryJob*
|
||||
} SSchedulerMgmt;
|
||||
|
||||
typedef struct SQueryTask {
|
||||
uint64_t taskId; // task id
|
||||
char *pSubplan; // operator tree
|
||||
char *msg; // operator tree
|
||||
int8_t status; // task status
|
||||
SQueryProfileSummary summary; // task execution summary
|
||||
} SQueryTask;
|
||||
|
@ -68,9 +69,12 @@ typedef struct SQueryJob {
|
|||
} SQueryJob;
|
||||
|
||||
|
||||
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
|
||||
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
|
||||
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
|
||||
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
|
||||
|
||||
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
|
||||
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -51,16 +51,17 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
|
|||
}
|
||||
|
||||
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
||||
int32_t code = 0;
|
||||
int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
|
||||
if (levelNum <= 0) {
|
||||
qError("invalid level num:%d", levelNum);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel));
|
||||
if (NULL == job->levels) {
|
||||
qError("taosArrayInit %d failed", levelNum);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
job->levelNum = levelNum;
|
||||
|
@ -73,28 +74,66 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
|||
SArray *levelPlans = NULL;
|
||||
int32_t levelPlanNum = 0;
|
||||
|
||||
level.status = SCH_STATUS_NOT_START;
|
||||
|
||||
for (int32_t i = 0; i < levelNum; ++i) {
|
||||
levelPlans = taosArrayGetP(dag->pSubplans, i);
|
||||
if (NULL == levelPlans) {
|
||||
qError("no level plans for level %d", i);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
|
||||
if (levelPlanNum <= 0) {
|
||||
qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
level.taskNum = levelPlanNum;
|
||||
level.subPlans = levelPlans;
|
||||
|
||||
level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask));
|
||||
if (NULL == level.subTasks) {
|
||||
qError("taosArrayInit %d failed", levelPlanNum);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
for (int32_t n = 0; n < levelPlanNum; ++n) {
|
||||
SQueryTask *task = taosArrayGet(level.subTasks, n);
|
||||
|
||||
task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
|
||||
task->status = SCH_STATUS_NOT_START;
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(job->levels, &level)) {
|
||||
qError("taosArrayPush failed");
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
if (level.subTasks) {
|
||||
taosArrayDestroy(level.subTasks);
|
||||
}
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t schJobExecute(SQueryJob *job) {
|
||||
switch (job->status) {
|
||||
case SCH_STATUS_NOT_START:
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
SCH_JOB_ERR_LOG("invalid job status:%d", job->status);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||
schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
|
@ -108,32 +147,39 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
|||
|
||||
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) {
|
||||
if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
||||
int32_t code = 0;
|
||||
SQueryJob *job = calloc(1, sizeof(SQueryJob));
|
||||
if (NULL == job) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
schValidateAndBuildJob(pDag, job);
|
||||
|
||||
|
||||
|
||||
SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
|
||||
|
||||
SCH_ERR_JRET(schJobExecute(job));
|
||||
|
||||
*(SQueryJob **)pJob = job;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
|
||||
|
||||
*(SQueryJob **)pJob = NULL;
|
||||
scheduleFreeJob(job);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t scheduleFetchRows(void *pJob, void *data);
|
||||
|
||||
int32_t scheduleCancelJob(void *pJob);
|
||||
|
||||
void scheduleFreeJob(void *pJob) {
|
||||
|
||||
}
|
||||
|
||||
void schedulerDestroy(void) {
|
||||
if (schMgmt.Jobs) {
|
||||
taosHashCleanup(schMgmt.Jobs); //TBD
|
||||
|
|
|
@ -501,6 +501,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready"
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
|
||||
|
||||
//scheduler
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
|
||||
|
||||
|
||||
#ifdef TAOS_ERROR_C
|
||||
|
|
Loading…
Reference in New Issue