From 1c12a16b0095bf5817b0c374bcae53cfd2323877 Mon Sep 17 00:00:00 2001 From: dmchen Date: Sun, 21 Jul 2024 00:58:07 +0000 Subject: [PATCH] fix/TD-30989 --- source/dnode/mnode/impl/src/mndMnode.c | 256 ++++++++++++-------- source/dnode/mnode/impl/src/mndPerfSchema.c | 53 ++-- source/dnode/mnode/impl/src/mndProfile.c | 77 +++--- source/dnode/mnode/impl/src/mndQnode.c | 190 ++++++++------- source/dnode/mnode/impl/src/mndQuery.c | 6 +- source/dnode/mnode/impl/src/mndScheduler.c | 82 ++++--- 6 files changed, 373 insertions(+), 291 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index af6ae8c5a0..7b1ca9c625 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -69,6 +69,7 @@ int32_t mndInitMnode(SMnode *pMnode) { void mndCleanupMnode(SMnode *pMnode) {} SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) { + terrno = 0; SMnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId); if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_MNODE_NOT_EXIST; @@ -82,13 +83,18 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) { } static int32_t mndCreateDefaultMnode(SMnode *pMnode) { + int32_t code = 0; SMnodeObj mnodeObj = {0}; mnodeObj.id = 1; mnodeObj.createdTime = taosGetTimestampMs(); mnodeObj.updateTime = mnodeObj.createdTime; SSdbRaw *pRaw = mndMnodeActionEncode(&mnodeObj); - if (pRaw == NULL) return -1; + if (pRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + return -1; + } (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); mInfo("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw); @@ -97,25 +103,27 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { if (pTrans == NULL) { sdbFreeRaw(pRaw); mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr()); + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; return -1; } mInfo("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id); - if (mndTransAppendCommitlog(pTrans, pRaw) != 0) { + if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); - return -1; + TAOS_RETURN(code); } (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - if (mndTransPrepare(pMnode, pTrans) != 0) { + if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } mndTransDrop(pTrans); - return 0; + TAOS_RETURN(code); } static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) { @@ -188,16 +196,19 @@ _OVER: } static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { + int32_t code = 0; mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj); pObj->pDnode = sdbAcquireNotReadyObj(pSdb, SDB_DNODE, &pObj->id); if (pObj->pDnode == NULL) { mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr()); - return -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + int32_t code = 0; } pObj->syncState = TAOS_SYNC_STATE_OFFLINE; mndReloadSyncConfig(pSdb->pMnode); - return 0; + TAOS_RETURN(code); } static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) { @@ -271,38 +282,59 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { } static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; - return 0; + if (pRedoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING)); + TAOS_RETURN(code); } int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1; - return 0; + if (pRedoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY)); + TAOS_RETURN(code); } static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; - return 0; + if (pUndoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED)); + TAOS_RETURN(code); } int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; - return 0; + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)); + TAOS_RETURN(code); } static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pCreateReq, SEpSet *pCreateEpSet) { + int32_t code = 0; int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pCreateReq); void *pReq = taosMemoryMalloc(contLen); tSerializeSDCreateMnodeReq(pReq, contLen, pCreateReq); @@ -315,15 +347,16 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p .acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED, }; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) { + int32_t code = 0; int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq); void *pReq = taosMemoryMalloc(contLen); tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq); @@ -337,14 +370,15 @@ static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeType .acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER, }; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pAlterReq, SEpSet *pAlterEpSet) { + int32_t code = 0; int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterReq); void *pReq = taosMemoryMalloc(contLen); tSerializeSDCreateMnodeReq(pReq, contLen, pAlterReq); @@ -357,15 +391,16 @@ static int32_t mndBuildAlterMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *pA .acceptableCode = 0, }; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDropReq, SEpSet *pDroprEpSet) { + int32_t code = 0; int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, pDropReq); void *pReq = taosMemoryMalloc(contLen); tSerializeSCreateDropMQSNodeReq(pReq, contLen, pDropReq); @@ -378,11 +413,11 @@ static int32_t mndBuildDropMnodeRedoAction(STrans *pTrans, SDDropMnodeReq *pDrop .acceptableCode = TSDB_CODE_MNODE_NOT_DEPLOYED, }; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { @@ -426,9 +461,9 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno createEpset.eps[0].port = pDnode->port; memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - if (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset) != 0) return -1; + TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset)); - return 0; + TAOS_RETURN(0); } int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { @@ -474,9 +509,9 @@ int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno createEpset.eps[0].port = pDnode->port; memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - if (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset) != 0) return -1; + TAOS_CHECK_RETURN(mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset)); - return 0; + TAOS_RETURN(0); } static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { @@ -517,9 +552,9 @@ static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S createEpset.eps[0].port = pDnode->port; memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - if (mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset) != 0) return -1; + TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset)); - return 0; + TAOS_RETURN(0); } int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { @@ -565,19 +600,23 @@ int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S createEpset.eps[0].port = pDnode->port; memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - if (mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset) != 0) return -1; + TAOS_CHECK_RETURN(mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset)); - return 0; + TAOS_RETURN(0); } static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "create-mnode"); - if (pTrans == NULL) goto _OVER; + if (pTrans == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } mndTransSetSerial(pTrans); mInfo("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); - if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; + TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); SMnodeObj mnodeObj = {0}; mnodeObj.id = pDnode->id; @@ -586,8 +625,8 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, mnodeObj.role = TAOS_SYNC_ROLE_LEARNER; mnodeObj.lastIndex = pMnode->applied; - if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; - if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; + TAOS_CHECK_GOTO(mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj), NULL, _OVER); SMnodeObj mnodeLeaderObj = {0}; mnodeLeaderObj.id = pDnode->id; @@ -596,15 +635,15 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, mnodeLeaderObj.role = TAOS_SYNC_ROLE_VOTER; mnodeLeaderObj.lastIndex = pMnode->applied + 1; - if (mndSetAlterMnodeTypeRedoActions(pMnode, pTrans, pDnode, &mnodeLeaderObj) != 0) goto _OVER; - if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeLeaderObj) != 0) goto _OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + TAOS_CHECK_GOTO(mndSetAlterMnodeTypeRedoActions(pMnode, pTrans, pDnode, &mnodeLeaderObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeLeaderObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); code = 0; _OVER: mndTransDrop(pTrans); - return code; + TAOS_RETURN(code); } static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) { @@ -614,19 +653,14 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) { SDnodeObj *pDnode = NULL; SMCreateMnodeReq createReq = {0}; - if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } + TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER); mInfo("mnode:%d, start to create", createReq.dnodeId); - if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) { - goto _OVER; - } + TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE), NULL, _OVER); pObj = mndAcquireMnode(pMnode, createReq.dnodeId); if (pObj != NULL) { - terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; + code = TSDB_CODE_MND_MNODE_ALREADY_EXIST; goto _OVER; } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) { goto _OVER; @@ -634,17 +668,17 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) { pDnode = mndAcquireDnode(pMnode, createReq.dnodeId); if (pDnode == NULL) { - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + code = TSDB_CODE_MND_DNODE_NOT_EXIST; goto _OVER; } if (sdbGetSize(pMnode->pSdb, SDB_MNODE) >= 3) { - terrno = TSDB_CODE_MND_TOO_MANY_MNODES; + code = TSDB_CODE_MND_TOO_MANY_MNODES; goto _OVER; } if (!mndIsDnodeOnline(pDnode, taosGetTimestampMs())) { - terrno = TSDB_CODE_DNODE_OFFLINE; + code = TSDB_CODE_DNODE_OFFLINE; goto _OVER; } @@ -665,27 +699,38 @@ _OVER: mndReleaseDnode(pMnode, pDnode); tFreeSMCreateQnodeReq(&createReq); - return code; + TAOS_RETURN(code); } static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; - return 0; + if (pRedoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING)); + TAOS_RETURN(code); } static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; - return 0; + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)); + TAOS_RETURN(code); } static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj, bool force) { + int32_t code = 0; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SDDropMnodeReq dropReq = {0}; @@ -700,32 +745,32 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode if (totalMnodes == 2) { if (force) { mError("cant't force drop dnode, since a mnode on it and replica is 2"); - terrno = TSDB_CODE_MNODE_ONLY_TWO_MNODE; - return -1; + code = TSDB_CODE_MNODE_ONLY_TWO_MNODE; + TAOS_RETURN(code); } mInfo("vgId:1, has %d mnodes, exec redo log first", totalMnodes); - if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1; + TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj)); if (!force) { - if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1; + TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet)); } } else if (totalMnodes == 3) { mInfo("vgId:1, has %d mnodes, exec redo action first", totalMnodes); if (!force) { - if (mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet) != 0) return -1; + TAOS_CHECK_RETURN(mndBuildDropMnodeRedoAction(pTrans, &dropReq, &dropEpSet)); } - if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) return -1; + TAOS_CHECK_RETURN(mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj)); } else { - return -1; + TAOS_RETURN(-1); } - return 0; + TAOS_RETURN(code); } int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force) { if (pObj == NULL) return 0; pObj->lastIndex = pMnode->applied; - if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force) != 0) return -1; - if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) return -1; + TAOS_CHECK_RETURN(mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj, force)); + TAOS_CHECK_RETURN(mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj)); return 0; } @@ -734,19 +779,23 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { STrans *pTrans = NULL; pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "drop-mnode"); - if (pTrans == NULL) goto _OVER; + if (pTrans == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } mndTransSetSerial(pTrans); mInfo("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; + TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); - if (mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + TAOS_CHECK_GOTO(mndSetDropMnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); code = 0; _OVER: mndTransDrop(pTrans); - return code; + TAOS_RETURN(code); } static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) { @@ -755,38 +804,35 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) { SMnodeObj *pObj = NULL; SMDropMnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } + TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER); mInfo("mnode:%d, start to drop", dropReq.dnodeId); - if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) { - goto _OVER; - } + TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE), NULL, _OVER); if (dropReq.dnodeId <= 0) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto _OVER; } pObj = mndAcquireMnode(pMnode, dropReq.dnodeId); if (pObj == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto _OVER; } if (pMnode->selfDnodeId == dropReq.dnodeId) { - terrno = TSDB_CODE_MND_CANT_DROP_LEADER; + code = TSDB_CODE_MND_CANT_DROP_LEADER; goto _OVER; } if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) { - terrno = TSDB_CODE_MND_TOO_FEW_MNODES; + code = TSDB_CODE_MND_TOO_FEW_MNODES; goto _OVER; } if (!mndIsDnodeOnline(pObj->pDnode, taosGetTimestampMs())) { - terrno = TSDB_CODE_DNODE_OFFLINE; + code = TSDB_CODE_DNODE_OFFLINE; goto _OVER; } @@ -805,7 +851,7 @@ _OVER: mndReleaseMnode(pMnode, pObj); tFreeSMCreateQnodeReq(&dropReq); - return code; + TAOS_RETURN(code); } static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { @@ -892,13 +938,11 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { #if 1 return 0; #else + int32_t code = 0; SMnode *pMnode = pReq->info.node; SDAlterMnodeReq alterReq = {0}; - if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } + TAOS_CHECK_RETURN(tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq)); SMnodeOpt option = {.deploy = true, .numOfReplicas = alterReq.replica, .selfIndex = -1}; memcpy(option.replicas, alterReq.replicas, sizeof(alterReq.replicas)); @@ -913,9 +957,9 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { return 0; } - if (mndWriteFile(pMnode->path, &option) != 0) { + if ((code = mndWriteFile(pMnode->path, &option)) != 0) { mError("failed to write mnode file since %s", terrstr()); - return -1; + TAOS_RETURN(code); } SSyncCfg cfg = {.replicaNum = alterReq.replica, .myIndex = -1}; @@ -939,14 +983,14 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { } } - int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg); + code = syncReconfig(pMnode->syncMgmt.sync, &cfg); if (code != 0) { mError("failed to sync reconfig since %s", terrstr()); } else { mInfo("alter mnode sync success"); } - return code; + TAOS_RETURN(code); #endif } diff --git a/source/dnode/mnode/impl/src/mndPerfSchema.c b/source/dnode/mnode/impl/src/mndPerfSchema.c index 33dc63bdf4..d54c27ce30 100644 --- a/source/dnode/mnode/impl/src/mndPerfSchema.c +++ b/source/dnode/mnode/impl/src/mndPerfSchema.c @@ -19,10 +19,11 @@ // connection/application/ int32_t mndInitPerfsTableSchema(const SSysDbTableSchema *pSrc, int32_t colNum, SSchema **pDst) { + int32_t code = 0; SSchema *schema = taosMemoryCalloc(colNum, sizeof(SSchema)); if (NULL == schema) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(code); } for (int32_t i = 0; i < colNum; ++i) { @@ -34,10 +35,11 @@ int32_t mndInitPerfsTableSchema(const SSysDbTableSchema *pSrc, int32_t colNum, S } *pDst = schema; - return TSDB_CODE_SUCCESS; + TAOS_RETURN(code); } int32_t mndPerfsInitMeta(SHashObj *hash) { + int32_t code = 0; STableMetaRsp meta = {0}; tstrncpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB, sizeof(meta.dbFName)); @@ -53,56 +55,56 @@ int32_t mndPerfsInitMeta(SHashObj *hash) { tstrncpy(meta.tbName, pSysDbTableMeta[i].name, sizeof(meta.tbName)); meta.numOfColumns = pSysDbTableMeta[i].colNum; - if (mndInitPerfsTableSchema(pSysDbTableMeta[i].schema, pSysDbTableMeta[i].colNum, &meta.pSchemas)) { - return -1; - } + TAOS_CHECK_RETURN(mndInitPerfsTableSchema(pSysDbTableMeta[i].schema, pSysDbTableMeta[i].colNum, &meta.pSchemas)); if (taosHashPut(hash, meta.tbName, strlen(meta.tbName), &meta, sizeof(meta))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(code); } } - return TSDB_CODE_SUCCESS; + TAOS_RETURN(code); } int32_t mndBuildPerfsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) { + int32_t code = 0; if (NULL == pMnode->perfsMeta) { - terrno = TSDB_CODE_APP_ERROR; - return -1; + code = TSDB_CODE_APP_ERROR; + TAOS_RETURN(code); } STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->perfsMeta, tbName, strlen(tbName)); if (NULL == meta) { mError("invalid performance schema table name:%s", tbName); - terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; - return -1; + code = TSDB_CODE_PAR_TABLE_NOT_EXIST; + TAOS_RETURN(code); } *pRsp = *meta; pRsp->pSchemas = taosMemoryCalloc(meta->numOfColumns, sizeof(SSchema)); if (pRsp->pSchemas == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; pRsp->pSchemas = NULL; - return -1; + TAOS_RETURN(code); } memcpy(pRsp->pSchemas, meta->pSchemas, meta->numOfColumns * sizeof(SSchema)); - return 0; + TAOS_RETURN(code); } int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) { + int32_t code = 0; if (NULL == pMnode->perfsMeta) { - terrno = TSDB_CODE_APP_ERROR; - return -1; + code = TSDB_CODE_APP_ERROR; + TAOS_RETURN(code); } STableMetaRsp *pMeta = taosHashGet(pMnode->perfsMeta, tbName, strlen(tbName)); if (NULL == pMeta) { mError("invalid performance schema table name:%s", tbName); - terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; - return -1; + code = TSDB_CODE_PAR_TABLE_NOT_EXIST; + TAOS_RETURN(code); } strcpy(pRsp->tbName, pMeta->tbName); @@ -114,20 +116,21 @@ int32_t mndBuildPerfsTableCfg(SMnode *pMnode, const char *dbFName, const char *t pRsp->pSchemas = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchema)); if (pRsp->pSchemas == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; pRsp->pSchemas = NULL; - return -1; + TAOS_RETURN(code); } memcpy(pRsp->pSchemas, pMeta->pSchemas, pMeta->numOfColumns * sizeof(SSchema)); - return 0; + TAOS_RETURN(code); } int32_t mndInitPerfs(SMnode *pMnode) { + int32_t code = 0; pMnode->perfsMeta = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (pMnode->perfsMeta == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(code); } return mndPerfsInitMeta(pMnode->perfsMeta); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 4224d79391..3921f2281a 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -85,22 +85,23 @@ static void mndCancelGetNextApp(SMnode *pMnode, void *pIter); static int32_t mndProcessSvrVerReq(SRpcMsg *pReq); int32_t mndInitProfile(SMnode *pMnode) { + int32_t code = 0; SProfileMgmt *pMgmt = &pMnode->profileMgmt; // in ms int32_t checkTime = tsShellActivityTimer * 2 * 1000; pMgmt->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkTime, false, (__cache_free_fn_t)mndFreeConn, "conn"); if (pMgmt->connCache == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; mError("failed to alloc profile cache since %s", terrstr()); - return -1; + TAOS_RETURN(code); } pMgmt->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkTime, true, (__cache_free_fn_t)mndFreeApp, "app"); if (pMgmt->appCache == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; mError("failed to alloc profile cache since %s", terrstr()); - return -1; + TAOS_RETURN(code); } mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq); @@ -116,7 +117,7 @@ int32_t mndInitProfile(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp); - return 0; + TAOS_RETURN(code); } void mndCleanupProfile(SMnode *pMnode) { @@ -384,6 +385,7 @@ static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) static void mndFreeApp(SAppObj *pApp) { mTrace("app %" PRIx64 " is destroyed", pApp->appId); } static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) { + terrno = 0; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SAppObj *pApp = taosCacheAcquireByKey(pMgmt->appCache, &appId, sizeof(appId)); @@ -431,13 +433,16 @@ static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) { } static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) { + int32_t code = 0; SAppHbReq *pReq = &pHbReq->app; SAppObj *pApp = mndAcquireApp(pMnode, pReq->appId); if (pApp == NULL) { pApp = mndCreateApp(pMnode, connInfo->clientIp, pReq); if (pApp == NULL) { mError("failed to create new app %" PRIx64 " since %s", pReq->appId, terrstr()); - return -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } else { mDebug("a new app %" PRIx64 " is created", pReq->appId); mndReleaseApp(pMnode, pApp); @@ -475,6 +480,7 @@ static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) { static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq, SClientHbBatchRsp *pBatchRsp, SConnPreparedObj *pObj) { + int32_t code = 0; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL}; SRpcConnInfo connInfo = pMsg->info.conn; @@ -492,7 +498,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb pHbReq->app.pid, pHbReq->app.name, 0); if (pConn == NULL) { mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr()); - return -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } else { mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id); } @@ -501,9 +509,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic)); if (rspBasic == NULL) { mndReleaseConn(pMnode, pConn, true); - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr()); - return -1; + TAOS_RETURN(code); } mndSaveQueryList(pConn, pBasic); @@ -539,9 +547,9 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb hbRsp.info = taosArrayInit(kvNum, sizeof(SKv)); if (NULL == hbRsp.info) { mError("taosArrayInit %d rsp kv failed", kvNum); - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; tFreeClientHbRsp(&hbRsp); - return -1; + TAOS_RETURN(code); } #ifdef TD_ENTERPRISE @@ -554,8 +562,8 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer); SDynViewVersion *pRspVer = NULL; - if (0 != mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer)) { - return -1; + if (0 != (code = mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer))) { + TAOS_RETURN(code); } if (needCheck) { @@ -647,13 +655,14 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { + int32_t code = 0; SMnode *pMnode = pReq->info.node; SClientHbBatchReq batchReq = {0}; if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) { taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq); - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + TAOS_RETURN(code); } SConnPreparedObj obj = {0}; @@ -699,31 +708,27 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { taosArrayDestroy(obj.pQnodeList); - return 0; + TAOS_RETURN(code); } static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) { + int32_t code = 0; SMnode *pMnode = pReq->info.node; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SKillQueryReq killReq = {0}; - if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } + TAOS_CHECK_RETURN(tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq)); mInfo("kill query msg is received, queryId:%s", killReq.queryStrId); - if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY) != 0) { - return -1; - } + TAOS_CHECK_RETURN(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY)); int32_t connId = 0; uint64_t queryId = 0; char *p = strchr(killReq.queryStrId, ':'); if (NULL == p) { mError("invalid query id %s", killReq.queryStrId); - terrno = TSDB_CODE_MND_INVALID_QUERY_ID; - return -1; + code = TSDB_CODE_MND_INVALID_QUERY_ID; + TAOS_RETURN(code); } *p = 0; connId = taosStr2Int32(killReq.queryStrId, NULL, 16); @@ -732,40 +737,36 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) { SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &connId, sizeof(int32_t)); if (pConn == NULL) { mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId); - terrno = TSDB_CODE_MND_INVALID_CONN_ID; - return -1; + code = TSDB_CODE_MND_INVALID_CONN_ID; + TAOS_RETURN(code); } else { mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user); pConn->killId = queryId; taosCacheRelease(pMgmt->connCache, (void **)&pConn, false); - return 0; + TAOS_RETURN(code); } } static int32_t mndProcessKillConnReq(SRpcMsg *pReq) { + int32_t code = 0; SMnode *pMnode = pReq->info.node; SProfileMgmt *pMgmt = &pMnode->profileMgmt; SKillConnReq killReq = {0}; - if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } + TAOS_CHECK_RETURN(tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq)); - if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN) != 0) { - return -1; - } + TAOS_CHECK_RETURN(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN)); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->connCache, &killReq.connId, sizeof(uint32_t)); if (pConn == NULL) { mError("connId:%u, failed to kill connection, conn not exist", killReq.connId); - terrno = TSDB_CODE_MND_INVALID_CONN_ID; - return -1; + code = TSDB_CODE_MND_INVALID_CONN_ID; + TAOS_RETURN(code); } else { mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user); pConn->killed = 1; taosCacheRelease(pMgmt->connCache, (void **)&pConn, false); - return TSDB_CODE_SUCCESS; + TAOS_RETURN(code); } } diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index b86f14e698..6cf0fbd387 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -138,15 +138,16 @@ _OVER: } static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj) { + int32_t code = 0; mTrace("qnode:%d, perform insert action, row:%p", pObj->id, pObj); pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id); if (pObj->pDnode == NULL) { - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + code = TSDB_CODE_MND_DNODE_NOT_EXIST; mError("qnode:%d, failed to perform insert action since %s", pObj->id, terrstr()); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj) { @@ -166,27 +167,42 @@ static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew } static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pRedoRaw = mndQnodeActionEncode(pObj); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1; - return 0; + if (pRedoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING)); + TAOS_RETURN(code); } static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pUndoRaw = mndQnodeActionEncode(pObj); - if (pUndoRaw == NULL) return -1; - if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; - if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; - return 0; + if (pUndoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendUndolog(pTrans, pUndoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED)); + TAOS_RETURN(code); } int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; - return 0; + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)); + TAOS_RETURN(code); } bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId) { @@ -194,6 +210,7 @@ bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId) { } int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) { + int32_t code = 0; SDCreateQnodeReq createReq = {0}; createReq.dnodeId = pDnode->id; @@ -212,23 +229,24 @@ int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeOb action.msgType = TDMT_DND_CREATE_QNODE; action.acceptableCode = TSDB_CODE_QNODE_ALREADY_DEPLOYED; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) { + int32_t code = 0; SDDropQnodeReq dropReq = {0}; dropReq.dnodeId = pDnode->id; int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq); void *pReq = taosMemoryMalloc(contLen); if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(code); } tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); @@ -239,12 +257,12 @@ static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, S action.msgType = TDMT_DND_DROP_QNODE; action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED; - if (mndTransAppendUndoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) { @@ -256,22 +274,26 @@ static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, qnodeObj.updateTime = qnodeObj.createdTime; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-qnode"); - if (pTrans == NULL) goto _OVER; + if (pTrans == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } mndTransSetSerial(pTrans); mInfo("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId); - if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto _OVER; - if (mndSetCreateQnodeUndoLogs(pTrans, &qnodeObj) != 0) goto _OVER; - if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) goto _OVER; - if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) goto _OVER; - if (mndSetCreateQnodeUndoActions(pTrans, pDnode, &qnodeObj) != 0) goto _OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + TAOS_CHECK_GOTO(mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateQnodeUndoLogs(pTrans, &qnodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateQnodeUndoActions(pTrans, pDnode, &qnodeObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); code = 0; _OVER: mndTransDrop(pTrans); - return code; + TAOS_RETURN(code); } static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) { @@ -281,19 +303,14 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) { SDnodeObj *pDnode = NULL; SMCreateQnodeReq createReq = {0}; - if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } + TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER); mInfo("qnode:%d, start to create", createReq.dnodeId); - if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) { - goto _OVER; - } + TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE), NULL, _OVER); pObj = mndAcquireQnode(pMnode, createReq.dnodeId); if (pObj != NULL) { - terrno = TSDB_CODE_MND_QNODE_ALREADY_EXIST; + code = TSDB_CODE_MND_QNODE_ALREADY_EXIST; goto _OVER; } else if (terrno != TSDB_CODE_MND_QNODE_NOT_EXIST) { goto _OVER; @@ -301,7 +318,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) { pDnode = mndAcquireDnode(pMnode, createReq.dnodeId); if (pDnode == NULL) { - terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + code = TSDB_CODE_MND_DNODE_NOT_EXIST; goto _OVER; } @@ -320,34 +337,47 @@ _OVER: mndReleaseQnode(pMnode, pObj); mndReleaseDnode(pMnode, pDnode); tFreeSMCreateQnodeReq(&createReq); - return code; + TAOS_RETURN(code); } static int32_t mndSetDropQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pRedoRaw = mndQnodeActionEncode(pObj); - if (pRedoRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; - if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1; - return 0; + if (pRedoRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendRedolog(pTrans, pRedoRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING)); + TAOS_RETURN(code); } static int32_t mndSetDropQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) { + int32_t code = 0; SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; - return 0; + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + if (mndTransAppendCommitlog(pTrans, pCommitRaw)) + ; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) + ; + TAOS_RETURN(code); } static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) { + int32_t code = 0; SDDropQnodeReq dropReq = {0}; dropReq.dnodeId = pDnode->id; int32_t contLen = tSerializeSCreateDropMQSNodeReq(NULL, 0, &dropReq); void *pReq = taosMemoryMalloc(contLen); if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(code); } tSerializeSCreateDropMQSNodeReq(pReq, contLen, &dropReq); @@ -358,20 +388,20 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn action.msgType = TDMT_DND_DROP_QNODE; action.acceptableCode = TSDB_CODE_QNODE_NOT_DEPLOYED; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj, bool force) { if (pObj == NULL) return 0; - if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) return -1; - if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) return -1; + TAOS_CHECK_RETURN(mndSetDropQnodeRedoLogs(pTrans, pObj)); + TAOS_CHECK_RETURN(mndSetDropQnodeCommitLogs(pTrans, pObj)); if (!force) { - if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) return -1; + TAOS_CHECK_RETURN(mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj)); } return 0; } @@ -380,18 +410,22 @@ static int32_t mndDropQnode(SMnode *pMnode, SRpcMsg *pReq, SQnodeObj *pObj) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, "drop-qnode"); - if (pTrans == NULL) goto _OVER; + if (pTrans == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OVER; + } mndTransSetSerial(pTrans); mInfo("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id); - if (mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false) != 0) goto _OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + TAOS_CHECK_GOTO(mndSetDropQnodeInfoToTrans(pMnode, pTrans, pObj, false), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); code = 0; _OVER: mndTransDrop(pTrans); - return code; + TAOS_RETURN(code); } static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) { @@ -400,23 +434,20 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) { SQnodeObj *pObj = NULL; SMDropQnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } + TAOS_CHECK_GOTO(tDeserializeSCreateDropMQSNodeReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER); mInfo("qnode:%d, start to drop", dropReq.dnodeId); - if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) { - goto _OVER; - } + TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE), NULL, _OVER); if (dropReq.dnodeId <= 0) { - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto _OVER; } pObj = mndAcquireQnode(pMnode, dropReq.dnodeId); if (pObj == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto _OVER; } @@ -435,10 +466,11 @@ _OVER: mndReleaseQnode(pMnode, pObj); tFreeSDDropQnodeReq(&dropReq); - return code; + TAOS_RETURN(code); } int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit) { + int32_t code = 0; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SQnodeObj *pObj = NULL; @@ -447,8 +479,8 @@ int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit) { SArray *qnodeList = taosArrayInit(5, sizeof(SQueryNodeLoad)); if (NULL == qnodeList) { mError("failed to alloc epSet while process qnode list req"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return terrno; + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(code); } while (1) { @@ -484,20 +516,14 @@ static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) { SQnodeListReq qlistReq = {0}; SQnodeListRsp qlistRsp = {0}; - if (tDeserializeSQnodeListReq(pReq->pCont, pReq->contLen, &qlistReq) != 0) { - mError("failed to parse qnode list req"); - terrno = TSDB_CODE_INVALID_MSG; - goto _OVER; - } + TAOS_CHECK_GOTO(tDeserializeSQnodeListReq(pReq->pCont, pReq->contLen, &qlistReq), NULL, _OVER); - if (mndCreateQnodeList(pMnode, &qlistRsp.qnodeList, qlistReq.rowNum) != 0) { - goto _OVER; - } + TAOS_CHECK_GOTO(mndCreateQnodeList(pMnode, &qlistRsp.qnodeList, qlistReq.rowNum), NULL, _OVER); int32_t rspLen = tSerializeSQnodeListRsp(NULL, 0, &qlistRsp); void *pRsp = rpcMallocCont(rspLen); if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } @@ -509,7 +535,7 @@ static int32_t mndProcessQnodeListReq(SRpcMsg *pReq) { _OVER: tFreeSQnodeListRsp(&qlistRsp); - return code; + TAOS_RETURN(code); } static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index ae930f0d96..7c86b9aa74 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -88,7 +88,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { void *pRsp = NULL; SMnode *pMnode = pMsg->info.node; - if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) { + if ((code = tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) != 0) { code = TSDB_CODE_OUT_OF_MEMORY; mError("tDeserializeSBatchReq failed"); goto _exit; @@ -119,7 +119,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req->msgType)]; if (fp == NULL) { mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); - terrno = TSDB_CODE_MSG_NOT_PROCESSED; + code = TSDB_CODE_MSG_NOT_PROCESSED; taosArrayDestroy(batchRsp.pRsps); return -1; } @@ -164,7 +164,7 @@ _exit: taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg); taosArrayDestroyEx(batchRsp.pRsps, mnodeFreeSBatchRspMsg); - return code; + TAOS_RETURN(code); } int32_t mndInitQuery(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index db8c08e2e3..155975d580 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -50,17 +50,17 @@ static bool isCountWindowStreamTask(SSubplan* pPlan) { int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, int64_t deleteMark) { + int32_t code = 0; SNode* pAst = NULL; SQueryPlan* pPlan = NULL; - terrno = TSDB_CODE_SUCCESS; if (nodesStringToNode(ast, &pAst) < 0) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; + code = TSDB_CODE_QRY_INVALID_INPUT; goto END; } if (qSetSTableIdForRsma(pAst, uid) < 0) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; + code = TSDB_CODE_QRY_INVALID_INPUT; goto END; } @@ -75,33 +75,33 @@ int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64 }; if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; + code = TSDB_CODE_QRY_INVALID_INPUT; goto END; } int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); if (levelNum != 1) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; + code = TSDB_CODE_QRY_INVALID_INPUT; goto END; } SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); int32_t opNum = LIST_LENGTH(inner->pNodeList); if (opNum != 1) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; + code = TSDB_CODE_QRY_INVALID_INPUT; goto END; } SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); if (qSubPlanToString(plan, pDst, pDstLen) < 0) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; + code = TSDB_CODE_QRY_INVALID_INPUT; goto END; } END: if (pAst) nodesDestroyNode(pAst); if (pPlan) nodesDestroyNode((SNode*)pPlan); - return terrno; + TAOS_RETURN(code); } int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { @@ -127,6 +127,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) { + int32_t code = 0; bool isShuffle = false; if (pStream->fixedSinkVgId == 0) { @@ -135,9 +136,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr isShuffle = true; pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; - if (mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL) < 0) { - return -1; - } + TAOS_CHECK_RETURN(mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL)); } sdbRelease(pMnode->pSdb, pDb); @@ -166,7 +165,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr streamTaskSetFixedDownstreamInfo(pTask, pOneSinkTask); } - return 0; + TAOS_RETURN(code); } int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { @@ -639,14 +638,15 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SEpSet* pEpset, int64_t skey, SArray* pVerList) { + int32_t code = 0; SSdb* pSdb = pMnode->pSdb; int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans); bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); if (pDbObj == NULL) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; + code = TSDB_CODE_QRY_INVALID_INPUT; + TAOS_RETURN(code); } bool multiTarget = (pDbObj->cfg.numOfVgroups > 1); @@ -670,9 +670,11 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* SSubplan* plan = getScanSubPlan(pPlan); // source plan if (plan == NULL) { - return terrno; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } - int32_t code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1); + code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -688,7 +690,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (numOfPlanLevel == 3) { plan = getAggSubPlan(pPlan, 1); // middle agg plan if (plan == NULL) { - return terrno; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } do { SArray** list = taosArrayGetLast(pStream->tasks); @@ -715,7 +719,9 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* plan = getAggSubPlan(pPlan, 0); if (plan == NULL) { - return terrno; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } mDebug("doScheduleStream add final agg"); @@ -724,7 +730,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* addNewTaskList(pStream); code = addAggTask(pStream, pMnode, plan, pEpset, true); if (code != TSDB_CODE_SUCCESS) { - return code; + TAOS_RETURN(code); } bindTwoLevel(pStream->tasks, 0, size); if (pStream->conf.fillHistory) { @@ -735,26 +741,28 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (pStream->conf.fillHistory) { bindAggSink(pStream, pMnode, pStream->pHTasksList); } - return TDB_CODE_SUCCESS; + TAOS_RETURN(code); } int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t skey, SArray* pVgVerList) { + int32_t code = 0; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; + code = TSDB_CODE_QRY_INVALID_INPUT; + TAOS_RETURN(code); } SEpSet mnodeEpset = {0}; mndGetMnodeEpSet(pMnode, &mnodeEpset); - int32_t code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList); + code = doScheduleStream(pStream, pMnode, pPlan, &mnodeEpset, skey, pVgVerList); qDestroyQueryPlan(pPlan); - return code; + TAOS_RETURN(code); } int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) { + int32_t code = 0; SSdb* pSdb = pMnode->pSdb; SVgObj* pVgroup = NULL; SQueryPlan* pPlan = NULL; @@ -763,21 +771,21 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib if (pTopic->subType == TOPIC_SUB_TYPE__COLUMN) { pPlan = qStringToQueryPlan(pTopic->physicalPlan); if (pPlan == NULL) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; + code = TSDB_CODE_QRY_INVALID_INPUT; + TAOS_RETURN(code); } } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) { SNode* pAst = NULL; - if (nodesStringToNode(pTopic->ast, &pAst) != 0) { + if ((code = nodesStringToNode(pTopic->ast, &pAst)) != 0) { mError("topic:%s, failed to create since %s", pTopic->name, terrstr()); - return -1; + TAOS_RETURN(code); } SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; - if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) { + if ((code = qCreateQueryPlan(&cxt, &pPlan, NULL)) != 0) { mError("failed to create topic:%s since %s", pTopic->name, terrstr()); nodesDestroyNode(pAst); - return -1; + TAOS_RETURN(code); } nodesDestroyNode(pAst); } @@ -786,8 +794,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); if (levelNum != 1) { qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY; - return -1; + code = TSDB_CODE_MND_INVALID_TOPIC_QUERY; + TAOS_RETURN(code); } SNodeListNode* pNodeListNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); @@ -795,8 +803,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib int32_t opNum = LIST_LENGTH(pNodeListNode->pNodeList); if (opNum != 1) { qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY; - return -1; + code = TSDB_CODE_MND_INVALID_TOPIC_QUERY; + TAOS_RETURN(code); } pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0); @@ -831,13 +839,13 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) { qDestroyQueryPlan(pPlan); - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; + code = TSDB_CODE_QRY_INVALID_INPUT; + TAOS_RETURN(code); } } else { pSub->qmsg = taosStrdup(""); } qDestroyQueryPlan(pPlan); - return 0; + TAOS_RETURN(code); }