From af406ab31a54f7ff2356da383de3f1ffeac1fd34 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 24 Jan 2022 02:31:19 +0000 Subject: [PATCH] no refact --- cmake/cmake.options | 2 +- source/libs/scheduler/src/scheduler.c | 577 +++++++++++++------------- 2 files changed, 298 insertions(+), 281 deletions(-) diff --git a/cmake/cmake.options b/cmake/cmake.options index 4883b6ee8e..faa45256fb 100644 --- a/cmake/cmake.options +++ b/cmake/cmake.options @@ -47,7 +47,7 @@ option( option( BUILD_WITH_UV "If build with libuv" - ON + OFF ) option( diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a6b499aca8..f31a27cb42 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -13,18 +13,20 @@ * along with this program. If not, see . */ -#include "catalog.h" -#include "query.h" #include "schedulerInt.h" #include "tmsg.h" +#include "query.h" +#include "catalog.h" static SSchedulerMgmt schMgmt = {0}; -uint64_t schGenTaskId(void) { return atomic_add_fetch_64(&schMgmt.taskId, 1); } +uint64_t schGenTaskId(void) { + return atomic_add_fetch_64(&schMgmt.taskId, 1); +} uint64_t schGenUUID(void) { static uint64_t hashId = 0; - static int32_t requestSerialId = 0; + static int32_t requestSerialId = 0; if (hashId == 0) { char uid[64]; @@ -36,17 +38,18 @@ uint64_t schGenUUID(void) { } } - int64_t ts = taosGetTimestampMs(); - uint64_t pid = taosGetPId(); - int32_t val = atomic_add_fetch_32(&requestSerialId, 1); + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&requestSerialId, 1); uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); return id; } -int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { - pTask->plan = pPlan; - pTask->level = pLevel; + +int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { + pTask->plan = pPlan; + pTask->level = pLevel; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); pTask->taskId = schGenTaskId(); pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); @@ -58,7 +61,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * return TSDB_CODE_SUCCESS; } -void schFreeTask(SSchTask *pTask) { +void schFreeTask(SSchTask* pTask) { if (pTask->candidateAddrs) { taosArrayDestroy(pTask->candidateAddrs); } @@ -78,9 +81,10 @@ void schFreeTask(SSchTask *pTask) { } } + int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = atomic_load_32(&pTask->lastMsgType); - + switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_SUBMIT_RSP: @@ -93,23 +97,22 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && - SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), - msgType); + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } break; default: SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); - + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } return TSDB_CODE_SUCCESS; } + int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { int32_t code = 0; @@ -121,34 +124,37 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { if (oriStatus == newStatus) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + switch (oriStatus) { case JOB_TASK_STATUS_NULL: if (newStatus != JOB_TASK_STATUS_NOT_START) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_NOT_START: if (newStatus != JOB_TASK_STATUS_EXECUTING) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_EXECUTING: - if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && - newStatus != JOB_TASK_STATUS_CANCELLING && newStatus != JOB_TASK_STATUS_CANCELLED && - newStatus != JOB_TASK_STATUS_DROPPING) { + if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED + && newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_CANCELLING + && newStatus != JOB_TASK_STATUS_CANCELLED + && newStatus != JOB_TASK_STATUS_DROPPING) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_PARTIAL_SUCCEED: - if (newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_SUCCEED && - newStatus != JOB_TASK_STATUS_DROPPING) { + if (newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_SUCCEED + && newStatus != JOB_TASK_STATUS_DROPPING) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_SUCCEED: case JOB_TASK_STATUS_FAILED: @@ -156,13 +162,13 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { if (newStatus != JOB_TASK_STATUS_DROPPING) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - + break; case JOB_TASK_STATUS_CANCELLED: case JOB_TASK_STATUS_DROPPING: SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); break; - + default: SCH_JOB_ELOG("invalid job status:%d", oriStatus); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); @@ -185,22 +191,23 @@ _return: SCH_ERR_RET(code); } + int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { for (int32_t i = 0; i < pJob->levelNum; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - + for (int32_t m = 0; m < pLevel->taskNum; ++m) { SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); SSubplan *pPlan = pTask->plan; - int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0; - int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0; + int32_t childNum = pPlan->pChildren ? (int32_t)taosArrayGetSize(pPlan->pChildren) : 0; + int32_t parentNum = pPlan->pParents ? (int32_t)taosArrayGetSize(pPlan->pParents) : 0; if (childNum > 0) { if (pJob->levelIdx == pLevel->level) { SCH_JOB_ELOG("invalid query plan, lowest level, childNum:%d", childNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - + pTask->children = taosArrayInit(childNum, POINTER_BYTES); if (NULL == pTask->children) { SCH_TASK_ELOG("taosArrayInit %d children failed", childNum); @@ -227,7 +234,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_TASK_ELOG("invalid task info, level:0, parentNum:%d", parentNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - + pTask->parents = taosArrayInit(parentNum, POINTER_BYTES); if (NULL == pTask->parents) { SCH_TASK_ELOG("taosArrayInit %d parents failed", parentNum); @@ -252,7 +259,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { SCH_TASK_ELOG("taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - } + } SCH_TASK_DLOG("level:%d, parentNum:%d, childNum:%d", i, parentNum, childNum); } @@ -267,6 +274,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { return TSDB_CODE_SUCCESS; } + int32_t schRecordTaskSucceedNode(SSchTask *pTask) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); @@ -277,6 +285,7 @@ int32_t schRecordTaskSucceedNode(SSchTask *pTask) { return TSDB_CODE_SUCCESS; } + int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr) { if (NULL == taosArrayPush(pTask->execAddrs, addr)) { SCH_TASK_ELOG("taosArrayPush addr to execAddr list failed, errno:%d", errno); @@ -286,25 +295,23 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad return TSDB_CODE_SUCCESS; } + int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { int32_t code = 0; pJob->queryId = pDag->queryId; - + if (pDag->numOfSubplans <= 0) { SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - + int32_t levelNum = (int32_t)taosArrayGetSize(pDag->pSubplans); if (levelNum <= 0) { SCH_JOB_ELOG("invalid level num:%d", levelNum); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SHashObj *planToTask = taosHashInit( - SCHEDULE_DEFAULT_TASK_NUMBER, - taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, - HASH_NO_LOCK); + SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == planToTask) { SCH_JOB_ELOG("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -321,9 +328,9 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { pJob->subPlans = pDag->pSubplans; - SSchLevel level = {0}; - SArray * plans = NULL; - int32_t taskNum = 0; + SSchLevel level = {0}; + SArray *plans = NULL; + int32_t taskNum = 0; SSchLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; @@ -335,9 +342,9 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { } pLevel = taosArrayGet(pJob->levels, i); - + pLevel->level = i; - + plans = taosArrayGetP(pDag->pSubplans, i); if (NULL == plans) { SCH_JOB_ELOG("empty level plan, level:%d", i); @@ -351,13 +358,13 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { } pLevel->taskNum = taskNum; - + pLevel->subTasks = taosArrayInit(taskNum, sizeof(SSchTask)); if (NULL == pLevel->subTasks) { SCH_JOB_ELOG("taosArrayInit %d failed", taskNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + for (int32_t n = 0; n < taskNum; ++n) { SSubplan *plan = taosArrayGetP(plans, n); @@ -365,15 +372,15 @@ int32_t schValidateAndBuildJob(SQueryDag *pDag, SSchJob *pJob) { SSchTask task = {0}; SSchTask *pTask = &task; - + SCH_ERR_JRET(schInitTask(pJob, &task, plan, pLevel)); - + void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { SCH_TASK_ELOG("taosArrayPush task to level failed, level:%d, taskIdx:%d", pLevel->level, n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { SCH_TASK_ELOG("taosHashPut to planToTaks failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -422,10 +429,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { int32_t nodeNum = 0; if (pJob->nodeList) { nodeNum = taosArrayGetSize(pJob->nodeList); - + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); - + if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -440,14 +447,14 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_QRY_INVALID_INPUT; } - /* - for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { - strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); - epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; - - ++epSet->numOfEps; - } - */ +/* + for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { + strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); + epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; + + ++epSet->numOfEps; + } +*/ return TSDB_CODE_SUCCESS; } @@ -459,7 +466,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("task already in execTask list, code:%x", code); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - + SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -478,11 +485,11 @@ int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != code) { if (HASH_NODE_EXIST(code)) { *moved = true; - + SCH_TASK_ELOG("task already in succTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + SCH_TASK_ELOG("taosHashPut task to succTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -490,13 +497,13 @@ int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = true; SCH_TASK_DLOG("task moved to succTask list, numOfTasks:%d", taosHashGetSize(pJob->succTasks)); - + return TSDB_CODE_SUCCESS; } int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = false; - + if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); } @@ -505,11 +512,11 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != code) { if (HASH_NODE_EXIST(code)) { *moved = true; - + SCH_TASK_WLOG("task already in failTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + SCH_TASK_ELOG("taosHashPut task to failTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -517,10 +524,11 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = true; SCH_TASK_DLOG("task moved to failTask list, numOfTasks:%d", taosHashGetSize(pJob->failTasks)); - + return TSDB_CODE_SUCCESS; } + int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) { SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); @@ -530,11 +538,11 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != code) { if (HASH_NODE_EXIST(code)) { *moved = true; - + SCH_TASK_ELOG("task already in execTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -542,10 +550,11 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = true; SCH_TASK_DLOG("task moved to execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); - + return TSDB_CODE_SUCCESS; } + int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -560,7 +569,7 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { // if already FAILED, no more processing SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status)); - + if (errCode) { atomic_store_32(&pJob->errCode, errCode); } @@ -574,6 +583,8 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod assert(0); } + + // Note: no more error processing, handled in function internal int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode)); @@ -584,10 +595,11 @@ int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode)); } + // Note: no more error processing, handled in function internal int32_t schFetchFromRemote(SSchJob *pJob) { int32_t code = 0; - + if (atomic_val_compare_exchange_32(&pJob->remoteFetch, 0, 1) != 0) { SCH_JOB_ELOG("prior fetching not finished, remoteFetch:%d", atomic_load_32(&pJob->remoteFetch)); return TSDB_CODE_SUCCESS; @@ -604,7 +616,7 @@ int32_t schFetchFromRemote(SSchJob *pJob) { SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_VND_FETCH)); return TSDB_CODE_SUCCESS; - + _return: atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); @@ -614,16 +626,17 @@ _return: return code; } + // Note: no more error processing, handled in function internal int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t code = 0; - + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_PARTIAL_SUCCEED)); if ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule) { tsem_post(&pJob->rspSem); } - + if (atomic_load_8(&pJob->userFetch)) { SCH_ERR_JRET(schFetchFromRemote(pJob)); } @@ -644,15 +657,15 @@ int32_t schProcessOnDataFetched(SSchJob *job) { // Note: no more error processing, handled in function internal int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { - bool needRetry = false; - bool moved = false; + bool needRetry = false; + bool moved = false; int32_t taskDone = 0; int32_t code = 0; SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode)); SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); - + if (!needRetry) { SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode)); @@ -664,7 +677,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); - + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskFailed++; @@ -672,7 +685,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); atomic_store_32(&pJob->errCode, errCode); - + if (taskDone < pTask->level->taskNum) { SCH_TASK_DLOG("not all tasks done, done:%d, all:%d", taskDone, pTask->level->taskNum); SCH_ERR_RET(errCode); @@ -681,7 +694,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) } else { // Note: no more error processing, already handled SCH_ERR_RET(schLaunchTask(pJob, pTask)); - + return TSDB_CODE_SUCCESS; } @@ -692,14 +705,15 @@ _return: SCH_ERR_RET(errCode); } + // Note: no more error processing, handled in function internal int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { - bool moved = false; - int32_t code = 0; + bool moved = false; + int32_t code = 0; SSchTask *pErrTask = pTask; SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask)); - + code = schMoveTaskToSuccList(pJob, pTask, &moved); if (code && moved) { SCH_ERR_RET(code); @@ -708,20 +722,20 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_PARTIAL_SUCCEED); SCH_ERR_JRET(schRecordTaskSucceedNode(pTask)); - + int32_t parentNum = pTask->parents ? (int32_t)taosArrayGetSize(pTask->parents) : 0; if (parentNum == 0) { int32_t taskDone = 0; - + if (SCH_TASK_NEED_WAIT_ALL(pTask)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); pTask->level->taskSucceed++; taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); - + if (taskDone < pTask->level->taskNum) { - SCH_TASK_ELOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); - + SCH_TASK_DLOG("wait all tasks, done:%d, all:%d", taskDone, pTask->level->taskNum); + return TSDB_CODE_SUCCESS; } else if (taskDone > pTask->level->taskNum) { assert(0); @@ -742,32 +756,32 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { if (code && moved) { SCH_ERR_RET(code); } - + SCH_ERR_RET(schProcessOnJobPartialSuccess(pJob)); return TSDB_CODE_SUCCESS; } - /* - if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) { - strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); - job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; +/* + if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) { + strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); + job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; - ++job->dataSrcEps.numOfEps; - } - */ + ++job->dataSrcEps.numOfEps; + } +*/ for (int32_t i = 0; i < parentNum; ++i) { SSchTask *par = *(SSchTask **)taosArrayGet(pTask->parents, i); pErrTask = par; - + atomic_add_fetch_32(&par->childReady, 1); SCH_LOCK(SCH_WRITE, &par->lock); SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr}; qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source); SCH_UNLOCK(SCH_WRITE, &par->lock); - + if (SCH_TASK_READY_TO_LUNCH(par)) { SCH_ERR_RET(schLaunchTask(pJob, par)); } @@ -782,24 +796,23 @@ _return: SCH_ERR_RET(code); } -int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, - int32_t rspCode) { +int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; SCH_ERR_JRET(schValidateTaskReceivedMsgType(pJob, pTask, msgType)); switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + } + + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + + break; } - - SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); - - break; - } case TDMT_VND_SUBMIT_RSP: { -#if 0 // TODO OPEN THIS + #if 0 //TODO OPEN THIS SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { @@ -807,69 +820,71 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } pJob->resNumOfRows += rsp->affectedRows; -#else - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + #else + if (rspCode != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + } + + SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg; + if (rsp) { + pJob->resNumOfRows += rsp->affectedRows; + } + #endif + + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + + break; } - - SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg; - if (rsp) { - pJob->resNumOfRows += rsp->affectedRows; - } -#endif - - SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); - - break; - } case TDMT_VND_QUERY_RSP: { - SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + SQueryTableRsp *rsp = (SQueryTableRsp *)msg; + + if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + } + + SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); + + break; } - - SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, TDMT_VND_RES_READY)); - - break; - } case TDMT_VND_RES_READY_RSP: { - SResReadyRsp *rsp = (SResReadyRsp *)msg; - - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code)); + SResReadyRsp *rsp = (SResReadyRsp *)msg; + + if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + } + + SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); + + break; } - - SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); - - break; - } case TDMT_VND_FETCH_RSP: { - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { - SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); - } + if (rspCode != TSDB_CODE_SUCCESS || NULL == msg) { + SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); + } - if (pJob->res) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); - tfree(rsp); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); - } + if (pJob->res) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); + tfree(rsp); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + atomic_store_ptr(&pJob->res, rsp); + atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows); + + if (rsp->completed) { + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); + } SCH_ERR_JRET(schProcessOnDataFetched(pJob)); break; } - - SCH_ERR_JRET(schProcessOnDataFetched(pJob)); - - break; - } case TDMT_VND_DROP_TASK: { - // SHOULD NEVER REACH HERE - assert(0); - break; - } + // SHOULD NEVER REACH HERE + assert(0); + break; + } default: SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -880,19 +895,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - + SCH_RET(code); } -int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) { - int32_t code = 0; - SSchCallbackParam *pParam = (SSchCallbackParam *)param; - SSchJob * pJob = NULL; - SSchTask * pTask = NULL; +int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode) { + int32_t code = 0; + SSchCallbackParam *pParam = (SSchCallbackParam *)param; + SSchJob *pJob = NULL; + SSchTask *pTask = NULL; + SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); if (NULL == job || NULL == (*job)) { - qError("QID:%" PRIx64 " taosHashGet queryId not exist, may be dropped", pParam->queryId); + qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } @@ -902,13 +918,13 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in int32_t s = taosHashGetSize(pJob->execTasks); if (s <= 0) { - qError("QID:%" PRIx64 ",TID:%" PRId64 " no task in execTask list", pParam->queryId, pParam->taskId); + qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { - qError("QID:%" PRIx64 ",TID:%" PRId64 " taosHashGet taskId not exist", pParam->queryId, pParam->taskId); + qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -927,29 +943,29 @@ _return: SCH_RET(code); } -int32_t schHandleSubmitCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleSubmitCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_SUBMIT_RSP, code); } -int32_t schHandleCreateTableCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleCreateTableCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_CREATE_TABLE_RSP, code); } -int32_t schHandleQueryCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleQueryCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_QUERY_RSP, code); } -int32_t schHandleFetchCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleFetchCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_FETCH_RSP, code); } -int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleReadyCallback(void* param, const SDataBuf* pMsg, int32_t code) { return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code); } -int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) { +int32_t schHandleDropCallback(void* param, const SDataBuf* pMsg, int32_t code) { SSchCallbackParam *pParam = (SSchCallbackParam *)param; - qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code); + qDebug("QID:%"PRIx64",TID:%"PRIx64" drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code); } int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { @@ -957,16 +973,16 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { case TDMT_VND_CREATE_TABLE: *fp = schHandleCreateTableCallback; break; - case TDMT_VND_SUBMIT: + case TDMT_VND_SUBMIT: *fp = schHandleSubmitCallback; break; - case TDMT_VND_QUERY: + case TDMT_VND_QUERY: *fp = schHandleQueryCallback; break; - case TDMT_VND_RES_READY: + case TDMT_VND_RES_READY: *fp = schHandleReadyCallback; break; - case TDMT_VND_FETCH: + case TDMT_VND_FETCH: *fp = schHandleFetchCallback; break; case TDMT_VND_DROP_TASK: @@ -980,18 +996,18 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { return TSDB_CODE_SUCCESS; } -int32_t schAsyncSendMsg(void *transport, SEpSet *epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, - uint32_t msgSize) { - int32_t code = 0; - SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); + +int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t tId, int32_t msgType, void *msg, uint32_t msgSize) { + int32_t code = 0; + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (NULL == pMsgSendInfo) { - qError("QID:%" PRIx64 ",TID:%" PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo)); + qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam)); if (NULL == param) { - qError("QID:%" PRIx64 ",TID:%" PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam)); + qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam)); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1006,19 +1022,19 @@ int32_t schAsyncSendMsg(void *transport, SEpSet *epSet, uint64_t qId, uint64_t t pMsgSendInfo->msgInfo.len = msgSize; pMsgSendInfo->msgType = msgType; pMsgSendInfo->fp = fp; - - int64_t transporterId = 0; - + + int64_t transporterId = 0; + code = asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo); if (code) { - qError("QID:%" PRIx64 ",TID:%" PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code); + qError("QID:%"PRIx64 ",TID:%"PRIx64 " asyncSendMsgToServer failed, code:%x", qId, tId, code); SCH_ERR_JRET(code); } - + return TSDB_CODE_SUCCESS; _return: - + tfree(param); tfree(pMsgSendInfo); @@ -1028,7 +1044,7 @@ _return: void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { epSet->inUse = addr->inUse; epSet->numOfEps = addr->numOfEps; - + for (int8_t i = 0; i < epSet->numOfEps; ++i) { strncpy(epSet->fqdn[i], addr->epAddr[i].fqdn, sizeof(addr->epAddr[i].fqdn)); epSet->port[i] = addr->epAddr[i].port; @@ -1037,19 +1053,19 @@ void schConvertAddrToEpSet(SQueryNodeAddr *addr, SEpSet *epSet) { int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) { uint32_t msgSize = 0; - void * msg = NULL; - int32_t code = 0; - bool isCandidateAddr = false; - SEpSet epSet; + void *msg = NULL; + int32_t code = 0; + bool isCandidateAddr = false; + SEpSet epSet; if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, atomic_load_8(&pTask->candidateIdx)); - + isCandidateAddr = true; } schConvertAddrToEpSet(addr, &epSet); - + switch (msgType) { case TDMT_VND_CREATE_TABLE: case TDMT_VND_SUBMIT: { @@ -1075,7 +1091,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SSubQueryMsg *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); - + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); @@ -1094,12 +1110,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, } SResReadyReq *pMsg = msg; - - pMsg->header.vgId = htonl(addr->nodeId); - - pMsg->sId = htobe64(schMgmt.sId); + + pMsg->header.vgId = htonl(addr->nodeId); + + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); + pMsg->taskId = htobe64(pTask->taskId); break; } case TDMT_VND_FETCH: { @@ -1109,31 +1125,31 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + SResFetchReq *pMsg = msg; - - pMsg->header.vgId = htonl(addr->nodeId); - - pMsg->sId = htobe64(schMgmt.sId); + + pMsg->header.vgId = htonl(addr->nodeId); + + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); + pMsg->taskId = htobe64(pTask->taskId); break; } - case TDMT_VND_DROP_TASK: { + case TDMT_VND_DROP_TASK:{ msgSize = sizeof(STaskDropReq); msg = calloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - + STaskDropReq *pMsg = msg; - - pMsg->header.vgId = htonl(addr->nodeId); - - pMsg->sId = htobe64(schMgmt.sId); + + pMsg->header.vgId = htonl(addr->nodeId); + + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); - pMsg->taskId = htobe64(pTask->taskId); + pMsg->taskId = htobe64(pTask->taskId); break; } default: @@ -1149,7 +1165,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, if (isCandidateAddr) { SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr)); } - + return TSDB_CODE_SUCCESS; _return: @@ -1166,24 +1182,25 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { *pStatus = status; } - return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || - status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); + return (status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED + || status == JOB_TASK_STATUS_CANCELLING || status == JOB_TASK_STATUS_DROPPING); } + // Note: no more error processing, handled in function internal int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { - int8_t status = 0; + int8_t status = 0; int32_t code = 0; - + if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status); - + code = atomic_load_32(&pJob->errCode); SCH_ERR_RET(code); - + SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + SSubplan *plan = pTask->plan; if (NULL == pTask->msg) { @@ -1195,7 +1212,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { // printf("physical plan:%s\n", pTask->msg); } - + SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); // NOTE: race condition: the task should be put into the hash table before send msg to server @@ -1206,13 +1223,13 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { } SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); - + return TSDB_CODE_SUCCESS; _return: SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, code)); - + SCH_RET(code); } @@ -1220,12 +1237,12 @@ int32_t schLaunchJob(SSchJob *pJob) { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_EXECUTING)); - + for (int32_t i = 0; i < level->taskNum; ++i) { SSchTask *pTask = taosArrayGet(level->subTasks, i); SCH_ERR_RET(schLaunchTask(pJob, pTask)); } - + return TSDB_CODE_SUCCESS; } @@ -1236,7 +1253,7 @@ void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { } int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs); - + if (size <= 0) { SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%d", SCH_GET_TASK_STATUS(pTask)); return; @@ -1260,9 +1277,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { if (!SCH_TASK_NO_NEED_DROP(pTask)) { schDropTaskOnExecutedNode(pJob, pTask); } - + pIter = taosHashIterate(list, pIter); - } + } } void schDropJobAllTasks(SSchJob *pJob) { @@ -1271,15 +1288,15 @@ void schDropJobAllTasks(SSchJob *pJob) { schDropTaskInHashList(pJob, pJob->failTasks); } -int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag *pDag, struct SSchJob **job, bool syncSchedule) { +int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) { if (nodeList && taosArrayGetSize(nodeList) <= 0) { - qInfo("QID:%" PRIx64 " input nodeList is empty", pDag->queryId); + qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); } - int32_t code = 0; + int32_t code = 0; SSchJob *pJob = calloc(1, sizeof(SSchJob)); if (NULL == pJob) { - qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); + qError("QID:%"PRIx64" calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1289,22 +1306,19 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag *pDag, struc SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob)); - pJob->execTasks = - taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + pJob->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == pJob->execTasks) { SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->succTasks = - taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + pJob->succTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == pJob->succTasks) { SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->failTasks = - taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + pJob->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == pJob->failTasks) { SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1324,11 +1338,11 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag *pDag, struc } pJob->status = JOB_TASK_STATUS_NOT_START; - + SCH_ERR_JRET(schLaunchJob(pJob)); *(SSchJob **)job = pJob; - + if (syncSchedule) { SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob)); tsem_wait(&pJob->rspSem); @@ -1341,18 +1355,20 @@ int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag *pDag, struc _return: *(SSchJob **)job = NULL; - + scheduleFreeJob(pJob); - + SCH_RET(code); } int32_t schCancelJob(SSchJob *pJob) { - // TODO + //TODO + + //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST - // TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST } + int32_t schedulerInit(SSchedulerCfg *cfg) { if (schMgmt.jobs) { qError("scheduler already initialized"); @@ -1361,7 +1377,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { if (cfg) { schMgmt.cfg = *cfg; - + if (schMgmt.cfg.maxJobNum == 0) { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } @@ -1369,8 +1385,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } - schMgmt.jobs = - taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); if (NULL == schMgmt.jobs) { qError("init schduler jobs failed, num:%u", schMgmt.cfg.maxJobNum); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1381,12 +1396,12 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR); } - qInfo("scheduler %" PRIx64 " initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum); - + qInfo("scheduler %"PRIx64" initizlized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum); + return TSDB_CODE_SUCCESS; } -int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag *pDag, struct SSchJob **pJob, SQueryResult *pRes) { +int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1399,11 +1414,11 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag *pDag, stru pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; - + return TSDB_CODE_SUCCESS; } -int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag *pDag, struct SSchJob **pJob) { +int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1412,7 +1427,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag *pDag, return TSDB_CODE_SUCCESS; } -int32_t schedulerConvertDagToTaskList(SQueryDag *pDag, SArray **pTasks) { +int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1437,10 +1452,10 @@ int32_t schedulerConvertDagToTaskList(SQueryDag *pDag, SArray **pTasks) { } STaskInfo tInfo = {0}; - char * msg = NULL; - int32_t msgLen = 0; - int32_t code = 0; - + char *msg = NULL; + int32_t msgLen = 0; + int32_t code = 0; + for (int32_t i = 0; i < taskNum; ++i) { SSubplan *plan = taosArrayGetP(plans, i); @@ -1481,7 +1496,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag *pDag, SArray **pTasks) { *pTasks = info; info = NULL; - + _return: schedulerFreeTaskList(info); @@ -1502,7 +1517,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - int32_t msgSize = src->msg->contentLen + sizeof(*src->msg); + int32_t msgSize = src->msg->contentLen + sizeof(*src->msg); STaskInfo info = {0}; info.addr = src->addr; @@ -1535,7 +1550,8 @@ _return: SCH_RET(code); } -int32_t scheduleFetchRows(SSchJob *pJob, void **pData) { + +int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { if (NULL == pJob || NULL == pData) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1577,7 +1593,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void **pData) { SCH_JOB_ELOG("job failed or dropping, status:%d", status); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } - + if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); } @@ -1629,7 +1645,7 @@ void scheduleFreeJob(void *job) { SSchJob *pJob = job; uint64_t queryId = pJob->queryId; - bool setJobFree = false; + bool setJobFree = false; if (SCH_GET_JOB_STATUS(pJob) > 0) { if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { @@ -1664,33 +1680,33 @@ void scheduleFreeJob(void *job) { schDropJobAllTasks(pJob); } - pJob->subPlans = NULL; // it is a reference to pDag->pSubplans - + pJob->subPlans = NULL; // it is a reference to pDag->pSubplans + int32_t numOfLevels = taosArrayGetSize(pJob->levels); - for (int32_t i = 0; i < numOfLevels; ++i) { + for(int32_t i = 0; i < numOfLevels; ++i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks); - for (int32_t j = 0; j < numOfTasks; ++j) { - SSchTask *pTask = taosArrayGet(pLevel->subTasks, j); + for(int32_t j = 0; j < numOfTasks; ++j) { + SSchTask* pTask = taosArrayGet(pLevel->subTasks, j); schFreeTask(pTask); } taosArrayDestroy(pLevel->subTasks); } - + taosHashCleanup(pJob->execTasks); taosHashCleanup(pJob->failTasks); taosHashCleanup(pJob->succTasks); - + taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->nodeList); tfree(pJob->res); - + tfree(pJob); - qDebug("QID:%" PRIx64 " job freed", queryId); + qDebug("QID:%"PRIx64" job freed", queryId); } void schedulerFreeTaskList(SArray *taskList) { @@ -1706,10 +1722,11 @@ void schedulerFreeTaskList(SArray *taskList) { taosArrayDestroy(taskList); } - + void schedulerDestroy(void) { if (schMgmt.jobs) { - taosHashCleanup(schMgmt.jobs); // TODO + taosHashCleanup(schMgmt.jobs); //TODO schMgmt.jobs = NULL; } } +