From 3edb51d1bc822af4e998579ab63c94dd4bb578eb Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 7 Jan 2022 16:50:48 +0800 Subject: [PATCH] feature/qnode --- source/libs/scheduler/inc/schedulerInt.h | 1 - source/libs/scheduler/src/scheduler.c | 272 +++++++++++------------ 2 files changed, 126 insertions(+), 147 deletions(-) diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 100e9fb2cf..0982411cf5 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -84,7 +84,6 @@ typedef struct SSchJobAttr { typedef struct SSchJob { uint64_t queryId; SSchJobAttr attr; - int32_t levelNum; void *transport; SArray *nodeList; // qnode/vnode list, element is SQueryNodeAddr diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 0ce2d4af4a..300f7154a4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -20,109 +20,106 @@ static SSchedulerMgmt schMgmt = {0}; -int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { - for (int32_t i = 0; i < job->levelNum; ++i) { - SSchLevel *level = taosArrayGet(job->levels, i); +static int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { + for (int32_t i = 0; i < pJob->levelNum; ++i) { + SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - for (int32_t m = 0; m < level->taskNum; ++m) { - SSchTask *task = taosArrayGet(level->subTasks, m); - SSubplan *plan = task->plan; - int32_t childNum = plan->pChildren ? (int32_t)taosArrayGetSize(plan->pChildren) : 0; - int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0; + for (int32_t m = 0; m < pLevel->taskNum; ++m) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); + SSubplan *pPlan = pTask->plan; + int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0; + int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0; if (childNum > 0) { - task->children = taosArrayInit(childNum, POINTER_BYTES); - if (NULL == task->children) { - qError("taosArrayInit %d failed", childNum); + pTask->children = taosArrayInit(childNum, POINTER_BYTES); + if (NULL == pTask->children) { + SCH_TASK_ELOG("taosArrayInit %d children failed", childNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } for (int32_t n = 0; n < childNum; ++n) { - SSubplan **child = taosArrayGet(plan->pChildren, n); + SSubplan **child = taosArrayGet(pPlan->pChildren, n); SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); if (NULL == childTask || NULL == *childTask) { - qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); + SCH_TASK_ELOG("subplan children relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->children, childTask)) { - qError("taosArrayPush failed"); + if (NULL == taosArrayPush(pTask->children, childTask)) { + SCH_TASK_ELOG("taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } if (parentNum > 0) { - task->parents = taosArrayInit(parentNum, POINTER_BYTES); - if (NULL == task->parents) { - qError("taosArrayInit %d failed", parentNum); + pTask->parents = taosArrayInit(parentNum, POINTER_BYTES); + if (NULL == pTask->parents) { + SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } for (int32_t n = 0; n < parentNum; ++n) { - SSubplan **parent = taosArrayGet(plan->pParents, n); + SSubplan **parent = taosArrayGet(pPlan->pParents, n); SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); if (NULL == parentTask || NULL == *parentTask) { - qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); + SCH_TASK_ELOG("subplan parent relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->parents, parentTask)) { - qError("taosArrayPush failed"); + if (NULL == taosArrayPush(pTask->parents, parentTask)) { + SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - } + } + + SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum); } } - SSchLevel *level = taosArrayGet(job->levels, 0); - if (job->attr.queryJob && level->taskNum > 1) { - qError("invalid plan info, level 0, taskNum:%d", level->taskNum); + SSchLevel *pLevel = taosArrayGet(pJob->levels, 0); + if (pJob->attr.queryJob && pLevel->taskNum > 1) { + SCH_JOB_ELOG("invalid query plan, level:0, taskNum:%d", pLevel->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SSchTask *task = taosArrayGet(level->subTasks, 0); - if (task->parents && taosArrayGetSize(task->parents) > 0) { - qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + for (int32_t i = 0; i < pLevel->taskNum; ++i) { + SSchTask *pTask = taosArrayGet(pLevel->subTasks, i); + + if (pTask->parents && taosArrayGetSize(pTask->parents) > 0) { + SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", (int32_t)taosArrayGetSize(pTask->parents)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } } - return TSDB_CODE_SUCCESS; } -static SSchTask initTask(SSchJob* pJob, SSubplan* plan, SSchLevel *pLevel) { - SSchTask task = {0}; - if (plan->type == QUERY_TYPE_MODIFY) { - pJob->attr.needFetch = false; - } else { - pJob->attr.queryJob = true; - } +static int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { + pTask->plan = pPlan; + pTask->level = pLevel; + pTask->status = JOB_TASK_STATUS_NOT_START; + pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); - task.plan = plan; - task.level = pLevel; - task.status = JOB_TASK_STATUS_NOT_START; - task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); - - return task; + return TSDB_CODE_SUCCESS; } -static void cleanupTask(SSchTask* pTask) { +static void schFreeTask(SSchTask* pTask) { taosArrayDestroy(pTask->candidateAddrs); } -int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { +static int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { int32_t code = 0; - job->queryId = dag->queryId; + pJob->queryId = pDag->queryId; - if (dag->numOfSubplans <= 0) { - SCH_JOB_ELOG("invalid subplan num:%d", dag->numOfSubplans); + if (pDag->numOfSubplans <= 0) { + SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); + int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans); if (levelNum <= 0) { SCH_JOB_ELOG("invalid level num:%d", levelNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -134,18 +131,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); - if (NULL == job->levels) { + pJob->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); + if (NULL == pJob->levels) { SCH_JOB_ELOG("taosArrayInit %d failed", levelNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->attr.needFetch = true; + pJob->levelNum = levelNum; + pJob->levelIdx = levelNum - 1; - job->levelNum = levelNum; - job->levelIdx = levelNum - 1; - - job->subPlans = dag->pSubplans; + pJob->subPlans = pDag->pSubplans; SSchLevel level = {0}; SArray *plans = NULL; @@ -155,16 +150,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { level.status = JOB_TASK_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { - if (NULL == taosArrayPush(job->levels, &level)) { + if (NULL == taosArrayPush(pJob->levels, &level)) { SCH_JOB_ELOG("taosArrayPush level failed, level:%d", i); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pLevel = taosArrayGet(job->levels, i); + pLevel = taosArrayGet(pJob->levels, i); pLevel->level = i; - plans = taosArrayGetP(dag->pSubplans, i); + plans = taosArrayGetP(pDag->pSubplans, i); if (NULL == plans) { SCH_JOB_ELOG("empty level plan, level:%d", i); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -186,38 +181,34 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = taosArrayGetP(plans, n); - if (plan->type == QUERY_TYPE_MODIFY) { - job->attr.needFetch = false; - } else { - job->attr.queryJob = true; - } - SSchTask task = initTask(job, plan, pLevel); + SCH_SET_JOB_TYPE(&pJob->attr, plan->type); + + SSchTask task = {0}; + SSchTask *pTask = &task; + + schInitTask(pJob, &task, plan, pLevel); + void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { - qError("taosArrayPush failed"); + SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { - qError("taosHashPut failed"); + SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + SCH_TASK_DLOG("task initialized, level:%d", pLevel->level); } + + SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum); } - SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); - - if (planToTask) { - taosHashCleanup(planToTask); - } - - return TSDB_CODE_SUCCESS; + SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask)); _return: - if (pLevel->subTasks) { - taosArrayDestroy(pLevel->subTasks); - } if (planToTask) { taosHashCleanup(planToTask); @@ -226,7 +217,7 @@ _return: SCH_RET(code); } -int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { +static int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { if (task->candidateAddrs) { return TSDB_CODE_SUCCESS; } @@ -273,7 +264,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { +static int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { qError("failed to add new task, taskId:0x%"PRIx64", reqId:0x"PRIx64", out of memory", pJob->queryId); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -284,7 +275,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { +static int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qError("remove task taskId:0x%"PRIx64" from execTasks failed, reqId:0x%"PRIx64, task->taskId, job->queryId); return TSDB_CODE_SUCCESS; @@ -300,7 +291,7 @@ int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { +static int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); } @@ -315,7 +306,7 @@ int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } -int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { +static int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -325,7 +316,7 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b } -int32_t schFetchFromRemote(SSchJob *job) { +static int32_t schFetchFromRemote(SSchJob *job) { int32_t code = 0; if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { @@ -344,11 +335,11 @@ _return: } -int32_t schProcessOnJobPartialSuccess(SSchJob *job) { +static int32_t schProcessOnJobPartialSuccess(SSchJob *job) { job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; bool needFetch = job->userFetch; - if ((!job->attr.needFetch) && job->attr.syncSchedule) { + if ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule) { tsem_post(&job->rspSem); } @@ -359,27 +350,27 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *job) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { +static int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { job->status = JOB_TASK_STATUS_FAILED; job->errCode = errCode; atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); - if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) { + if (job->userFetch || ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule)) { tsem_post(&job->rspSem); } return TSDB_CODE_SUCCESS; } -int32_t schProcessOnDataFetched(SSchJob *job) { +static int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); tsem_post(&job->rspSem); } -int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { +static int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { bool moved = false; SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); @@ -451,7 +442,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { +static int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { bool needRetry = false; bool moved = false; int32_t taskDone = 0; @@ -488,7 +479,7 @@ int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { return TSDB_CODE_SUCCESS; } -int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +static int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { @@ -576,7 +567,7 @@ _return: } -int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { +static int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchCallbackParam *pParam = (SSchCallbackParam *)param; @@ -602,32 +593,32 @@ _return: SCH_RET(code); } -int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); } -int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code); } -int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); } -int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); } -int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); } -int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { +static int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSchCallbackParam *pParam = (SSchCallbackParam *)param; qDebug("drop task rsp received, queryId:%"PRIx64 ",taksId:%"PRIx64 ",code:%d", pParam->queryId, pParam->taskId, code); } -int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { +static int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { switch (msgType) { case TDMT_VND_CREATE_TABLE: *fp = schHandleCreateTableCallback; @@ -656,7 +647,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { } -int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { +static int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { int32_t code = 0; SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { @@ -695,7 +686,7 @@ _return: SCH_RET(code); } -void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { +static void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { epSet->inUse = addr->inUse; epSet->numOfEps = addr->numOfEps; @@ -706,7 +697,7 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { } -int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { +static int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { uint32_t msgSize = 0; void *msg = NULL; int32_t code = 0; @@ -819,7 +810,7 @@ _return: } -int32_t schLaunchTask(SSchJob *job, SSchTask *task) { +static int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); @@ -837,7 +828,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schLaunchJob(SSchJob *job) { +static int32_t schLaunchJob(SSchJob *job) { SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { SSchTask *task = taosArrayGet(level->subTasks, i); @@ -849,7 +840,7 @@ int32_t schLaunchJob(SSchJob *job) { return TSDB_CODE_SUCCESS; } -void schDropJobAllTasks(SSchJob *job) { +static void schDropJobAllTasks(SSchJob *job) { void *pIter = taosHashIterate(job->succTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; @@ -877,82 +868,71 @@ void schDropJobAllTasks(SSchJob *job) { } } -uint64_t schGenSchId(void) { - uint64_t sId = 0; - - // TODO - - qDebug("Gen sId:0x%"PRIx64, sId); - - return sId; -} - - - -int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { +static int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { if (nodeList && taosArrayGetSize(nodeList) <= 0) { qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); } int32_t code = 0; - SSchJob *job = calloc(1, sizeof(SSchJob)); - if (NULL == job) { + SSchJob *pJob = calloc(1, sizeof(SSchJob)); + if (NULL == pJob) { qError("QID:%"PRIx64" calloc %d failed", sizeof(SSchJob)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->attr.syncSchedule = syncSchedule; - job->transport = transport; - job->nodeList = nodeList; + pJob->attr.syncSchedule = syncSchedule; + pJob->transport = transport; + pJob->nodeList = nodeList; - SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); + SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); - job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == job->execTasks) { - qError("taosHashInit %d failed", pDag->numOfSubplans); + pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->execTasks) { + SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == job->succTasks) { - qError("taosHashInit %d failed", pDag->numOfSubplans); + pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->succTasks) { + SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == job->failTasks) { - qError("taosHashInit %d failed", pDag->numOfSubplans); + pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == pJob->failTasks) { + SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - tsem_init(&job->rspSem, 0, 0); + tsem_init(&pJob->rspSem, 0, 0); - code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES); + code = taosHashPut(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId), &pJob, POINTER_BYTES); if (0 != code) { if (HASH_NODE_EXIST(code)) { - qError("taosHashPut queryId:0x%"PRIx64" already exist", job->queryId); + SCH_JOB_ELOG("job already exist, type:%d", pJob->attr.queryJob); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } else { - qError("taosHashPut queryId:0x%"PRIx64" failed", job->queryId); + qError("taosHashPut queryId:0x%"PRIx64" failed", pJob->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } } - job->status = JOB_TASK_STATUS_NOT_START; - SCH_ERR_JRET(schLaunchJob(job)); + pJob->status = JOB_TASK_STATUS_NOT_START; + SCH_ERR_JRET(schLaunchJob(pJob)); - *(SSchJob **)pJob = job; + *(SSchJob **)job = pJob; if (syncSchedule) { - tsem_wait(&job->rspSem); + tsem_wait(&pJob->rspSem); } return TSDB_CODE_SUCCESS; _return: - *(SSchJob **)pJob = NULL; - scheduleFreeJob(job); + *(SSchJob **)job = NULL; + + scheduleFreeJob(pJob); SCH_RET(code); } @@ -1022,7 +1002,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { SSchJob *job = pJob; int32_t code = 0; - if (!job->attr.needFetch) { + if (!SCH_JOB_NEED_FETCH(&job->attr)) { qError("no need to fetch data"); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -1101,7 +1081,7 @@ void scheduleFreeJob(void *pJob) { int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); for(int32_t j = 0; j < numOfTasks; ++j) { SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); - cleanupTask(pTask); + schFreeTask(pTask); } taosArrayDestroy(pLevel->subTasks);