diff --git a/include/os/osMath.h b/include/os/osMath.h index 3c05d15397..edbfa935ce 100644 --- a/include/os/osMath.h +++ b/include/os/osMath.h @@ -32,9 +32,9 @@ extern "C" { #define TSWAP(a, b) \ do { \ char *__tmp = (char*)alloca(sizeof(a)); \ - memcpy(__tmp, &(a), sizeof(a)); \ - memcpy(&(a), &(b), sizeof(a)); \ - memcpy(&(b), __tmp, sizeof(a)); \ + (void)memcpy(__tmp, &(a), sizeof(a)); \ + (void)memcpy(&(a), &(b), sizeof(a)); \ + (void)memcpy(&(b), __tmp, sizeof(a)); \ } while (0) #ifdef WINDOWS diff --git a/include/os/osMemory.h b/include/os/osMemory.h index 6166f1dc07..c6a5ce27c4 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -54,6 +54,7 @@ void taosMemoryTrim(int32_t size); void *taosMemoryMallocAlign(uint32_t alignment, int64_t size); #define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n)) +#define TAOS_MEMCPY(_d, _s, _n) ((void)memcpy(_d, _s, _n)) #define taosMemoryFreeClear(ptr) \ do { \ diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 17476072bb..5ea79c6ae9 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -336,7 +336,7 @@ extern SSchedulerMgmt schMgmt; ((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \ (!SCH_IS_DATA_BIND_QRY_TASK(_task))) -#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code) +#define SCH_UPDATE_REDIRECT_CODE(job, _code) (void)atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code) #define SCH_GET_REDIRECT_CODE(job, _code) \ (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode) @@ -413,7 +413,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOG_TASK_START_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - taosArrayPush((_task)->profile.execTime, &us); \ + (void)taosArrayPush((_task)->profile.execTime, &us); \ if (0 == (_task)->execId) { \ (_task)->profile.startTs = us; \ } \ @@ -422,7 +422,10 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOG_TASK_WAIT_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - (_task)->profile.waitTime += us - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId); \ + int64_t* startus = (int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId); \ + if (NULL != startus) { \ + (_task)->profile.waitTime += us - *startus; \ + } \ } while (0) #define SCH_LOG_TASK_END_TS(_task) \ @@ -430,7 +433,9 @@ extern SSchedulerMgmt schMgmt; int64_t us = taosGetTimestampUs(); \ int32_t idx = (_task)->execId % (_task)->maxExecTimes; \ int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \ - *startts = us - *startts; \ + if (NULL != startts) { \ + *startts = us - *startts; \ + } \ (_task)->profile.endTs = us; \ } while (0) @@ -538,7 +543,7 @@ void schCleanClusterHb(void *pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param); -SSchJob *schAcquireJob(int64_t refId); +int32_t schAcquireJob(int64_t refId, SSchJob **ppJob); int32_t schReleaseJob(int64_t refId); void schFreeFlowCtrl(SSchJob *pJob); int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel); @@ -578,7 +583,7 @@ int32_t schJobFetchRows(SSchJob *pJob); int32_t schJobFetchRowsA(SSchJob *pJob); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList); -char *schDumpEpSet(SEpSet *pEpSet); +int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes); char *schGetOpStr(SCH_OP_TYPE type); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index a8e2e79aee..58a0706223 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -1165,7 +1165,9 @@ int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_ int8_t status = 0; SSchTask *pTask = NULL; - SSchJob *pJob = schAcquireJob(rId); + SSchJob *pJob = NULL; + + (void)schAcquireJob(rId, &pJob); if (NULL == pJob) { qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId); SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index d0dc04d6b4..08a8f684f5 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -113,7 +113,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs } atomic_store_ptr(&pJob->fetchRes, rsp); - atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows)); + (void)atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows)); if (rsp->completed) { SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); @@ -166,13 +166,27 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD SCH_LOCK(SCH_WRITE, &pJob->resLock); if (NULL == pJob->execRes.res) { pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES); + if (NULL == pJob->execRes.res) { + code = terrno; + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + + tDecoderClear(&coder); + SCH_ERR_JRET(code); + } + pJob->execRes.msgType = TDMT_VND_CREATE_TABLE; } for (int32_t i = 0; i < batchRsp.nRsps; ++i) { SVCreateTbRsp *rsp = batchRsp.pRsps + i; if (rsp->pMeta) { - taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta); + if (NULL == taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta)) { + code = terrno; + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + + tDecoderClear(&coder); + SCH_ERR_JRET(code); + } } if (TSDB_CODE_SUCCESS != rsp->code) { @@ -264,7 +278,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD SCH_ERR_JRET(code); } - atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows); + (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows); int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp); SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum); @@ -275,7 +289,12 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD SSubmitRsp2 *sum = pJob->execRes.res; sum->affectedRows += rsp->affectedRows; if (sum->aCreateTbRsp) { - taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp); + if (NULL == taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp)) { + code = terrno; + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + SCH_ERR_JRET(code); + } + taosArrayDestroy(rsp->aCreateTbRsp); } else { TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp); @@ -313,10 +332,14 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD SDecoder coder = {0}; SVDeleteRsp rsp = {0}; tDecoderInit(&coder, msg, msgSize); - tDecodeSVDeleteRsp(&coder, &rsp); + if (tDecodeSVDeleteRsp(&coder, &rsp) < 0) { + code = terrno; + tDecoderClear(&coder); + SCH_ERR_JRET(code); + } tDecoderClear(&coder); - atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); + (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows); } @@ -351,7 +374,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp)); - atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); + (void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows); taosMemoryFreeClear(msg); @@ -479,7 +502,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, code); // called if drop task rsp received code - rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); + (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -501,7 +524,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param; - rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); + (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error qDebug("handle %p is broken", pMsg->handle); @@ -531,7 +554,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { if (code) { qError("hb rsp error:%s", tstrerror(code)); - rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); + (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error SCH_ERR_JRET(code); } @@ -585,9 +608,14 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo param->head.isHbParam = true; SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + if (NULL == addr) { + taosMemoryFree(param); + SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } param->nodeEpId.nodeId = addr->nodeId; SEp *pEp = SCH_GET_CUR_EP(addr); - strcpy(param->nodeEpId.ep.fqdn, pEp->fqdn); + TAOS_STRCPY(param->nodeEpId.ep.fqdn, pEp->fqdn); param->nodeEpId.ep.port = pEp->port; param->pTrans = trans->pTrans; *pParam = param; @@ -712,7 +740,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) { int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) { int32_t code = 0; - memcpy(pDst, pSrc, sizeof(SRpcCtx)); + TAOS_MEMCPY(pDst, pSrc, sizeof(SRpcCtx)); pDst->brokenVal.val = NULL; pDst->args = NULL; @@ -760,7 +788,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SQueryNodeEpId epId = {0}; epId.nodeId = addr->nodeId; - memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); + TAOS_MEMCPY(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); if (NULL == pCtx->args) { @@ -877,7 +905,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(dst, pSrc, sizeof(*dst)); + TAOS_MEMCPY(dst, pSrc, sizeof(*dst)); *pDst = (SSchCallbackParamHeader *)dst; return TSDB_CODE_SUCCESS; @@ -889,7 +917,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(dst, pSrc, sizeof(*dst)); + TAOS_MEMCPY(dst, pSrc, sizeof(*dst)); *pDst = (SSchCallbackParamHeader *)dst; return TSDB_CODE_SUCCESS; @@ -904,7 +932,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(pDst, pSrc, sizeof(*pSrc)); + TAOS_MEMCPY(pDst, pSrc, sizeof(*pSrc)); pDst->param = NULL; SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param)); @@ -948,6 +976,10 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery if (isHb && persistHandle && trans->pHandle == 0) { trans->pHandle = rpcAllocHandle(); + if (NULL == trans->pHandle) { + SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", terrno); + SCH_ERR_JRET(terrno); + } } if (pJob && pTask) { @@ -1000,7 +1032,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) { req.header.vgId = nodeEpId->nodeId; req.sId = schMgmt.sId; - memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId)); + TAOS_MEMCPY(&req.epId, nodeEpId, sizeof(SQueryNodeEpId)); SCH_LOCK(SCH_READ, &schMgmt.hbLock); SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId)); @@ -1013,7 +1045,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) { SCH_LOCK(SCH_WRITE, &hb->lock); code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx); - memcpy(&trans, &hb->trans, sizeof(trans)); + TAOS_MEMCPY(&trans, &hb->trans, sizeof(trans)); SCH_UNLOCK(SCH_WRITE, &hb->lock); SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); @@ -1039,7 +1071,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) { SQueryNodeAddr addr = {.nodeId = nodeEpId->nodeId}; addr.epSet.inUse = 0; addr.epSet.numOfEps = 1; - memcpy(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep)); + TAOS_MEMCPY(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep)); code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx); msg = NULL; @@ -1064,6 +1096,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, if (NULL == addr) { addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + if (NULL == addr) { + SCH_TASK_ELOG("fail to get condidateAddr, candidateIdx %d, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_ERR_JRET(terrno); + } + isCandidateAddr = true; SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse, addr->epSet.numOfEps); @@ -1082,7 +1119,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - memcpy(msg, pTask->msg, msgSize); + TAOS_MEMCPY(msg, pTask->msg, msgSize); break; } @@ -1098,13 +1135,21 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, req.msg = pTask->msg; req.source = pJob->source; msgSize = tSerializeSVDeleteReq(NULL, 0, &req); + if (msgSize < 0) { + SCH_TASK_ELOG("tSerializeSVDeleteReq failed, code:%x", terrno); + SCH_ERR_JRET(terrno); + } msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { SCH_TASK_ELOG("calloc %d failed", msgSize); - SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - tSerializeSVDeleteReq(msg, msgSize, &req); + msgSize = tSerializeSVDeleteReq(msg, msgSize, &req); + if (msgSize < 0) { + SCH_TASK_ELOG("tSerializeSVDeleteReq second failed, code:%x", terrno); + SCH_ERR_JRET(terrno); + } break; } case TDMT_SCH_QUERY: @@ -1221,7 +1266,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, req.sId = schMgmt.sId; req.header.vgId = addr->nodeId; req.epId.nodeId = addr->nodeId; - memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); + TAOS_MEMCPY(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req); if (msgSize < 0) { diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index d37393137f..f24ee74101 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -43,7 +43,7 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0))); break; case JOB_TASK_STATUS_DROP: - schProcessOnJobDropped(pJob, *(int32_t*)param); + (void)schProcessOnJobDropped(pJob, *(int32_t*)param); // ignore error if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) { SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId); @@ -65,7 +65,8 @@ _return: } int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) { - SSchJob* pJob = schAcquireJob(jobId); + SSchJob* pJob = NULL; + (void)schAcquireJob(jobId, &pJob); if (NULL == pJob) { qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId); SCH_ERR_RET(TSDB_CODE_SCH_JOB_NOT_EXISTS); @@ -90,7 +91,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq code = pJob->errCode; } - schReleaseJob(pJob->refId); + (void)schReleaseJob(pJob->refId); // ignore error return code; } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index c317a63ce1..6dd6aa9aae 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -165,7 +165,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId)); } - schUpdateTaskExecNode(pJob, pTask, handle, execId); + SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId)); if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, @@ -297,7 +297,11 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { for (int32_t i = 0; i < parentNum; ++i) { SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i); - + if (NULL == parent) { + SCH_TASK_ELOG("fail to get task %d parent, parentNum: %d", i, parentNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SCH_LOCK(SCH_WRITE, &parent->planLock); SDownstreamSourceNode source = { .type = QUERY_NODE_DOWNSTREAM_SOURCE, @@ -308,9 +312,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .fetchMsgType = SCH_FETCH_TYPE(pTask), .localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask), }; - qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); + code = qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); + if (TSDB_CODE_SUCCESS != code) { + SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId); + } SCH_UNLOCK(SCH_WRITE, &parent->planLock); + SCH_ERR_RET(code); + int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1); if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { @@ -413,16 +422,16 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) { schDropTaskOnExecNode(pJob, pTask); if (pTask->delayTimer) { - taosTmrStopA(&pTask->delayTimer); + (void)taosTmrStopA(&pTask->delayTimer); // ignore error } taosHashClear(pTask->execNodes); - schRemoveTaskFromExecList(pJob, pTask); + (void)schRemoveTaskFromExecList(pJob, pTask); // ignore error schDeregisterTaskHb(pJob, pTask); taosMemoryFreeClear(pTask->msg); pTask->msgLen = 0; pTask->lastMsgType = 0; pTask->childReady = 0; - memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); + TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); } int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { @@ -443,11 +452,21 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + if (NULL == addr) { + SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode)); } else { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + if (NULL == addr) { + SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SCH_SWITCH_EPSET(addr); SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); } @@ -476,7 +495,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 for (int32_t i = 0; i < childrenNum; ++i) { SSchTask *pChild = taosArrayGetP(pTask->children, i); SCH_LOCK_TASK(pChild); - schDoTaskRedirect(pJob, pChild, NULL, rspCode); + (void)schDoTaskRedirect(pJob, pChild, NULL, rspCode); // error handled internal SCH_UNLOCK_TASK(pChild); } @@ -494,18 +513,23 @@ int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) { atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum)); if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) { - atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1); + (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1); } - atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1); + (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1); int32_t childrenNum = taosArrayGetSize(pTask->children); for (int32_t i = 0; i < childrenNum; ++i) { SSchTask *pChild = taosArrayGetP(pTask->children, i); + if (NULL == pChild) { + SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SCH_LOCK_TASK(pChild); pLevel = pChild->level; - atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1); - atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1); + (void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1); + (void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1); SCH_UNLOCK_TASK(pChild); } @@ -711,9 +735,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo } int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { - atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); + (void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1); - schRemoveTaskFromExecList(pJob, pTask); + (void)schRemoveTaskFromExecList(pJob, pTask); // ignore error SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { @@ -724,6 +748,11 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { if (SCH_IS_DATA_BIND_TASK(pTask)) { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + if (NULL == addr) { + SCH_TASK_ELOG("fail to the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SCH_SWITCH_EPSET(addr); } else { SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask)); @@ -743,6 +772,11 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { for (int32_t i = 0; i < nodeNum; ++i) { SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i); + if (NULL == nload) { + SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", i, nodeNum); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + SQueryNodeAddr *naddr = &nload->addr; if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) { @@ -810,6 +844,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) { + int32_t code = TSDB_CODE_SUCCESS; if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) { SCH_TASK_ELOG("not able to update cndidate addr, addr num %d", (int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0)); @@ -817,18 +852,27 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe } SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0); + if (NULL == pAddr) { + SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } - char *origEpset = schDumpEpSet(&pAddr->epSet); - char *newEpset = schDumpEpSet(pEpSet); + char *origEpset = NULL; + char *newEpset = NULL; + + SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &origEpset)); + SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpset)); SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset); + TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet)); + +_return: + taosMemoryFree(origEpset); taosMemoryFree(newEpset); - memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet)); - - return TSDB_CODE_SUCCESS; + return code; } int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) { @@ -877,7 +921,6 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { } int32_t size = (int32_t)taosHashGetSize(pTask->execNodes); - if (size <= 0) { SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); return; @@ -889,7 +932,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { if (nodeInfo->handle) { SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); void *pExecId = taosHashGetKey(nodeInfo, NULL); - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId); + (void)schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId); // ignore error and continue SCH_TASK_DLOG("start to drop task's %dth execNode", i); } else { @@ -939,6 +982,11 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) { for (int32_t i = 0; i < taskNum; ++i) { STaskStatus *pStatus = taosArrayGet(pStatusList, i); + if (NULL == pStatus) { + qError("fail to get the %dth task status in hb rsp, taskNum:%d", i, taskNum); + continue; + } + int32_t code = 0; qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId, @@ -983,10 +1031,15 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { for (int32_t i = 0; i < resNum; ++i) { SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i); + if (NULL == localRsp) { + qError("fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum); + continue; + } qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId); - pJob = schAcquireJob(localRsp->rId); + pJob = NULL; + (void)schAcquireJob(localRsp->rId, &pJob); if (NULL == pJob) { qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, localRsp->tId, localRsp->rId); @@ -996,7 +1049,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status)); - schReleaseJob(pJob->refId); + (void)schReleaseJob(pJob->refId); SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); } @@ -1006,7 +1059,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) { code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp); } - schReleaseJob(pJob->refId); + (void)schReleaseJob(pJob->refId); qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, localRsp->tId, code); @@ -1022,6 +1075,11 @@ _return: for (int32_t i = 0; i < resNum; ++i) { SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i); + if (NULL == localRsp) { + qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum); + continue; + } + tFreeSExplainRsp(&localRsp->rsp); } @@ -1076,6 +1134,9 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { if (SCH_IS_EXPLAIN_JOB(pJob)) { explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); + if (NULL == explainRes) { + SCH_ERR_RET(terrno); + } } SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, @@ -1097,7 +1158,9 @@ _return: int32_t schLaunchTaskImpl(void *param) { SSchTaskCtx *pCtx = (SSchTaskCtx *)param; - SSchJob *pJob = schAcquireJob(pCtx->jobRid); + SSchJob *pJob = NULL; + + (void)schAcquireJob(pCtx->jobRid, &pJob); if (NULL == pJob) { qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid); taosMemoryFree(param); @@ -1113,7 +1176,7 @@ int32_t schLaunchTaskImpl(void *param) { int8_t status = 0; int32_t code = 0; - atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); + (void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); pTask->execId++; pTask->retryTimes++; pTask->waitRetry = false; @@ -1160,7 +1223,7 @@ _return: SCH_UNLOCK_TASK(pTask); } - schReleaseJob(pJob->refId); + (void)schReleaseJob(pJob->refId); taosMemoryFree(param); @@ -1178,7 +1241,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { param->asyncLaunch = true; - taosAsyncExec(schLaunchTaskImpl, param, NULL); + SCH_ERR_RET(taosAsyncExec(schLaunchTaskImpl, param, NULL)); } else { SCH_ERR_RET(schLaunchTaskImpl(param)); } @@ -1252,7 +1315,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer); + (void)taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer); return TSDB_CODE_SUCCESS; } @@ -1283,7 +1346,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { SCH_LOCK_TASK(pTask); if (pTask->delayTimer) { - taosTmrStopA(&pTask->delayTimer); + (void)taosTmrStopA(&pTask->delayTimer); } schDropTaskOnExecNode(pJob, pTask); SCH_UNLOCK_TASK(pTask); @@ -1327,6 +1390,9 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { if (SCH_IS_EXPLAIN_JOB(pJob)) { explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp)); + if (NULL == explainRes) { + SCH_ERR_RET(terrno); + } } SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId, diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 689c98d395..811890dde5 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -22,9 +22,14 @@ #include "tref.h" #include "trpc.h" -FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { +FORCE_INLINE int32_t schAcquireJob(int64_t refId, SSchJob** ppJob) { qDebug("sch acquire jobId:0x%" PRIx64, refId); - return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); + *ppJob = (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); + if (NULL == *ppJob) { + return terrno; + } + + return TSDB_CODE_SUCCESS; } FORCE_INLINE int32_t schReleaseJob(int64_t refId) { @@ -36,15 +41,16 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgmt.jobRef, refId); } -char *schDumpEpSet(SEpSet *pEpSet) { +int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) { + *ppRes = NULL; if (NULL == pEpSet) { - return NULL; + return TSDB_CODE_SUCCESS; } int32_t maxSize = 1024; char *str = taosMemoryMalloc(maxSize); if (NULL == str) { - return NULL; + return terrno; } int32_t n = 0; @@ -54,7 +60,8 @@ char *schDumpEpSet(SEpSet *pEpSet) { n += snprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port); } - return str; + *ppRes = str; + return TSDB_CODE_SUCCESS; } char *schGetOpStr(SCH_OP_TYPE type) { @@ -73,7 +80,7 @@ char *schGetOpStr(SCH_OP_TYPE type) { } void schFreeHbTrans(SSchHbTrans *pTrans) { - rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT); + (void)rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT); schFreeRpcCtx(&pTrans->rpcCtx); } @@ -86,7 +93,7 @@ void schCleanClusterHb(void *pTrans) { if (hb->trans.pTrans == pTrans) { SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL); schFreeHbTrans(hb); - taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); + (void)taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId)); } hb = taosHashIterate(schMgmt.hbConnections, hb); @@ -109,7 +116,7 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep int64_t taskNum = atomic_load_64(&hb->taskNum); if (taskNum <= 0) { schFreeHbTrans(hb); - taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); + (void)taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); } SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock); @@ -165,7 +172,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId * break; } - atomic_add_fetch_64(&hb->taskNum, 1); + (void)atomic_add_fetch_64(&hb->taskNum, 1); SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); @@ -178,12 +185,17 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) { } SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + if (NULL == addr) { + SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + return; + } + SQueryNodeEpId epId = {0}; epId.nodeId = addr->nodeId; SEp *pEp = SCH_GET_CUR_EP(addr); - strcpy(epId.ep.fqdn, pEp->fqdn); + TAOS_STRCPY(epId.ep.fqdn, pEp->fqdn); epId.ep.port = pEp->port; SCH_LOCK(SCH_READ, &schMgmt.hbLock); @@ -197,7 +209,7 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) { int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1); if (0 == taskNum) { SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); - schRemoveHbConnection(pJob, pTask, &epId); + (void)schRemoveHbConnection(pJob, pTask, &epId); } else { SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); } @@ -211,12 +223,17 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { } SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + if (NULL == addr) { + SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs)); + return TSDB_CODE_SCH_INTERNAL_ERROR; + } + SQueryNodeEpId epId = {0}; epId.nodeId = addr->nodeId; SEp *pEp = SCH_GET_CUR_EP(addr); - strcpy(epId.ep.fqdn, pEp->fqdn); + TAOS_STRCPY(epId.ep.fqdn, pEp->fqdn); epId.ep.port = pEp->port; SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId)); @@ -240,7 +257,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { } SCH_LOCK(SCH_WRITE, &hb->lock); - memcpy(&hb->trans, trans, sizeof(*trans)); + TAOS_MEMCPY(&hb->trans, trans, sizeof(*trans)); SCH_UNLOCK(SCH_WRITE, &hb->lock); SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); @@ -256,7 +273,7 @@ void schCloseJobRef(void) { } if (schMgmt.jobRef >= 0) { - taosCloseRef(schMgmt.jobRef); + (void)taosCloseRef(schMgmt.jobRef); schMgmt.jobRef = -1; } } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 8f85e066cd..45e0ecf738 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -169,7 +169,8 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) { return; } - SSchJob *pJob = schAcquireJob(*jobId); + SSchJob *pJob = NULL; + (void)schAcquireJob(*jobId, &pJob); if (NULL == pJob) { qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); return; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 78e876f82c..f906e3ec3e 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -75,9 +75,7 @@ int32_t schtStartFetch = 0; void schtInitLogFile() { const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10; - rpcInit(); tsAsyncLog = 0; - rpcInit(); qDebugFlag = 159; strcpy(tsLogDir, TD_LOG_DIR_PATH); @@ -136,8 +134,13 @@ int32_t schtBuildSubmitRspMsg(uint32_t *msize, void **rspMsg) { tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret); void *msg = taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + return terrno; + } tEncoderInit(&ec, (uint8_t *)msg, msgSize); - tEncodeSSubmitRsp2(&ec, &submitRsp); + if (tEncodeSSubmitRsp2(&ec, &submitRsp) < 0) { + return -1; + } tEncoderClear(&ec); *rspMsg = msg; @@ -152,11 +155,26 @@ void schtBuildQueryDag(SQueryPlan *dag) { dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = nodesMakeList(); + if (NULL == dag->pSubplans) { + return; + } SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == scan) { + return; + } SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == merge) { + return; + } SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == scanPlan) { + return; + } SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == mergePlan) { + return; + } scanPlan->id.queryId = qId; scanPlan->id.groupId = 0x0000000000000002; @@ -170,7 +188,13 @@ void schtBuildQueryDag(SQueryPlan *dag) { scanPlan->pChildren = NULL; scanPlan->level = 1; scanPlan->pParents = nodesMakeList(); + if (NULL == scanPlan->pParents) { + return; + } scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + if (NULL == scanPlan->pNode) { + return; + } scanPlan->msgType = TDMT_SCH_QUERY; mergePlan->id.queryId = qId; @@ -181,21 +205,33 @@ void schtBuildQueryDag(SQueryPlan *dag) { mergePlan->execNode.epSet.numOfEps = 0; mergePlan->pChildren = nodesMakeList(); + if (NULL == mergePlan->pChildren) { + return; + } mergePlan->pParents = NULL; mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE); + if (NULL == mergePlan->pNode) { + return; + } mergePlan->msgType = TDMT_SCH_QUERY; merge->pNodeList = nodesMakeList(); + if (NULL == merge->pNodeList) { + return; + } scan->pNodeList = nodesMakeList(); + if (NULL == scan->pNodeList) { + return; + } - nodesListAppend(merge->pNodeList, (SNode *)mergePlan); - nodesListAppend(scan->pNodeList, (SNode *)scanPlan); + (void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan); + (void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan); - nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); - nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); + (void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); + (void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); - nodesListAppend(dag->pSubplans, (SNode *)merge); - nodesListAppend(dag->pSubplans, (SNode *)scan); + (void)nodesListAppend(dag->pSubplans, (SNode *)merge); + (void)nodesListAppend(dag->pSubplans, (SNode *)scan); } void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { @@ -205,18 +241,42 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = nodesMakeList(); + if (NULL == dag->pSubplans) { + return; + } SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == scan) { + return; + } SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == merge) { + return; + } SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == mergePlan) { + return; + } merge->pNodeList = nodesMakeList(); + if (NULL == merge->pNodeList) { + return; + } scan->pNodeList = nodesMakeList(); + if (NULL == scan->pNodeList) { + return; + } mergePlan->pChildren = nodesMakeList(); + if (NULL == mergePlan->pChildren) { + return; + } for (int32_t i = 0; i < scanPlanNum; ++i) { SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == scanPlan) { + return; + } scanPlan->id.queryId = qId; scanPlan->id.groupId = 0x0000000000000002; scanPlan->id.subplanId = 0x0000000000000003 + i; @@ -233,13 +293,19 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { scanPlan->pChildren = NULL; scanPlan->level = 1; scanPlan->pParents = nodesMakeList(); + if (NULL == scanPlan->pParents) { + return; + } scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + if (NULL == scanPlan->pNode) { + return; + } scanPlan->msgType = TDMT_SCH_QUERY; - nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); - nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); + (void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); + (void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); - nodesListAppend(scan->pNodeList, (SNode *)scanPlan); + (void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan); } mergePlan->id.queryId = qId; @@ -251,12 +317,15 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { mergePlan->pParents = NULL; mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE); + if (NULL == mergePlan->pNode) { + return; + } mergePlan->msgType = TDMT_SCH_QUERY; - nodesListAppend(merge->pNodeList, (SNode *)mergePlan); + (void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan); - nodesListAppend(dag->pSubplans, (SNode *)merge); - nodesListAppend(dag->pSubplans, (SNode *)scan); + (void)nodesListAppend(dag->pSubplans, (SNode *)merge); + (void)nodesListAppend(dag->pSubplans, (SNode *)scan); } void schtFreeQueryDag(SQueryPlan *dag) {} @@ -267,10 +336,22 @@ void schtBuildInsertDag(SQueryPlan *dag) { dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = nodesMakeList(); + if (NULL == dag->pSubplans) { + return; + } SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == inserta) { + return; + } inserta->pNodeList = nodesMakeList(); + if (NULL == inserta->pNodeList) { + return; + } SSubplan *insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == insertPlan) { + return; + } insertPlan->id.queryId = qId; insertPlan->id.groupId = 0x0000000000000003; @@ -286,13 +367,22 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan->pParents = NULL; insertPlan->pNode = NULL; insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + if (NULL == insertPlan->pDataSink) { + return; + } ((SDataInserterNode *)insertPlan->pDataSink)->size = 1; ((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); + if (NULL == ((SDataInserterNode *)insertPlan->pDataSink)->pData) { + return; + } insertPlan->msgType = TDMT_VND_SUBMIT; - nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); + (void)nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == insertPlan) { + return; + } insertPlan->id.queryId = qId; insertPlan->id.groupId = 0x0000000000000003; @@ -308,22 +398,31 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan->pParents = NULL; insertPlan->pNode = NULL; insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + if (NULL == insertPlan->pDataSink) { + return; + } ((SDataInserterNode *)insertPlan->pDataSink)->size = 1; ((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); + if (NULL == ((SDataInserterNode *)insertPlan->pDataSink)->pData) { + return; + } insertPlan->msgType = TDMT_VND_SUBMIT; - nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); + (void)nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); - nodesListAppend(dag->pSubplans, (SNode *)inserta); + (void)nodesListAppend(dag->pSubplans, (SNode *)inserta); } int32_t schtPlanToString(const SSubplan *subplan, char **str, int32_t *len) { *str = (char *)taosMemoryCalloc(1, 20); + if (NULL == *str) { + return -1; + } *len = 20; return 0; } -void schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) {} +int32_t schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) { return 0; } void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {} @@ -431,24 +530,27 @@ void *schtSendRsp(void *param) { taosMsleep(1); } - pJob = schAcquireJob(job); - + code = schAcquireJob(job, &pJob); + if (code) { + return NULL; + } + void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; SDataBuf msg = {0}; void *rmsg = NULL; - schtBuildSubmitRspMsg(&msg.len, &rmsg); + (void)schtBuildSubmitRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_VND_SUBMIT_RSP; msg.pData = rmsg; - schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + (void)schHandleResponseMsg(pJob, task, task->execId, &msg, 0); pIter = taosHashIterate(pJob->execTasks, pIter); } - schReleaseJob(job); + (void)schReleaseJob(job); schtJobDone = true; @@ -457,20 +559,25 @@ void *schtSendRsp(void *param) { void *schtCreateFetchRspThread(void *param) { int64_t job = *(int64_t *)param; - SSchJob *pJob = schAcquireJob(job); + SSchJob *pJob = NULL; + (void)schAcquireJob(job, &pJob); + if (NULL == pJob) { + return NULL; + } + taosSsleep(1); int32_t code = 0; SDataBuf msg = {0}; void *rmsg = NULL; - schtBuildFetchRspMsg(&msg.len, &rmsg); + (void)schtBuildFetchRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_MERGE_FETCH_RSP; msg.pData = rmsg; code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0); - schReleaseJob(job); + (void)schReleaseJob(job); assert(code == 0); return NULL; @@ -488,12 +595,17 @@ void *schtFetchRspThread(void *aa) { taosUsleep(100); param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param)); - + if (NULL == param) { + return NULL; + } param->queryId = schtQueryId; param->taskId = schtFetchTaskId; int32_t code = 0; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); + if (NULL == rsp) { + return NULL; + } rsp->completed = 1; rsp->numOfRows = 10; @@ -549,12 +661,17 @@ void *schtRunJobThread(void *aa) { schtBuildQueryDag(dag); SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); - + if (NULL == qnodeList) { + assert(0); + } + SQueryNodeLoad load = {0}; load.addr.epSet.numOfEps = 1; strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); load.addr.epSet.eps[0].port = 6031; - taosArrayPush(qnodeList, &load); + if (NULL == taosArrayPush(qnodeList, &load)) { + assert(0); + } queryDone = 0; @@ -572,7 +689,9 @@ void *schtRunJobThread(void *aa) { code = schedulerExecJob(&req, &queryJobRefId); assert(code == 0); - pJob = schAcquireJob(queryJobRefId); + pJob = NULL; + code = schAcquireJob(queryJobRefId, &pJob); + if (NULL == pJob) { taosArrayDestroy(qnodeList); schtFreeQueryDag(dag); @@ -580,16 +699,24 @@ void *schtRunJobThread(void *aa) { } execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == execTasks) { + assert(0); + } void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; schtFetchTaskId = task->taskId - 1; - taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task)); + if (taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task))) { + assert(0); + } pIter = taosHashIterate(pJob->execTasks, pIter); } param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param)); + if (NULL == param) { + assert(0); + } param->refId = queryJobRefId; param->queryId = pJob->queryId; @@ -601,7 +728,9 @@ void *schtRunJobThread(void *aa) { SDataBuf msg = {0}; void *rmsg = NULL; - schtBuildQueryRspMsg(&msg.len, &rmsg); + if (schtBuildQueryRspMsg(&msg.len, &rmsg)) { + assert(0); + } msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; @@ -612,6 +741,9 @@ void *schtRunJobThread(void *aa) { } param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param)); + if (NULL == param) { + assert(0); + } param->refId = queryJobRefId; param->queryId = pJob->queryId; @@ -622,7 +754,9 @@ void *schtRunJobThread(void *aa) { param->taskId = task->taskId - 1; SDataBuf msg = {0}; void *rmsg = NULL; - schtBuildQueryRspMsg(&msg.len, &rmsg); + if (schtBuildQueryRspMsg(&msg.len, &rmsg)) { + assert(0); + } msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; @@ -728,7 +862,9 @@ TEST(queryTest, normalCase) { code = schedulerExecJob(&req, &job); ASSERT_EQ(code, 0); - SSchJob *pJob = schAcquireJob(job); + SSchJob *pJob = NULL; + code = schAcquireJob(job, &pJob); + ASSERT_EQ(code, 0); void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { @@ -839,8 +975,10 @@ TEST(queryTest, readyFirstCase) { code = schedulerExecJob(&req, &job); ASSERT_EQ(code, 0); - SSchJob *pJob = schAcquireJob(job); - + SSchJob *pJob = NULL; + code = schAcquireJob(job, &pJob); + ASSERT_EQ(code, 0); + void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; @@ -956,7 +1094,9 @@ TEST(queryTest, flowCtrlCase) { code = schedulerExecJob(&req, &job); ASSERT_EQ(code, 0); - SSchJob *pJob = schAcquireJob(job); + SSchJob *pJob = NULL; + code = schAcquireJob(job, &pJob); + ASSERT_EQ(code, 0); while (!queryDone) { void *pIter = taosHashIterate(pJob->execTasks, NULL); @@ -1094,13 +1234,17 @@ TEST(otherTest, otherCase) { schReleaseJob(0); schFreeRpcCtx(NULL); - ASSERT_EQ(schDumpEpSet(NULL), (char *)NULL); + char* ep = NULL; + ASSERT_EQ(schDumpEpSet(NULL, &ep), TSDB_CODE_SUCCESS); ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0); ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0); } int main(int argc, char **argv) { schtInitLogFile(); + if (rpcInit()) { + assert(0); + } taosSeedRand(taosGetTimestampSec()); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();