fix(tmq): unref topic
This commit is contained in:
parent
65ba238ecd
commit
984b831563
|
@ -419,6 +419,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
|||
SMqTopicObj topicObj = {0};
|
||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
|
||||
mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
|
||||
topicObj.refConsumerCnt);
|
||||
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
|
|
@ -417,7 +417,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
|||
|
||||
// 2. redo log: subscribe and vg assignment
|
||||
// subscribe
|
||||
if (mndSetSubRedoLogs(pMnode, pTrans, pOutput->pSub) != 0) {
|
||||
if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
|
||||
goto REB_FAIL;
|
||||
}
|
||||
|
||||
|
@ -479,6 +479,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
|||
SMqTopicObj topicObj = {0};
|
||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
|
||||
// TODO is that correct?
|
||||
pTopic->refConsumerCnt = topicObj.refConsumerCnt;
|
||||
mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup,
|
||||
topicObj.refConsumerCnt);
|
||||
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -813,7 +813,7 @@ class TDTestCase:
|
|||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
# self.tmqCase2a(cfgPath, buildPath)
|
||||
self.tmqCase2a(cfgPath, buildPath)
|
||||
self.tmqCase3(cfgPath, buildPath)
|
||||
self.tmqCase4(cfgPath, buildPath)
|
||||
self.tmqCase5(cfgPath, buildPath)
|
||||
|
|
Loading…
Reference in New Issue