diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 05156e1427..c9ee66d3a0 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -126,31 +126,37 @@ void mndRebCntDec() { } static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { - int32_t numOfTopics = taosArrayGetSize(pTopicList); + SMqTopicObj *pTopic = NULL; + int32_t code = 0; + int32_t numOfTopics = taosArrayGetSize(pTopicList); for (int32_t i = 0; i < numOfTopics; i++) { char *pOneTopic = taosArrayGetP(pTopicList, i); - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic); + pTopic = mndAcquireTopic(pMnode, pOneTopic); if (pTopic == NULL) { // terrno has been set by callee function - return -1; + code = -1; + goto FAILED; } if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) { - mndReleaseTopic(pMnode, pTopic); - return -1; + code = -1; + goto FAILED; } if(enableReplay){ if(pTopic->subType != TOPIC_SUB_TYPE__COLUMN){ - return TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT; + code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT; + goto FAILED; }else if(pTopic->ntbUid == 0 && pTopic->ctbStbUid == 0) { SDbObj *pDb = mndAcquireDb(pMnode, pTopic->db); if (pDb == NULL) { - mndReleaseTopic(pMnode, pTopic); - return -1; + code = -1; + goto FAILED; } if (pDb->cfg.numOfVgroups != 1) { - return TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP; + mndReleaseDb(pMnode, pDb); + code = TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP; + goto FAILED; } mndReleaseDb(pMnode, pDb); } @@ -158,13 +164,16 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode * mndTransSetDbName(pTrans, pOneTopic, NULL); if(mndTransCheckConflict(pMnode, pTrans) != 0){ - mndReleaseTopic(pMnode, pTopic); - return -1; + code = -1; + goto FAILED; } mndReleaseTopic(pMnode, pTopic); } return 0; +FAILED: + mndReleaseTopic(pMnode, pTopic); + return code; } static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 215f8d3cb2..a4c3d395e3 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -40,11 +40,11 @@ void tqUpdateNodeStage(STQ* pTq) { tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage); } -static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset, bool withTbName) { +static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { pRsp->reqOffset = pOffset; pRsp->rspOffset = pOffset; - pRsp->withTbName = withTbName; + pRsp->withTbName = 1; pRsp->withSchema = 1; pRsp->blockData = taosArrayInit(0, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); @@ -177,7 +177,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int32_t vgId = TD_VID(pTq->pVnode); SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, *offset, pRequest->withTbName); + tqInitTaosxRsp(&taosxRsp, *offset); if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {