From a8c39cdd1139c23958132b7ca19bbb06e7dbf99a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 15 Jul 2024 14:48:21 +0800 Subject: [PATCH] enh: add return code processing --- source/libs/qworker/src/qwDbg.c | 21 +++++++++++--- source/libs/qworker/src/qwMsg.c | 48 +++++++++++++++++++------------ source/libs/qworker/src/qwUtil.c | 43 +++++++++++++++++++++------ source/libs/qworker/src/qworker.c | 2 +- 4 files changed, 82 insertions(+), 32 deletions(-) diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 17b2849684..b7a4b718e2 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -173,8 +173,21 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int if (pEpSet) { contLen = tSerializeSEpSet(NULL, 0, pEpSet); + if (contLen < 0) { + qError("tSerializeSEpSet failed, code:%x", terrno); + return terrno; + } rsp = rpcMallocCont(contLen); - tSerializeSEpSet(rsp, contLen, pEpSet); + if (NULL == rsp) { + qError("rpcMallocCont %d failed, code:%x", contLen, terrno); + return terrno; + } + + contLen = tSerializeSEpSet(rsp, contLen, pEpSet); + if (contLen < 0) { + qError("tSerializeSEpSet second failed, code:%x", terrno); + return terrno; + } } SRpcMsg rpcRsp = { @@ -216,20 +229,20 @@ void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { epSet.eps[2].port = 7300; ctx->phase = QW_PHASE_POST_QUERY; - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, &epSet); // ignore error *rsped = true; return; } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error *rsped = true; return; } if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) { - qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); + (void)qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_SYN_NOT_LEADER, NULL); // ignore error *rsped = true; return; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 473dd41228..9f4a540f3c 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -166,6 +166,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve int32_t code) { if (NULL == pRsp) { pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } memset(pRsp, 0, sizeof(SRetrieveTableRsp)); dataLength = 0; } @@ -187,6 +190,9 @@ int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieve #if 0 int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } pRsp->code = code; SRpcMsg rpcRsp = { @@ -203,6 +209,9 @@ int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code) { int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); + if (NULL == pRsp) { + QW_RET(terrno); + } pRsp->code = code; SRpcMsg rpcRsp = { @@ -428,6 +437,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; SSubQueryMsg msg = {0}; if (tDeserializeSSubQueryMsg(pMsg->pCont, pMsg->contLen, &msg) < 0) { @@ -442,8 +452,8 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t eId = msg.execId; QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); - qwAbortPrerocessQuery(QW_FPARAMS()); - QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); + code = qwAbortPrerocessQuery(QW_FPARAMS()); + QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p, code:%x", pMsg->info.handle, code); tFreeSSubQueryMsg(&msg); @@ -458,7 +468,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.queryProcessed, 1); SSubQueryMsg msg = {0}; @@ -500,7 +510,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in SQWTaskCtx *handles = NULL; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, QUERY_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.cqueryProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -533,7 +543,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int SResFetchReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.fetchProcessed, 1); if (tDeserializeSResFetchReq(pMsg->pCont, pMsg->contLen, &req) < 0) { @@ -551,9 +561,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessFetch(QW_FPARAMS(), &qwMsg)); + int32_t code = qwProcessFetch(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processFetch end, node:%p", node); + QW_SCH_TASK_DLOG("processFetch end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -561,7 +571,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { SQWorker *mgmt = (SQWorker *)qWorkerMgmt; if (mgmt) { - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1); } @@ -580,7 +590,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; STaskCancelReq *msg = pMsg->pCont; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.cancelProcessed, 1); if (NULL == msg || pMsg->contLen < sizeof(*msg)) { @@ -621,7 +631,7 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.dropProcessed, 1); STaskDropReq msg = {0}; @@ -644,9 +654,9 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 QW_SCH_TASK_DLOG("processDrop start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessDrop(QW_FPARAMS(), &qwMsg)); + code = qwProcessDrop(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processDrop end, node:%p", node); + QW_SCH_TASK_DLOG("processDrop end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -659,7 +669,7 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t code = 0; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.notifyProcessed, 1); STaskNotifyReq msg = {0}; @@ -678,9 +688,9 @@ int32_t qWorkerProcessNotifyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in QW_SCH_TASK_DLOG("processNotify start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessNotify(QW_FPARAMS(), &qwMsg)); + code = qwProcessNotify(QW_FPARAMS(), &qwMsg); - QW_SCH_TASK_DLOG("processNotify end, node:%p", node); + QW_SCH_TASK_DLOG("processNotify end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -695,7 +705,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ SSchedulerHbReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; - qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); + QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1); if (NULL == pMsg->pCont) { @@ -717,9 +727,9 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->info.handle); - QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); + code = qwProcessHb(mgmt, &qwMsg, &req); - QW_SCH_DLOG("processHb end, node:%p", node); + QW_SCH_DLOG("processHb end, node:%p, code:%x", node, code); return TSDB_CODE_SUCCESS; } @@ -735,7 +745,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SD QW_STAT_INC(mgmt->stat.msgStat.deleteProcessed, 1); - tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req); + QW_ERR_RET(tDeserializeSVDeleteReq(pMsg->pCont, pMsg->contLen, &req)); uint64_t sId = req.sId; uint64_t qId = req.queryId; diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 06559093a8..0451532cbe 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -323,34 +323,52 @@ static void freeExplainExecItem(void *param) { int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + int32_t code = TSDB_CODE_SUCCESS; qTaskInfo_t taskHandle = ctx->taskHandle; ctx->explainRsped = true; SArray *execInfoList = taosArrayInit(4, sizeof(SExplainExecInfo)); - QW_ERR_RET(qGetExplainExecInfo(taskHandle, execInfoList)); + if (NULL == execInfoList) { + QW_ERR_JRET(terrno); + } + + QW_ERR_JRET(qGetExplainExecInfo(taskHandle, execInfoList)); if (ctx->localExec) { SExplainLocalRsp localRsp = {0}; localRsp.rsp.numOfPlans = taosArrayGetSize(execInfoList); SExplainExecInfo *pExec = taosMemoryCalloc(localRsp.rsp.numOfPlans, sizeof(SExplainExecInfo)); + if (NULL == pExec) { + QW_ERR_JRET(terrno); + } memcpy(pExec, taosArrayGet(execInfoList, 0), localRsp.rsp.numOfPlans * sizeof(SExplainExecInfo)); localRsp.rsp.subplanInfo = pExec; localRsp.qId = qId; localRsp.tId = tId; localRsp.rId = rId; localRsp.eId = eId; - taosArrayPush(ctx->explainRes, &localRsp); + if (NULL == taosArrayPush(ctx->explainRes, &localRsp)) { + QW_ERR_JRET(terrno); + } + taosArrayDestroy(execInfoList); + execInfoList = NULL; } else { SRpcHandleInfo connInfo = ctx->ctrlConnInfo; connInfo.ahandle = NULL; int32_t code = qwBuildAndSendExplainRsp(&connInfo, execInfoList); taosArrayDestroyEx(execInfoList, freeExplainExecItem); + execInfoList = NULL; + QW_ERR_RET(code); } - return TSDB_CODE_SUCCESS; +_return: + + taosArrayDestroyEx(execInfoList, freeExplainExecItem); + + return code; } @@ -544,7 +562,7 @@ int32_t qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { void qwCloseRef(void) { taosWLockLatch(&gQwMgmt.lock); if (atomic_load_32(&gQwMgmt.qwNum) <= 0 && gQwMgmt.qwRef >= 0) { - taosCloseRef(gQwMgmt.qwRef); + (void)taosCloseRef(gQwMgmt.qwRef); // ignore error gQwMgmt.qwRef = -1; } taosWUnLockLatch(&gQwMgmt.lock); @@ -561,7 +579,7 @@ void qwDestroyImpl(void *pMgmt) { int32_t schStatusCount = 0; qDebug("start to destroy qworker, type:%d, id:%d, handle:%p", nodeType, nodeId, mgmt); - taosTmrStop(mgmt->hbTimer); + (void)taosTmrStop(mgmt->hbTimer); //ignore error mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); @@ -652,24 +670,33 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) { return pStat->num ? (pStat->total / pStat->num) : 0; default: qError("unsupported queue type %d", type); + break; } return -1; } void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { + int32_t code = TSDB_CODE_SUCCESS; int32_t num = taosArrayGetSize(pExpiredSch); for (int32_t i = 0; i < num; ++i) { uint64_t *sId = taosArrayGet(pExpiredSch, i); SQWSchStatus *pSch = NULL; - if (qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch)) { + if (NULL == sId) { + qError("get the %dth sch failed, code:%x", i, terrno); + break; + } + + code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch); + if (TSDB_CODE_SUCCESS != code) { + qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code); continue; } if (taosHashGetSize(pSch->tasksHash) <= 0) { qwDestroySchStatus(pSch); - taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); - qDebug("sch %" PRIx64 " destroyed", *sId); + code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); + qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code); } qwReleaseScheduler(QW_WRITE, mgmt); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 084ee7efe3..5840cc0245 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -1372,7 +1372,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S _return: if (mgmt->refId >= 0) { - qwRelease(mgmt->refId); // ignore error + (void)qwRelease(mgmt->refId); // ignore error } else { taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->ctxHash);