From e5834b8e0018ac920af7e28f6e95a1d680bbb4e9 Mon Sep 17 00:00:00 2001
From: dapan
Date: Sat, 8 Jan 2022 18:15:50 +0800
Subject: [PATCH] feature/qnode

---
 source/libs/scheduler/src/scheduler.c | 150 +++++++++++++++++++-------
 1 file changed, 110 insertions(+), 40 deletions(-)

diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index 3f6d0f1702..3f8c75a78c 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -20,6 +20,73 @@
 
 static SSchedulerMgmt schMgmt = {0};
 
+int32_t schValidateStatus(SSchJob *pJob, int8_t oriStatus, int8_t newStatus) {  /* Checks whether the status change oriStatus -> newStatus is permitted; returns TSDB_CODE_SUCCESS when it is. */
+  int32_t code = 0;
+
+  if (oriStatus == newStatus) {  /* a "change" to the same status is rejected as well */
+    SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+  }
+
+  switch (oriStatus) {  /* one case per current status; each lists the only statuses it may move to */
+    case JOB_TASK_STATUS_NULL:
+      if (newStatus != JOB_TASK_STATUS_EXECUTING
+          && newStatus != JOB_TASK_STATUS_FAILED
+          && newStatus != JOB_TASK_STATUS_NOT_START) {
+        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+      }
+
+      break;
+    case JOB_TASK_STATUS_NOT_START:  /* NOTE(review): only NOT_START -> CANCELLED is accepted, so NOT_START -> EXECUTING is rejected; confirm this is intended */
+      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
+        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+      }
+
+      break;
+    case JOB_TASK_STATUS_EXECUTING:
+      if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
+          && newStatus != JOB_TASK_STATUS_FAILED
+          && newStatus != JOB_TASK_STATUS_CANCELLING
+          && newStatus != JOB_TASK_STATUS_CANCELLED
+          && newStatus != JOB_TASK_STATUS_DROPPING) {
+        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+      }
+
+      break;
+    case JOB_TASK_STATUS_PARTIAL_SUCCEED:
+      if (newStatus != JOB_TASK_STATUS_EXECUTING
+          && newStatus != JOB_TASK_STATUS_SUCCEED
+          && newStatus != JOB_TASK_STATUS_CANCELLED) {
+        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+      }
+
+      break;
+    case JOB_TASK_STATUS_SUCCEED:  /* SUCCEED, FAILED and CANCELLING share one rule: only CANCELLED may follow */
+    case JOB_TASK_STATUS_FAILED:
+    case JOB_TASK_STATUS_CANCELLING:
+      if (newStatus != JOB_TASK_STATUS_CANCELLED) {
+        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+      }
+
+      break;
+    case JOB_TASK_STATUS_CANCELLED:  /* no transition out of CANCELLED or DROPPING is accepted */
+    case JOB_TASK_STATUS_DROPPING:
+      SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+      break;
+
+    default:  /* unrecognised oriStatus: logs and returns directly, without the job-level log under _return */
+      qError("invalid task status:%d", oriStatus);
+      return TSDB_CODE_QRY_APP_ERROR;
+  }
+
+  return TSDB_CODE_SUCCESS;
+
+_return:  /* rejected-transition path; presumably reached through SCH_ERR_JRET, whose definition is outside this patch */
+
+  SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus);
+  SCH_ERR_RET(code);
+}
+
+
 int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
   for (int32_t i = 0; i < pJob->levelNum; ++i) {
     SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
@@ -365,14 +432,21 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *job) {
   return TSDB_CODE_SUCCESS;
 }
 
-int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) {
-  job->status = JOB_TASK_STATUS_FAILED;
-  job->errCode = errCode;
+int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {  /* Marks the job FAILED, stores errCode, clears remoteFetch and posts rspSem when the condition at the end holds. */
+  int8_t status = SCH_GET_JOB_STATUS(pJob);  /* NOTE(review): the status is read here and written further down in separate steps; confirm concurrent callers cannot interleave between them */
 
-  atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
+  if (schValidateStatus(pJob, status, JOB_TASK_STATUS_FAILED)) {  /* transition to FAILED rejected (this includes a job that is already FAILED) */
+    SCH_ERR_RET(atomic_load_32(&pJob->errCode));  /* hand back the errCode currently stored on the job, not the errCode argument */
+  }
+
+  SCH_SET_JOB_STATUS(pJob, JOB_TASK_STATUS_FAILED);  /* NOTE(review): the status is written before errCode below; a reader that sees FAILED could still load the previous errCode; confirm the ordering */
+
+  atomic_store_32(&pJob->errCode, errCode);
 
-  if (job->userFetch || ((!SCH_JOB_NEED_FETCH(&job->attr)) && job->attr.syncSchedule)) {
-    tsem_post(&job->rspSem);
+  atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);  /* resets remoteFetch from 1 to 0 if it is currently 1 */
+
+  if (pJob->userFetch || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {  /* post rspSem when userFetch is set, or for a sync-scheduled job that needs no fetch */
+    tsem_post(&pJob->rspSem);
   }
 
   return TSDB_CODE_SUCCESS;
@@ -670,13 +744,13 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
   int32_t code = 0;
   SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
   if (NULL == pMsgSendInfo) {
-    qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
+    qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SMsgSendInfo));  /* the log line now carries the query id and task id */
     SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }
 
   SSchCallbackParam *param = calloc(1, sizeof(SSchCallbackParam));
   if (NULL == param) {
-    qError("calloc %d failed", (int32_t)sizeof(SSchCallbackParam));
+    qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", qId, tId, (int32_t)sizeof(SSchCallbackParam));  /* same id prefix as the allocation failure above */
     SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }
 
@@ -694,11 +768,13 @@ int32_t schAsyncSendMsg(void *transport, SEpSet* epSet, uint64_t qId, uint64_t t
   pMsgSendInfo->fp = fp;
 
   int64_t transporterId = 0;
+
   SCH_ERR_JRET(asyncSendMsgToServer(transport, epSet, &transporterId, pMsgSendInfo));
 
   return TSDB_CODE_SUCCESS;
 
 _return:
+
   tfree(param);
   tfree(pMsgSendInfo);
 
@@ -720,35 +796,31 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
   uint32_t msgSize = 0;
   void *msg = NULL;
   int32_t code = 0;
+  SEpSet epSet;  /* NOTE(review): pTask->msg is no longer NULL-checked in the SUBMIT/QUERY cases below; confirm callers always set it */
+
+  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);  /* NOTE(review): addr is used below without a NULL check; confirm candidateIdx is always a valid index */
+
+  schConvertAddrToEpSet(addr, &epSet);  /* the address is now resolved before the switch, so each case can read addr->nodeId */
 
   switch (msgType) {
     case TDMT_VND_CREATE_TABLE:
     case TDMT_VND_SUBMIT: {
-      if (NULL == pTask->msg || pTask->msgLen <= 0) {
-        qError("submit msg is NULL");
-        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
-      }
-
       msgSize = pTask->msgLen;
       msg = pTask->msg;
       break;
     }
     case TDMT_VND_QUERY: {
-      if (NULL == pTask->msg) {
-        qError("query msg is NULL");
-        SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
-      }
-
       msgSize = sizeof(SSubQueryMsg) + pTask->msgLen;
       msg = calloc(1, msgSize);
       if (NULL == msg) {
-        qError("calloc %d failed", msgSize);
+        SCH_TASK_ELOG("calloc %d failed", msgSize);  /* failure is now logged through the task-level log macro */
         SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }
 
       SSubQueryMsg *pMsg = msg;
 
-      pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
+      pMsg->header.vgId = htonl(addr->nodeId);  /* vgId now comes from the selected candidate address instead of pTask->plan->execNode */
+
       pMsg->sId = htobe64(schMgmt.sId);
       pMsg->queryId = htobe64(pJob->queryId);
       pMsg->taskId = htobe64(pTask->taskId);
@@ -760,32 +832,31 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
       msgSize = sizeof(SResReadyMsg);
       msg = calloc(1, msgSize);
       if (NULL == msg) {
-        qError("calloc %d failed", msgSize);
+        SCH_TASK_ELOG("calloc %d failed", msgSize);
         SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }
 
       SResReadyMsg *pMsg = msg;
 
-      pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
+      pMsg->header.vgId = htonl(addr->nodeId);  /* same source for vgId as the query message */
+
       pMsg->sId = htobe64(schMgmt.sId);
       pMsg->queryId = htobe64(pJob->queryId);
       pMsg->taskId = htobe64(pTask->taskId);
       break;
     }
     case TDMT_VND_FETCH: {
-      if (NULL == pTask) {
-        SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
-      }
       msgSize = sizeof(SResFetchMsg);
       msg = calloc(1, msgSize);
       if (NULL == msg) {
-        qError("calloc %d failed", msgSize);
+        SCH_TASK_ELOG("calloc %d failed", msgSize);
         SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }
 
       SResFetchMsg *pMsg = msg;
 
-      pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
+      pMsg->header.vgId = htonl(addr->nodeId);  /* same source for vgId as the query message */
+
       pMsg->sId = htobe64(schMgmt.sId);
       pMsg->queryId = htobe64(pJob->queryId);
       pMsg->taskId = htobe64(pTask->taskId);
@@ -795,28 +866,25 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
       msgSize = sizeof(STaskDropMsg);
       msg = calloc(1, msgSize);
       if (NULL == msg) {
-        qError("calloc %d failed", msgSize);
+        SCH_TASK_ELOG("calloc %d failed", msgSize);
         SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }
 
       STaskDropMsg *pMsg = msg;
 
-      pMsg->header.vgId = htonl(pTask->plan->execNode.nodeId);
+      pMsg->header.vgId = htonl(addr->nodeId);  /* same source for vgId as the query message */
+
       pMsg->sId = htobe64(schMgmt.sId);
       pMsg->queryId = htobe64(pJob->queryId);
       pMsg->taskId = htobe64(pTask->taskId);
       break;
     }
     default:
-      qError("unknown msg type:%d", msgType);
+      SCH_TASK_ELOG("unknown msg type:%d", msgType);
       SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
       break;
   }
 
-  SEpSet epSet;
-  SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
-
-  schConvertAddrToEpSet(addr, &epSet);
 
   SCH_ERR_JRET(schAsyncSendMsg(pJob->transport, &epSet, pJob->queryId, pTask->taskId, msgType, msg, msgSize));
 
@@ -844,7 +912,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
 
   if (schJobNeedToStop(pJob, &status)) {
     SCH_TASK_ELOG("no need to launch task cause of job status, job status:%d", status);
-    SCH_ERR_RET(pJob->errCode);
+    SCH_ERR_RET(atomic_load_32(&pJob->errCode));  /* errCode is now read with an atomic load, matching the atomic store in schProcessOnJobFailure */
   }
 
   SSubplan *plan = pTask->plan;
@@ -860,9 +928,11 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); // NOTE: race condition: the task should be put into the hash table before send msg to server - SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING) { + SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask)); - SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXECUTING); + } SCH_ERR_JRET(schBuildAndSendMsg(pJob, pTask, plan->msgType)); @@ -1031,7 +1101,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void SSchJob *job = *(SSchJob **)pJob; - pRes->code = job->errCode; + pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; return TSDB_CODE_SUCCESS; @@ -1061,7 +1131,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { if (job->status == JOB_TASK_STATUS_FAILED) { job->res = NULL; - SCH_RET(job->errCode); + SCH_RET(atomic_load_32(&job->errCode)); } if (job->status == JOB_TASK_STATUS_SUCCEED) { @@ -1081,7 +1151,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { tsem_wait(&job->rspSem); if (job->status == JOB_TASK_STATUS_FAILED) { - code = job->errCode; + code = atomic_load_32(&job->errCode); } if (job->res && ((SRetrieveTableRsp *)job->res)->completed) {