From 5a5bf29acb03d2dcb43d094d5406376ea521dbef Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 13 Mar 2024 23:50:42 +0800 Subject: [PATCH] opti:tmq logic --- source/dnode/mnode/impl/src/mndSubscribe.c | 1 + source/dnode/mnode/impl/src/mndTopic.c | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 19a39a9c33..e860c01c36 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -940,6 +940,7 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); if (code != 0) { sdbRelease(pMnode->pSdb, pConsumer); + sdbCancelFetch(pMnode->pSdb, pIter); return code; } } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 82a3a3fee6..3bfab4a688 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -618,7 +618,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) { tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); //reuse this function for topic - auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname, + auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname, createTopicReq.sql, strlen(createTopicReq.sql)); _OVER: @@ -670,7 +670,6 @@ static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topi void *pIter = NULL; SMqConsumerObj *pConsumer = NULL; while (1) { - sdbRelease(pSdb, pConsumer); pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); if (pIter == NULL) { break; @@ -683,6 +682,7 @@ static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topi if (code != 0) { goto end; } + sdbRelease(pSdb, pConsumer); continue; } mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", @@ -697,7 +697,7 @@ static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topi topicName, pConsumer->consumerId, pConsumer->cgroup); goto end; } - + sdbRelease(pSdb, pConsumer); } end: