Merge pull request #27021 from taosdata/fix/TD-31115

fix:[TD-31115] add consumer check when drop topic or group in tmq
This commit is contained in:
dapan1121 2024-08-07 09:42:12 +08:00 committed by GitHub
commit b6a7049e75
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 62 additions and 71 deletions

View File

@ -30,6 +30,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb);
void mndTopicGetShowName(const char* fullTopic, char* topic);
bool checkTopic(SArray *topics, char *topicName);
int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics);

View File

@ -1018,37 +1018,37 @@ END:
return code;
}
//static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
// void *pIter = NULL;
// SMqConsumerObj *pConsumer = NULL;
// int code = 0;
// while (1) {
// pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
// if (pIter == NULL) {
// break;
// }
//
// // drop consumer in lost status, other consumers not in lost status already deleted by rebalance
// if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
// sdbRelease(pMnode->pSdb, pConsumer);
// continue;
// }
// int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
// for (int32_t i = 0; i < sz; i++) {
// char *name = taosArrayGetP(pConsumer->assignedTopics, i);
// if (name && strcmp(topic, name) == 0) {
// MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
// }
// }
//
// sdbRelease(pMnode->pSdb, pConsumer);
// }
//
//END:
// sdbRelease(pMnode->pSdb, pConsumer);
// sdbCancelFetch(pMnode->pSdb, pIter);
// return code;
//}
static int32_t mndCheckConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic) {
void *pIter = NULL;
SMqConsumerObj *pConsumer = NULL;
int code = 0;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
if (strcmp(cgroup, pConsumer->cgroup) != 0) {
sdbRelease(pMnode->pSdb, pConsumer);
continue;
}
bool found = checkTopic(pConsumer->assignedTopics, topic);
if (found){
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
topic, pConsumer->consumerId, pConsumer->cgroup);
code = TSDB_CODE_MND_CGROUP_USED;
goto END;
}
sdbRelease(pMnode->pSdb, pConsumer);
}
END:
sdbRelease(pMnode->pSdb, pConsumer);
sdbCancelFetch(pMnode->pSdb, pIter);
return code;
}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
@ -1085,6 +1085,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
mndTransSetDbName(pTrans, pSub->dbName, NULL);
MND_TMQ_RETURN_CHECK(mndTransCheckConflict(pMnode, pTrans));
MND_TMQ_RETURN_CHECK(sendDeleteSubToVnode(pMnode, pSub, pTrans));
MND_TMQ_RETURN_CHECK(mndCheckConsumerByGroup(pMnode, pTrans, dropReq.cgroup, dropReq.topic));
MND_TMQ_RETURN_CHECK(mndSetDropSubCommitLogs(pMnode, pTrans, pSub));
MND_TMQ_RETURN_CHECK(mndTransPrepare(pMnode, pTrans));

View File

@ -602,7 +602,7 @@ END:
return code;
}
static bool checkTopic(SArray *topics, char *topicName){
bool checkTopic(SArray *topics, char *topicName){
int32_t sz = taosArrayGetSize(topics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(topics, i);
@ -613,44 +613,33 @@ static bool checkTopic(SArray *topics, char *topicName){
return false;
}
//static int32_t mndDropConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
// int32_t code = 0;
// SSdb *pSdb = pMnode->pSdb;
// void *pIter = NULL;
// SMqConsumerObj *pConsumer = NULL;
// while (1) {
// pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
// if (pIter == NULL) {
// break;
// }
//
// bool found = checkTopic(pConsumer->assignedTopics, topicName);
// if (found){
// if (pConsumer->status == MQ_CONSUMER_STATUS_LOST) {
// MND_TMQ_RETURN_CHECK(mndSetConsumerDropLogs(pTrans, pConsumer));
// sdbRelease(pSdb, pConsumer);
// continue;
// }
// mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
// topicName, pConsumer->consumerId, pConsumer->cgroup);
// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
// goto END;
// }
//
// if (checkTopic(pConsumer->rebNewTopics, topicName) || checkTopic(pConsumer->rebRemovedTopics, topicName)) {
// code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
// mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)",
// topicName, pConsumer->consumerId, pConsumer->cgroup);
// goto END;
// }
// sdbRelease(pSdb, pConsumer);
// }
//
//END:
// sdbRelease(pSdb, pConsumer);
// sdbCancelFetch(pSdb, pIter);
// return code;
//}
static int32_t mndCheckConsumerByTopic(SMnode *pMnode, STrans *pTrans, char *topicName){
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqConsumerObj *pConsumer = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
bool found = checkTopic(pConsumer->assignedTopics, topicName);
if (found){
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s",
topicName, pConsumer->consumerId, pConsumer->cgroup);
code = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
goto END;
}
sdbRelease(pSdb, pConsumer);
}
END:
sdbRelease(pSdb, pConsumer);
sdbCancelFetch(pSdb, pIter);
return code;
}
static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic){
// broadcast to all vnode
@ -725,7 +714,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
MND_TMQ_RETURN_CHECK(mndCheckTopicPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_TOPIC, pTopic));
MND_TMQ_RETURN_CHECK(mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_READ_DB, pTopic->db));
// MND_TMQ_RETURN_CHECK(mndDropConsumerByTopic(pMnode, pTrans, dropReq.name));
MND_TMQ_RETURN_CHECK(mndCheckConsumerByTopic(pMnode, pTrans, dropReq.name));
MND_TMQ_RETURN_CHECK(mndDropSubByTopic(pMnode, pTrans, dropReq.name));
if (pTopic->ntbUid != 0) {