From feee87ddfa14be2267a6035af257ea04adef76f9 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 5 Jan 2022 10:11:26 +0800 Subject: [PATCH] feature/qnode --- source/libs/scheduler/inc/schedulerInt.h | 5 ++-- source/libs/scheduler/src/scheduler.c | 31 +++++++++++++++++------- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index fa4ae0d152..1d5d0599e6 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -114,8 +114,9 @@ typedef struct SSchJob { #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) #define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) -#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) -#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) +#define SCH_JOB_ELOG(param, ...) qError("QID:% "PRIx64 param, job->queryId, __VA_ARGS__) +#define SCH_TASK_ELOG(param, ...) qError("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) +#define SCH_TASK_DLOG(param, ...) qDebug("QID:%"PRIx64",TID:% "PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 22556f742b..ff254da6fb 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -270,6 +270,8 @@ int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("push to %s list", "execTasks"); + return TSDB_CODE_SUCCESS; } @@ -284,6 +286,8 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("push to %s list", "succTasks"); + *moved = true; return TSDB_CODE_SUCCESS; @@ -299,6 +303,8 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + SCH_TASK_DLOG("push to %s list", "failTasks"); + *moved = true; return TSDB_CODE_SUCCESS; @@ -372,7 +378,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); if (!moved) { - SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); + SCH_TASK_ELOG("task may already moved, status:%d", task->status); return TSDB_CODE_SUCCESS; } @@ -447,11 +453,11 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); if (!needRetry) { - SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); + SCH_TASK_ELOG("task failed[%x], no more retry", errCode); SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); if (!moved) { - SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); + SCH_TASK_ELOG("task may already moved, status:%d", task->status); } if (SCH_TASK_NEED_WAIT_ALL(task)) { @@ -492,6 +498,7 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms goto _task_error; } } + break; } case TDMT_VND_SUBMIT_RSP: { if (rspCode != TSDB_CODE_SUCCESS) { @@ -567,19 +574,25 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; - SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); - if (NULL == job || NULL == (*job)) { + SSchJob **pjob = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); + if (NULL == pjob || NULL == (*pjob)) { qError("taosHashGet queryId:%"PRIx64" not exist", pParam->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); - if (NULL == task || NULL == (*task)) { + SSchJob *job = *pjob; + + SSchTask **ptask = taosHashGet(job->execTasks, &pParam->taskId, sizeof(pParam->taskId)); + if (NULL == ptask || NULL == (*ptask)) { qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } + + SSchTask *task = *ptask; + + SCH_TASK_DLOG("Got msg:%d, rspCode:%d", msgType, rspCode); - schProcessRspMsg(*job, *task, msgType, pMsg->pData, pMsg->len, rspCode); + schProcessRspMsg(job, task, msgType, pMsg->pData, pMsg->len, rspCode); _return: tfree(param); @@ -809,7 +822,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schSetTaskCondidateAddrs(job, task)); if (NULL == task->condidateAddrs || taosArrayGetSize(task->condidateAddrs) <= 0) { - SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); + SCH_TASK_ELOG("no valid condidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); }