scheduler add debug info

dapan1121 2022-01-07 16:12:15 +08:00
parent b3c24f6e8f
commit 99d602b809
3 changed files with 109 additions and 96 deletions

View File

@ -24,7 +24,7 @@ extern "C" {
#include "catalog.h"
typedef struct SSchedulerCfg {
int32_t maxJobNum;
uint32_t maxJobNum;
} SSchedulerCfg;
typedef struct SQueryProfileSummary {

View File

@ -37,10 +37,10 @@ enum {
};
typedef struct SSchedulerMgmt {
uint64_t taskId;
uint64_t sId;
uint64_t taskId; // sequential taskId
uint64_t sId; // schedulerId
SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob*
SHashObj *jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt;
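Editor's aside: the new comment documents taskId as a sequential counter. A minimal, self-contained sketch of that allocation pattern (plain C11 atomics with demo names, not TDengine's own atomic helpers or SSchedulerMgmt) might look like this:

#include <inttypes.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* demoTaskId is a hypothetical stand-in for SSchedulerMgmt.taskId: every new
 * task takes the next value of a process-wide counter, so task ids stay
 * unique and monotonically increasing across jobs. */
static _Atomic uint64_t demoTaskId = 0;

static uint64_t demoNextTaskId(void) {
  return atomic_fetch_add(&demoTaskId, 1) + 1;  /* first id handed out is 1 */
}

int main(void) {
  uint64_t a = demoNextTaskId();
  uint64_t b = demoNextTaskId();
  printf("task ids: %" PRIu64 " %" PRIu64 "\n", a, b);
  return 0;
}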
typedef struct SSchCallbackParam {
@ -83,30 +83,29 @@ typedef struct SSchJobAttr {
typedef struct SSchJob {
uint64_t queryId;
int32_t levelNum;
int32_t levelIdx;
int8_t status;
SSchJobAttr attr;
SEpSet dataSrcEps;
SEpAddr resEp;
int32_t levelNum;
void *transport;
SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr
tsem_t rspSem;
int32_t userFetch;
int32_t remoteFetch;
SSchTask *fetchTask;
int32_t errCode;
void *res;
int32_t resNumOfRows;
SArray *levels; // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
SArray *subPlans; // subplan pointer copied from DAG, no need to free it in scheduler
int32_t levelIdx;
SEpSet dataSrcEps;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
SArray *levels; // Element is SQueryLevel, starting from 0. SArray<SSchLevel>
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray<void*>
int8_t status;
SEpAddr resEp;
tsem_t rspSem;
int32_t userFetch;
int32_t remoteFetch;
SSchTask *fetchTask;
int32_t errCode;
void *res;
int32_t resNumOfRows;
SQueryProfileSummary summary;
} SSchJob;
@ -115,12 +114,17 @@ typedef struct SSchJob {
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_SET_JOB_TYPE(pAttr, type) (pAttr)->queryJob = ((type) != QUERY_TYPE_MODIFY)
#define SCH_JOB_NEED_FETCH(pAttr) ((pAttr)->queryJob)
#define SCH_JOB_ELOG(param, ...) qError("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("QID:%"PRIx64" " param, pJob->queryId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:%"PRIx64" " param, pJob->queryId, pTask->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
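Editor's aside: a minimal, self-contained sketch (demo names only, not TDengine code) of the control flow these error macros encode; SCH_ERR_RET returns immediately on failure, SCH_ERR_JRET records the code and jumps to the cleanup label, and SCH_RET propagates whatever code was recorded:

#include <stdint.h>
#include <stdio.h>

#define DEMO_SUCCESS     0
/* Early-return on failure, like SCH_ERR_RET. */
#define DEMO_ERR_RET(c)  do { int32_t _code = (c); if (_code != DEMO_SUCCESS) { return _code; } } while (0)
/* Record the code and jump to cleanup, like SCH_ERR_JRET. */
#define DEMO_ERR_JRET(c) do { code = (c); if (code != DEMO_SUCCESS) { goto _return; } } while (0)

static int32_t step(int32_t rc) { return rc; }

static int32_t demoBuild(void) {
  int32_t code = DEMO_SUCCESS;
  DEMO_ERR_RET(step(0));   /* a non-zero code here would return to the caller at once */
  DEMO_ERR_JRET(step(0));  /* a non-zero code here would jump straight to _return */
_return:
  /* cleanup would go here; the recorded code is propagated, like SCH_RET */
  return code;
}

int main(void) { printf("demoBuild -> %d\n", demoBuild()); return 0; }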

View File

@ -112,87 +112,87 @@ static void cleanupTask(SSchTask* pTask) {
taosArrayDestroy(pTask->candidateAddrs);
}
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) {
int32_t code = 0;
pJob->queryId = dag->queryId;
job->queryId = dag->queryId;
if (dag->numOfSubplans <= 0) {
qError("invalid subplan num:%d", dag->numOfSubplans);
SCH_JOB_ELOG("invalid subplan num:%d", dag->numOfSubplans);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
if (levelNum <= 0) {
qError("invalid level num:%d", levelNum);
SCH_JOB_ELOG("invalid level num:%d", levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == planToTask) {
qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
if (NULL == pJob->levels) {
qError("taosArrayInit %d failed", levelNum);
job->levels = taosArrayInit(levelNum, sizeof(SSchLevel));
if (NULL == job->levels) {
SCH_JOB_ELOG("taosArrayInit %d failed", levelNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
//??
pJob->attr.needFetch = true;
job->attr.needFetch = true;
pJob->levelNum = levelNum;
pJob->levelIdx = levelNum - 1;
job->levelNum = levelNum;
job->levelIdx = levelNum - 1;
pJob->subPlans = dag->pSubplans;
job->subPlans = dag->pSubplans;
SSchLevel level = {0};
SArray *levelPlans = NULL;
int32_t levelPlanNum = 0;
SArray *plans = NULL;
int32_t taskNum = 0;
SSchLevel *pLevel = NULL;
level.status = JOB_TASK_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) {
if (NULL == taosArrayPush(pJob->levels, &level)) {
qError("taosArrayPush failed");
if (NULL == taosArrayPush(job->levels, &level)) {
SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLevel = taosArrayGet(pJob->levels, i);
pLevel = taosArrayGet(job->levels, i);
pLevel->level = i;
levelPlans = taosArrayGetP(dag->pSubplans, i);
if (NULL == levelPlans) {
qError("no level plans for level %d", i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
if (levelPlanNum <= 0) {
qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
pLevel->taskNum = levelPlanNum;
pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask));
plans = taosArrayGetP(dag->pSubplans, i);
if (NULL == plans) {
SCH_JOB_ELOG("empty level plan, level:%d", i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
taskNum = (int32_t)taosArrayGetSize(plans);
if (taskNum <= 0) {
SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
pLevel->taskNum = taskNum;
pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask));
if (NULL == pLevel->subTasks) {
qError("taosArrayInit %d failed", levelPlanNum);
SCH_JOB_ELOG("taosArrayInit %d failed", taskNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
for (int32_t n = 0; n < levelPlanNum; ++n) {
SSubplan *plan = taosArrayGetP(levelPlans, n);
for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = taosArrayGetP(plans, n);
if (plan->type == QUERY_TYPE_MODIFY) {
pJob->attr.needFetch = false;
job->attr.needFetch = false;
} else {
pJob->attr.queryJob = true;
job->attr.queryJob = true;
}
SSchTask task = initTask(pJob, plan, pLevel);
SSchTask task = initTask(job, plan, pLevel);
void *p = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) {
qError("taosArrayPush failed");
@ -206,7 +206,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *pJob) {
}
}
SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));
if (planToTask) {
taosHashCleanup(planToTask);
@ -384,7 +384,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
if (!moved) {
SCH_TASK_ERR_LOG(" task may already moved, status:%d", task->status);
SCH_TASK_ELOG(" task may already moved, status:%d", task->status);
return TSDB_CODE_SUCCESS;
}
@ -458,11 +458,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) {
SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
if (!needRetry) {
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
SCH_TASK_ELOG("task failed[%x], no more retry", errCode);
SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
if (!moved) {
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
SCH_TASK_ELOG("task may already moved, status:%d", task->status);
}
if (SCH_TASK_NEED_WAIT_ALL(task)) {
@ -825,7 +825,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) {
SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId);
SCH_TASK_ELOG("no valid candidate node for task:%"PRIx64, task->taskId);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
@ -888,41 +888,16 @@ uint64_t schGenSchId(void) {
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobs) {
qError("scheduler already init");
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (cfg) {
schMgmt.cfg = *cfg;
if (schMgmt.cfg.maxJobNum <= 0) {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.jobs) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
}
schMgmt.sId = schGenSchId();
return TSDB_CODE_SUCCESS;
}
int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) {
if (nodeList && taosArrayGetSize(nodeList) <= 0) {
qInfo("qnodeList is empty");
qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId);
}
int32_t code = 0;
SSchJob *job = calloc(1, sizeof(SSchJob));
if (NULL == job) {
qError("QID:%"PRIx64" calloc %d failed", sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@ -975,18 +950,52 @@ int32_t scheduleExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag,
return TSDB_CODE_SUCCESS;
_return:
*(SSchJob **)pJob = NULL;
scheduleFreeJob(job);
SCH_RET(code);
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobs) {
qError("scheduler already initialized");
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (cfg) {
schMgmt.cfg = *cfg;
if (schMgmt.cfg.maxJobNum == 0) {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
}
schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.jobs) {
qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (taosGetSystemUUID(&schMgmt.sId, sizeof(schMgmt.sId))) {
qError("generate schdulerId failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
}
qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.cfg.maxJobNum);
return TSDB_CODE_SUCCESS;
}
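Editor's aside: a hedged usage sketch of the new initialization path, assuming only the public scheduler.h API touched by this commit; the value 100 is an arbitrary example, and per the code above a zero maxJobNum (or a NULL cfg) falls back to SCHEDULE_DEFAULT_JOB_NUMBER:

#include "scheduler.h"

/* Hedged sketch: configure and initialize the scheduler once at process start. */
static int32_t initSchedulerOnce(void) {
  SSchedulerCfg cfg = {0};
  cfg.maxJobNum = 100;         /* example value; 0 falls back to SCHEDULE_DEFAULT_JOB_NUMBER */
  return schedulerInit(&cfg);  /* TSDB_CODE_SUCCESS on success, error code otherwise */
}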
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(scheduleExecJobImpl(transport, nodeList, pDag, pJob, true));
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));
SSchJob *job = *(SSchJob **)pJob;
@ -1001,7 +1010,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
return scheduleExecJobImpl(transport, nodeList, pDag, pJob, false);
return schExecJobImpl(transport, nodeList, pDag, pJob, false);
}
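Editor's aside: a hedged sketch of the synchronous path built on the signatures above; the transport handle, node list, and SQueryDag are assumed to be prepared by the caller, and scheduleFreeJob is the release call seen earlier in this file:

#include "scheduler.h"

/* Hedged sketch: run one query DAG synchronously and release the job handle. */
static int32_t demoRunDag(void *transport, SArray *nodeList, SQueryDag *pDag) {
  void        *pJob = NULL;
  SQueryResult res  = {0};

  int32_t code = scheduleExecJob(transport, nodeList, pDag, &pJob, &res);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  /* ... consume res / fetch rows here ... */

  scheduleFreeJob(pJob);  /* release the job created by scheduleExecJob */
  return TSDB_CODE_SUCCESS;
}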