fix:[TD-26323]add macro to control tmq assert & make subscribe transaction no conflicts

This commit is contained in:
wangmm0220 2023-09-15 19:26:55 +08:00
parent e20c8b39f4
commit 8ebb6e202e
1 changed files with 7 additions and 1 deletions

View File

@ -401,7 +401,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){ if(pSub == NULL){
#ifdef TMQ_DEBUG
ASSERT(0); ASSERT(0);
#endif
continue; continue;
} }
taosWLockLatch(&pSub->lock); taosWLockLatch(&pSub->lock);
@ -499,7 +501,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
// txn guarantees pSub is created // txn guarantees pSub is created
if(pSub == NULL) { if(pSub == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0); ASSERT(0);
#endif
continue; continue;
} }
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
@ -510,7 +514,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2.1 fetch topic schema // 2.1 fetch topic schema
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if(pTopic == NULL) { if(pTopic == NULL) {
#ifdef TMQ_DEBUG
ASSERT(0); ASSERT(0);
#endif
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
continue; continue;
@ -649,7 +655,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
} }
// check topic existence // check topic existence
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe"); pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) { if (pTrans == NULL) {
goto _over; goto _over;
} }