From c83989754f448500aa91e4c486454a57f46c2dd9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 29 Dec 2023 14:09:49 +0800 Subject: [PATCH] fix:[TS-4391] rebalance cnt always 1 if msg lost --- source/dnode/mnode/impl/inc/mndConsumer.h | 5 ----- source/dnode/mnode/impl/inc/mndSubscribe.h | 9 ++++----- source/dnode/mnode/impl/src/mndConsumer.c | 18 ------------------ source/dnode/mnode/impl/src/mndSubscribe.c | 18 ++++++++++++++++++ source/dnode/mnode/impl/src/mndTrans.c | 2 +- 5 files changed, 23 insertions(+), 29 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 59a22b76cd..8c89ddc825 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -48,11 +48,6 @@ int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *p const char *mndConsumerStatusName(int status); -bool mndRebCanStart(); -bool mndRebTryStart(); -void mndRebCntInc(); -void mndRebCntDec(); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index 10864da5fb..23b3a7d1fe 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -32,14 +32,13 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); -//static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) { -// return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName); -//} - -//int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic); int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub); +bool mndRebTryStart(); +void mndRebCntInc(); +void mndRebCntDec(); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 5987f0ec34..cf8b9e019a 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -30,8 +30,6 @@ #define MND_MAX_GROUP_PER_TOPIC 100 -static int32_t mqRebInExecCnt = 0; - static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer); @@ -90,22 +88,6 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo* return; } -bool mndRebTryStart() { - int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); - mInfo("rebalance counter old val:%d", old); - return old == 0; -} - -void mndRebCntInc() { - int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1); - mInfo("rebalance cnt inc, value:%d", val); -} - -void mndRebCntDec() { - int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1); - mInfo("rebalance cnt sub, value:%d", val); -} - static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { SMqTopicObj *pTopic = NULL; int32_t code = 0; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b2370374b7..b7958c1484 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -30,6 +30,8 @@ #define MND_CONSUMER_LOST_HB_CNT 6 #define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200 +static int32_t mqRebInExecCnt = 0; + static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); @@ -830,6 +832,22 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { return 0; } +bool mndRebTryStart() { + int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); + mInfo("rebalance counter old val:%d", old); + return old == 0; +} + +void mndRebCntInc() { + int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1); + mInfo("rebalance cnt inc, value:%d", val); +} + +void mndRebCntDec() { + int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1); + mInfo("rebalance cnt sub, value:%d", val); +} + static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { int code = 0; if (!mndRebTryStart()) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 9e478f3aa5..99553fc57a 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -15,7 +15,7 @@ #define _DEFAULT_SOURCE #include "mndTrans.h" -#include "mndConsumer.h" +#include "mndSubscribe.h" #include "mndDb.h" #include "mndPrivilege.h" #include "mndShow.h"