From 10e86fc733a53b57c056c1a4e7bc61a3624cb2df Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 27 Dec 2023 19:25:08 +0800 Subject: [PATCH] fix:[TS-4391] rebalance cnt always 1 if msg lost --- source/dnode/mnode/impl/inc/mndConsumer.h | 1 + source/dnode/mnode/impl/src/mndConsumer.c | 28 +++++++++++++++------- source/dnode/mnode/impl/src/mndSubscribe.c | 4 ++++ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 94c937e8f4..f075510428 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -47,6 +47,7 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); bool mndRebTryStart(); +bool mndRebCanStart(); void mndRebEnd(); void mndRebCntInc(); void mndRebCntDec(); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index c9ee66d3a0..11c929c898 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -101,6 +101,16 @@ bool mndRebTryStart() { return old == 0; } +bool mndRebCanStart() { + int32_t val = atomic_load_32(&mqRebInExecCnt); + if (val < 0) { + mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val); + return false; + } + mInfo("tq timer, rebalance counter val:%d", val); + return val == 0; +} + void mndRebEnd() { mndRebCntDec(); } void mndRebCntInc() { @@ -119,7 +129,7 @@ void mndRebCntDec() { int32_t newVal = val - 1; int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); if (oldVal == val) { - mDebug("rebalance trans end, rebalance counter:%d", newVal); + mInfo("rebalance trans end, rebalance counter:%d", newVal); break; } } @@ -284,10 +294,10 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer; void *pIter = NULL; - mDebug("start to process mq timer"); + mInfo("start to process mq timer"); // rebalance cannot be parallel - if (!mndRebTryStart()) { + if (!mndRebCanStart()) { mInfo("mq rebalance already in progress, do nothing"); return 0; } @@ -295,7 +305,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); if (pRebMsg == NULL) { mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg)); - mndRebEnd(); return TSDB_CODE_OUT_OF_MEMORY; } @@ -303,7 +312,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { if (pRebMsg->rebSubHash == NULL) { mError("failed to create rebalance hashmap"); rpcFreeCont(pRebMsg); - mndRebEnd(); return TSDB_CODE_OUT_OF_MEMORY; } @@ -390,6 +398,11 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } + + if (newTopicNum == 0 && removedTopicNum == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close + mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info); + } + taosRUnLockLatch(&pConsumer->lock); } @@ -397,7 +410,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { - mInfo("mq rebalance will be triggered"); + mInfo("mq send msg to rebalance"); SRpcMsg rpcMsg = { .msgType = TDMT_MND_TMQ_DO_REBALANCE, .pCont = pRebMsg, @@ -407,8 +420,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { } else { taosHashCleanup(pRebMsg->rebSubHash); rpcFreeCont(pRebMsg); - mDebug("mq timer finished, no need to re-balance"); - mndRebEnd(); + mInfo("mq timer finished, no need to re-balance"); } return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 01d4d1029c..8cd7f6bc9c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -732,6 +732,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { SMqDoRebalanceMsg *pReq = pMsg->pCont; void *pIter = NULL; // bool rebalanceOnce = false; // to ensure only once. + if (!mndRebTryStart()) { + mInfo("mq rebalance already in progress, do nothing"); + return 0; + } mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));