From 8ebb6e202ed1cdf56df54cef1402643cdde798ea Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 15 Sep 2023 19:26:55 +0800 Subject: [PATCH] fix:[TD-26323]add macro to control tmq assert & make subscribe transaction no conflicts --- source/dnode/mnode/impl/src/mndConsumer.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f25bd2cffb..7838b967b0 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -401,7 +401,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ +#ifdef TMQ_DEBUG ASSERT(0); +#endif continue; } taosWLockLatch(&pSub->lock); @@ -499,7 +501,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); // txn guarantees pSub is created if(pSub == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif continue; } taosRLockLatch(&pSub->lock); @@ -510,7 +514,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2.1 fetch topic schema SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if(pTopic == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif taosRUnLockLatch(&pSub->lock); mndReleaseSubscribe(pMnode, pSub); continue; @@ -649,7 +655,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // check topic existence - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; }