From a49dc93bafe8132ac6e88bf714c0ac5e21e0a25a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 21 Jun 2024 14:18:04 +0800 Subject: [PATCH] fix:[TS-5067] check if consumer belong to this cgroup:topic where drop cgroup --- source/dnode/mnode/impl/src/mndSubscribe.c | 28 +++++++++++++++------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ba9a4607cb..afeeeef924 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -856,9 +856,9 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu mInfo("[rebalance] sub topic:%s has no consumers sub yet", key); } else { taosRLockLatch(&pSub->lock); - rebOutput.pSub = tCloneSubscribeObj(pSub); - checkConsumer(pMnode, rebOutput.pSub); - rebInput.oldConsumerNum = taosHashGetSize(rebOutput.pSub->consumerHash); + rebOutput->pSub = tCloneSubscribeObj(pSub); + checkConsumer(pMnode, rebOutput->pSub); + rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash); taosRUnLockLatch(&pSub->lock); mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum); @@ -979,14 +979,24 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro break; } - if (strcmp(cgroup, pConsumer->cgroup) == 0 && taosArrayGetSize(pConsumer->currentTopics) == 0) { - int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); - if (code != 0) { - sdbRelease(pMnode->pSdb, pConsumer); - sdbCancelFetch(pMnode->pSdb, pIter); - return code; + // drop consumer in lost status, other consumers not in lost status already deleted by rebalance + if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) { + sdbRelease(pMnode->pSdb, pConsumer); + continue; + } + int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->assignedTopics, i); + if (strcmp(topic, name) == 0) { + int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); + if (code != 0) { + sdbRelease(pMnode->pSdb, pConsumer); + sdbCancelFetch(pMnode->pSdb, pIter); + return code; + } } } + sdbRelease(pMnode->pSdb, pConsumer); } return 0;