From d0c101f60540270be828308df021635139f0e825 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 5 Aug 2024 15:03:51 +0800 Subject: [PATCH 1/2] fix:[TD-31115] modify trans conflict level in tmq --- source/dnode/mnode/impl/src/mndConsumer.c | 18 ++++++++++-------- source/dnode/mnode/impl/src/mndSubscribe.c | 6 +++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 1a9f808688..121dd3a09e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -87,19 +87,18 @@ END: return code; } -static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser, - bool enableReplay) { +static int32_t validateTopics(STrans* pTrans, SCMSubscribeReq *subscribe, SMnode *pMnode, const char *pUser) { SMqTopicObj *pTopic = NULL; int32_t code = 0; - int32_t numOfTopics = taosArrayGetSize(pTopicList); + int32_t numOfTopics = taosArrayGetSize(subscribe->topicNames); for (int32_t i = 0; i < numOfTopics; i++) { - char *pOneTopic = taosArrayGetP(pTopicList, i); + char *pOneTopic = taosArrayGetP(subscribe->topicNames, i); MND_TMQ_RETURN_CHECK(mndAcquireTopic(pMnode, pOneTopic, &pTopic)); MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic)); MND_TMQ_RETURN_CHECK(grantCheckExpire(TSDB_GRANT_SUBSCRIPTION)); - if (enableReplay) { + if (subscribe->enableReplay) { if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { code = TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT; goto END; @@ -117,6 +116,10 @@ static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const ch mndReleaseDb(pMnode, pDb); } } + char key[TSDB_CONSUMER_ID_LEN] = {0}; + (void)snprintf(key, TSDB_CONSUMER_ID_LEN, "%"PRIx64, subscribe->consumerId); + mndTransSetDbName(pTrans, pTopic->db, key); + MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); mndReleaseTopic(pMnode, pTopic); } return 0; @@ -128,7 +131,6 @@ END: static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { int32_t code = 0; - int32_t lino = 0; SMnode *pMnode = pMsg->info.node; SMqConsumerClearMsg *pClearMsg = pMsg->pCont; SMqConsumerObj *pConsumerNew = NULL; @@ -569,10 +571,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { mndReleaseConsumer(pMnode, pConsumerTmp); } MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames)); - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "subscribe"); MND_TMQ_NULL_CHECK(pTrans); - MND_TMQ_RETURN_CHECK(validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay)); + MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user)); MND_TMQ_RETURN_CHECK(buildSubConsumer(pMnode, &subscribe, &pConsumerNew)); MND_TMQ_RETURN_CHECK(mndSetConsumerCommitLogs(pTrans, pConsumerNew)); MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans)); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 585bf5527d..ff374c0ef4 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -650,7 +650,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu goto END; } - mndTransSetDbName(pTrans, pOutput->pSub->dbName, cgroup); + mndTransSetDbName(pTrans, pOutput->pSub->dbName, pOutput->pSub->key); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); // 1. redo action: action to all vg @@ -1079,10 +1079,10 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto END; } - pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pMsg, "drop-cgroup"); MND_TMQ_NULL_CHECK(pTrans); mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); - mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup); + mndTransSetDbName(pTrans, pSub->dbName, NULL); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans)); MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub)); From 3a94a22801f0611bec4c98120e0b7d5f74e64b98 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 5 Aug 2024 22:58:10 +0800 Subject: [PATCH 2/2] fix:[TD-31115] modify trans conflict level in tmq --- source/dnode/mnode/impl/src/mndConsumer.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 121dd3a09e..6116d2da19 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -565,13 +565,16 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { SCMSubscribeReq subscribe = {0}; MND_TMQ_RETURN_CHECK(tDeserializeSCMSubscribeReq(msgStr, &subscribe, pMsg->contLen)); - if(taosArrayGetSize(subscribe.topicNames) == 0){ + bool ubSubscribe = (taosArrayGetSize(subscribe.topicNames) == 0); + if(ubSubscribe){ SMqConsumerObj *pConsumerTmp = NULL; MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp)); mndReleaseConsumer(pMnode, pConsumerTmp); } MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames)); - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, + (ubSubscribe ? TRN_CONFLICT_NOTHING :TRN_CONFLICT_DB_INSIDE), + pMsg, "subscribe"); MND_TMQ_NULL_CHECK(pTrans); MND_TMQ_RETURN_CHECK(validateTopics(pTrans, &subscribe, pMnode, pMsg->info.conn.user));