From 555df05e482e3224c8198e591d972bc7b364209a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 6 Aug 2024 19:27:42 +0800 Subject: [PATCH] fix:[TD-31115] add consumer check when drop topic or group in tmq --- source/dnode/mnode/impl/inc/mndTopic.h | 1 + source/dnode/mnode/impl/src/mndSubscribe.c | 63 ++++++++++---------- source/dnode/mnode/impl/src/mndTopic.c | 69 +++++++++------------- 3 files changed, 62 insertions(+), 71 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index dc6c137455..b56cbef2a1 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -30,6 +30,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic); int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb); void mndTopicGetShowName(const char* fullTopic, char* topic); +bool checkTopic(SArray *topics, char *topicName); int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ff374c0ef4..1131f2f2d5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -1018,37 +1018,37 @@ END: return code; } -//static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { -// void *pIter = NULL; -// SMqConsumerObj *pConsumer = NULL; -// int code = 0; -// while (1) { -// pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); -// if (pIter == NULL) { -// break; -// } -// -// // drop consumer in lost status, other consumers not in lost status already deleted by rebalance -// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { -// sdbRelease(pMnode->pSdb, pConsumer); -// continue; -// } -// int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); -// for (int32_t i = 0; i < sz; i++) { -// char *name = taosArrayGetP(pConsumer->assignedTopics, i); -// if (name && strcmp(topic, name) == 0) { -// MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer)); -// } -// } -// -// sdbRelease(pMnode->pSdb, pConsumer); -// } -// -//END: -// sdbRelease(pMnode->pSdb, pConsumer); -// sdbCancelFetch(pMnode->pSdb, pIter); -// return code; -//} +static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) { + void *pIter = NULL; + SMqConsumerObj *pConsumer = NULL; + int code = 0; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) { + break; + } + + if (strcmp(cgroup, pConsumer->cgroup) != 0) { + sdbRelease(pMnode->pSdb, pConsumer); + continue; + } + + bool found = checkTopic(pConsumer->assignedTopics, topic); + if (found){ + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", + topic, pConsumer->consumerId, pConsumer->cgroup); + code = TSDB_CODE_MND_CGROUP_USED; + goto END; + } + + sdbRelease(pMnode->pSdb, pConsumer); + } + +END: + sdbRelease(pMnode->pSdb, pConsumer); + sdbCancelFetch(pMnode->pSdb, pIter); + return code; +} static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; @@ -1085,6 +1085,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { mndTransSetDbName(pTrans, pSub->dbName, NULL); MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans)); MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans)); + MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic)); MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub)); MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans)); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 46d5afc145..4bca13508e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -602,7 +602,7 @@ END: return code; } -static bool checkTopic(SArray *topics, char *topicName){ +bool checkTopic(SArray *topics, char *topicName){ int32_t sz = taosArrayGetSize(topics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(topics, i); @@ -613,44 +613,33 @@ static bool checkTopic(SArray *topics, char *topicName){ return false; } -//static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ -// int32_t code = 0; -// SSdb *pSdb = pMnode->pSdb; -// void *pIter = NULL; -// SMqConsumerObj *pConsumer = NULL; -// while (1) { -// pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); -// if (pIter == NULL) { -// break; -// } -// -// bool found = checkTopic(pConsumer->assignedTopics, topicName); -// if (found){ -// if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) { -// MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer)); -// sdbRelease(pSdb, pConsumer); -// continue; -// } -// mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", -// topicName, pConsumer->consumerId, pConsumer->cgroup); -// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; -// goto END; -// } -// -// if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) { -// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; -// mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", -// topicName, pConsumer->consumerId, pConsumer->cgroup); -// goto END; -// } -// sdbRelease(pSdb, pConsumer); -// } -// -//END: -// sdbRelease(pSdb, pConsumer); -// sdbCancelFetch(pSdb, pIter); -// return code; -//} +static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){ + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SMqConsumerObj *pConsumer = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) { + break; + } + + bool found = checkTopic(pConsumer->assignedTopics, topicName); + if (found){ + mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", + topicName, pConsumer->consumerId, pConsumer->cgroup); + code = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + goto END; + } + + sdbRelease(pSdb, pConsumer); + } + +END: + sdbRelease(pSdb, pConsumer); + sdbCancelFetch(pSdb, pIter); + return code; +} static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){ // broadcast to all vnode @@ -725,7 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic)); MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db)); -// MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name)); + MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name)); MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name)); if (pTopic->ntbUid != 0) {