This commit is contained in:
dmchen 2024-07-18 02:36:28 +00:00
parent f727668ebb
commit 8501d0a909
1 changed files with 9 additions and 7 deletions

View File

@ -137,7 +137,7 @@ static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const ch
return 0;
FAILED:
mndReleaseTopic(pMnode, pTopic);
return code;
TAOS_RETURN(code);
}
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
@ -186,7 +186,7 @@ END:
mndReleaseConsumer(pMnode, pConsumer);
tDeleteSMqConsumerObj(pConsumerNew);
mndTransDrop(pTrans);
return code;
TAOS_RETURN(code);
}
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
@ -229,7 +229,7 @@ END:
mndReleaseConsumer(pMnode, pConsumer);
tDeleteSMqConsumerObj(pConsumerNew);
mndTransDrop(pTrans);
return code;
TAOS_RETURN(code);
}
static int32_t checkPrivilege(SMnode *pMnode, SMqConsumerObj *pConsumer, SMqHbRsp *rsp, char *user) {
@ -420,6 +420,7 @@ static int32_t addEpSetInfo(SMnode *pMnode, SMqConsumerObj *pConsumer, int32_t e
}
static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoch, int64_t consumerId){
int32_t code = 0;
// encode rsp
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqAskEpRsp(NULL, rsp);
void *buf = rpcMallocCont(tlen);
@ -436,12 +437,12 @@ static int32_t buildAskEpRsp(SRpcMsg *pMsg, SMqAskEpRsp *rsp, int32_t serverEpoc
pHead->walever = 0;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
TAOS_CHECK_RETURN(tEncodeSMqAskEpRsp(&abuf, rsp));
tEncodeSMqAskEpRsp(&abuf, rsp);
// send rsp
pMsg->info.rsp = buf;
pMsg->info.rspLen = tlen;
return 0;
TAOS_RETURN(code);
}
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
@ -582,6 +583,7 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb
}
static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
int32_t code = 0;
taosArraySort(pTopicList, taosArrayCompareString);
taosArrayRemoveDuplicate(pTopicList, taosArrayCompareString, freeItem);
@ -592,7 +594,7 @@ static int32_t checkAndSortTopic(SMnode *pMnode, SArray *pTopicList){
return TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE;
}
}
return TSDB_CODE_SUCCESS;
TAOS_RETURN(code);
}
static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscribe){
@ -694,7 +696,7 @@ _over:
mndTransDrop(pTrans);
tDeleteSMqConsumerObj(pConsumerNew);
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
return code;
TAOS_RETURN(code);
}
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {