Merge pull request #22938 from taosdata/fix/TD-26323

fix:[TD-26323]add macro to control tmq assert & make subscribe transa…
This commit is contained in:
Haojun Liao 2023-09-18 09:48:39 +08:00 committed by GitHub
commit 8aa34802f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 1 deletions

View File

@ -401,7 +401,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
continue;
}
taosWLockLatch(&pSub->lock);
@ -499,7 +501,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
// txn guarantees pSub is created
if(pSub == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
continue;
}
taosRLockLatch(&pSub->lock);
@ -510,7 +514,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2.1 fetch topic schema
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if(pTopic == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0);
#endif
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
continue;
@ -649,7 +655,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// check topic existence
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) {
goto _over;
}