diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e2bedc258a..c61949b316 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -44,11 +44,16 @@ static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj *hash); static int32_t mndSetSubCommitLogs(STrans *pTrans, SMqSubscribeObj *pSub) { + int32_t code = 0; SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; - if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1; - return 0; + if (pCommitRaw == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); + TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)); + TAOS_RETURN(code); } int32_t mndInitSubscribe(SMnode *pMnode) { @@ -75,6 +80,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { } static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj *pTopic, const char *subKey) { + int32_t code = 0; SMqSubscribeObj *pSub = tNewSubscribeObj(subKey); if (pSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -86,7 +92,7 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj pSub->subType = pTopic->subType; pSub->withMeta = pTopic->withMeta; - if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { + if ((terrno = mndSchedInitSubEp(pMnode, pTopic, pSub)) < 0) { tDeleteSubscribeObj(pSub); taosMemoryFree(pSub); return NULL; @@ -97,6 +103,7 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg, SSubplan *pPlan) { + int32_t code = 0; SMqRebVgReq req = {0}; req.oldConsumerId = pRebVg->oldConsumerId; req.newConsumerId = pRebVg->newConsumerId; @@ -106,8 +113,8 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj pPlan->execNode.nodeId = pRebVg->pVgEp->vgId; int32_t msgLen; if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) { - terrno = TSDB_CODE_QRY_INVALID_INPUT; - return -1; + code = TSDB_CODE_QRY_INVALID_INPUT; + TAOS_RETURN(code); } } else { req.qmsg = taosStrdup(""); @@ -122,7 +129,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); if (ret < 0) { taosMemoryFree(req.qmsg); - return -1; + TAOS_RETURN(ret); } tlen += sizeof(SMsgHead); @@ -130,7 +137,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(req.qmsg); - return -1; + TAOS_RETURN(ret); } SMsgHead *pMsgHead = (SMsgHead *)buf; @@ -140,40 +147,41 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj SEncoder encoder = {0}; tEncoderInit(&encoder, POINTER_SHIFT(buf, sizeof(SMsgHead)), tlen); - if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { + if ((code = tEncodeSMqRebVgReq(&encoder, &req)) < 0) { taosMemoryFreeClear(buf); tEncoderClear(&encoder); taosMemoryFree(req.qmsg); - return -1; + TAOS_RETURN(code); } tEncoderClear(&encoder); *pBuf = buf; *pLen = tlen; taosMemoryFree(req.qmsg); - return 0; + TAOS_RETURN(code); } static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg, SSubplan *pPlan) { + int32_t code = 0; if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { if (pRebVg->oldConsumerId == -1) return 0; // drop stream, no consumer, while split vnode,all consumerId is -1 - terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; - return -1; + code = TSDB_CODE_MND_INVALID_SUB_OPTION; + TAOS_RETURN(code); } void *buf; int32_t tlen; - if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan) < 0) { - return -1; + if ((code = mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan)) < 0) { + TAOS_RETURN(code); } int32_t vgId = pRebVg->pVgEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); if (pVgObj == NULL) { taosMemoryFree(buf); - terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; - return -1; + code = TSDB_CODE_MND_VGROUP_NOT_EXIST; + TAOS_RETURN(code); } STransAction action = {0}; @@ -183,11 +191,11 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubsc action.msgType = TDMT_VND_TMQ_SUBSCRIBE; mndReleaseVgroup(pMnode, pVgObj); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(buf); - return -1; + TAOS_RETURN(code); } - return 0; + TAOS_RETURN(code); } static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, bool fullName) { @@ -209,6 +217,7 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup, } static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { + terrno = 0; SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1); if (pRebInfo == NULL) { pRebInfo = tNewSMqRebSubscribe(key); @@ -612,7 +621,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu if (strcmp(pOutput->pSub->qmsg, "") != 0) { code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan); if (code != 0) { - terrno = code; goto END; } } @@ -623,7 +631,8 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); if (pTrans == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; goto END; } @@ -665,7 +674,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu END: nodesDestroyNode((SNode *)pPlan); mndTransDrop(pTrans); - return code; + TAOS_RETURN(code); } static void freeRebalanceItem(void *param) { @@ -827,6 +836,7 @@ static void checkConsumer(SMnode *pMnode, SMqSubscribeObj *pSub) { } static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOutputObj *rebOutput) { + int32_t code = 0; const char *key = rebInput->pRebInfo->key; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, key); @@ -838,8 +848,10 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if (pTopic == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; mError("[rebalance] mq rebalance %s ignored since topic %s doesn't exist", key, topic); - return -1; + TAOS_RETURN(code); } taosRLockLatch(&pTopic->lock); @@ -848,10 +860,12 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu rebOutput->pSub = mndCreateSubscription(pMnode, pTopic, key); if (rebOutput->pSub == NULL) { - mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, terrstr()); + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + mError("[rebalance] mq rebalance %s failed create sub since %s, ignore", key, tstrerror(code)); taosRUnLockLatch(&pTopic->lock); mndReleaseTopic(pMnode, pTopic); - return -1; + TAOS_RETURN(code); } memcpy(rebOutput->pSub->dbName, pTopic->db, TSDB_DB_FNAME_LEN); @@ -869,7 +883,7 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum); mndReleaseSubscribe(pMnode, pSub); } - return 0; + TAOS_RETURN(code); } static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { @@ -885,6 +899,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); if (rebSubHash == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + if (terrno != 0) code = terrno; goto END; } taosHashSetFreeFp(rebSubHash, freeRebalanceItem); @@ -915,8 +930,8 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { mndDoRebalance(pMnode, &rebInput, &rebOutput); - if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) { - mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr()) + if ((code = mndPersistRebResult(pMnode, pMsg, &rebOutput)) != 0) { + mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", tstrerror(code)) } clearRebOutput(&rebOutput); @@ -931,7 +946,7 @@ END: taosHashCleanup(rebSubHash); mndRebCntDec(); - return code; + TAOS_RETURN(code); } static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans) { @@ -950,8 +965,7 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran } SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - ret = -1; + ret = TSDB_CODE_OUT_OF_MEMORY; goto END; } pReq->head.vgId = htonl(pVgObj->vgId); @@ -968,15 +982,14 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST; sdbRelease(pMnode->pSdb, pVgObj); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - ret = -1; + if ((ret = mndTransAppendRedoAction(pTrans, &action)) != 0) { goto END; } } END: sdbRelease(pMnode->pSdb, pVgObj); sdbCancelFetch(pMnode->pSdb, pIter); - return ret; + TAOS_RETURN(ret); } static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { @@ -1012,7 +1025,7 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro END: sdbRelease(pMnode->pSdb, pConsumer); sdbCancelFetch(pMnode->pSdb, pIter); - return ret; + TAOS_RETURN(ret); } static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { @@ -1022,8 +1035,8 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { int32_t code = TSDB_CODE_ACTION_IN_PROGRESS; if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + TAOS_RETURN(code); } SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic); @@ -1032,24 +1045,24 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { mInfo("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic); return 0; } else { - terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; - mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr()); - return -1; + code = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST; + mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, tstrerror(code)); + TAOS_RETURN(code); } } taosWLockLatch(&pSub->lock); if (taosHashGetSize(pSub->consumerHash) != 0) { - terrno = TSDB_CODE_MND_CGROUP_USED; - mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); - code = -1; + code = TSDB_CODE_MND_CGROUP_USED; + mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code)); goto end; } pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup"); if (pTrans == NULL) { - mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); - code = -1; + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, tstrerror(code)); goto end; } @@ -1088,9 +1101,9 @@ end: if (code != 0) { mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic); - return code; + TAOS_RETURN(code); } - return TSDB_CODE_ACTION_IN_PROGRESS; + TAOS_RETURN(TSDB_CODE_ACTION_IN_PROGRESS); } void mndCleanupSubscribe(SMnode *pMnode) {} @@ -1340,7 +1353,7 @@ END: sdbRelease(pSdb, pSub); sdbCancelFetch(pSdb, pIter); - return code; + TAOS_RETURN(code); } static int32_t buildResult(SSDataBlock *pBlock, int32_t *numOfRows, int64_t consumerId, const char *topic, diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 573b75ff5a..70d0b858f6 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -123,15 +123,16 @@ _OUT: } static int32_t mndTransValidateImp(SMnode *pMnode, STrans *pTrans) { + int32_t code = 0; if (pTrans->stage == TRN_STAGE_PREPARE) { - if (mndTransCheckConflict(pMnode, pTrans) < 0) { + if ((code = mndTransCheckConflict(pMnode, pTrans)) < 0) { mError("trans:%d, failed to validate trans conflicts.", pTrans->id); - return -1; + TAOS_RETURN(code); } return mndTransValidatePrepareStage(pMnode, pTrans); } - return 0; + TAOS_RETURN(code); } static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) { @@ -139,10 +140,18 @@ static int32_t mndTransValidate(SMnode *pMnode, SSdbRaw *pRaw) { int32_t code = -1; SSdbRow *pRow = mndTransDecode(pRaw); - if (pRow == NULL) goto _OUT; + if (pRow == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OUT; + } pTrans = sdbGetRowObj(pRow); - if (pTrans == NULL) goto _OUT; + if (pTrans == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + goto _OUT; + } code = mndTransValidateImp(pMnode, pTrans); @@ -150,11 +159,10 @@ _OUT: if (pTrans) mndTransDropData(pTrans); if (pRow) taosMemoryFreeClear(pRow); if (code) terrno = (terrno ? terrno : TSDB_CODE_MND_TRANS_CONFLICT); - return code; + TAOS_RETURN(code); } int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { - terrno = TSDB_CODE_SUCCESS; SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; STrans *pTrans = NULL; @@ -163,7 +171,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { if (transId <= 0) { mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); - terrno = TSDB_CODE_INVALID_MSG; + code = TSDB_CODE_INVALID_MSG; goto _OUT; } @@ -176,7 +184,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { if (code != 0) { mError("trans:%d, failed to validate requested trans since %s", transId, terrstr()); code = 0; - pMeta->code = terrno; + pMeta->code = code; goto _OUT; } @@ -184,13 +192,15 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { if (code != 0) { mError("trans:%d, failed to write to sdb since %s", transId, terrstr()); code = 0; - pMeta->code = terrno; + pMeta->code = code; goto _OUT; } pTrans = mndAcquireTrans(pMnode, transId); if (pTrans == NULL) { - mError("trans:%d, not found while execute in mnode since %s", transId, terrstr()); + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + mError("trans:%d, not found while execute in mnode since %s", transId, tstrerror(code)); goto _OUT; } @@ -207,7 +217,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { _OUT: if (pTrans) mndReleaseTrans(pMnode, pTrans); - return code; + TAOS_RETURN(code); } static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) { @@ -258,7 +268,7 @@ _OUT: mndPostMgmtCode(pMnode, code ? code : pMeta->code); rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; - return code; + TAOS_RETURN(code); } SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) { @@ -466,16 +476,18 @@ int32_t mndInitSync(SMnode *pMnode) { pNode->clusterId); } + int32_t code = 0; tsem_init(&pMgmt->syncSem, 0, 0); pMgmt->sync = syncOpen(&syncInfo, true); if (pMgmt->sync <= 0) { - mError("failed to open sync since %s", terrstr()); - return -1; + if (terrno != 0) code = terrno; + mError("failed to open sync since %s", tstrerror(code)); + TAOS_RETURN(code); } pMnode->pSdb->sync = pMgmt->sync; mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); - return 0; + TAOS_RETURN(code); } void mndCleanupSync(SMnode *pMnode) { @@ -518,10 +530,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; - if (req.contLen <= 0) return -1; + if (req.contLen <= 0) return TSDB_CODE_OUT_OF_MEMORY; req.pCont = rpcMallocCont(req.contLen); - if (req.pCont == NULL) return -1; + if (req.pCont == NULL) return TSDB_CODE_OUT_OF_MEMORY; memcpy(req.pCont, pRaw, req.contLen); taosThreadMutexLock(&pMgmt->lock); @@ -531,8 +543,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosThreadMutexUnlock(&pMgmt->lock); rpcFreeCont(req.pCont); - terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED; - return terrno; + TAOS_RETURN(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED); } mInfo("trans:%d, will be proposed", transId);