fix:[TS-4391] rebalance cnt always 1 if msg lost
This commit is contained in:
parent
b58a23df49
commit
c83989754f
|
@ -48,11 +48,6 @@ int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *p
|
||||||
|
|
||||||
const char *mndConsumerStatusName(int status);
|
const char *mndConsumerStatusName(int status);
|
||||||
|
|
||||||
bool mndRebCanStart();
|
|
||||||
bool mndRebTryStart();
|
|
||||||
void mndRebCntInc();
|
|
||||||
void mndRebCntDec();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -32,14 +32,13 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||||
|
|
||||||
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||||
|
|
||||||
//static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
|
|
||||||
// return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
|
||||||
//}
|
|
||||||
|
|
||||||
//int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
|
||||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||||
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
||||||
|
|
||||||
|
bool mndRebTryStart();
|
||||||
|
void mndRebCntInc();
|
||||||
|
void mndRebCntDec();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -30,8 +30,6 @@
|
||||||
|
|
||||||
#define MND_MAX_GROUP_PER_TOPIC 100
|
#define MND_MAX_GROUP_PER_TOPIC 100
|
||||||
|
|
||||||
static int32_t mqRebInExecCnt = 0;
|
|
||||||
|
|
||||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
|
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
|
||||||
|
@ -90,22 +88,6 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo*
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mndRebTryStart() {
|
|
||||||
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
|
||||||
mInfo("rebalance counter old val:%d", old);
|
|
||||||
return old == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void mndRebCntInc() {
|
|
||||||
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
|
|
||||||
mInfo("rebalance cnt inc, value:%d", val);
|
|
||||||
}
|
|
||||||
|
|
||||||
void mndRebCntDec() {
|
|
||||||
int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
|
|
||||||
mInfo("rebalance cnt sub, value:%d", val);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
|
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
|
@ -30,6 +30,8 @@
|
||||||
#define MND_CONSUMER_LOST_HB_CNT 6
|
#define MND_CONSUMER_LOST_HB_CNT 6
|
||||||
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
||||||
|
|
||||||
|
static int32_t mqRebInExecCnt = 0;
|
||||||
|
|
||||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
|
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
|
||||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
|
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
|
||||||
|
@ -830,6 +832,22 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool mndRebTryStart() {
|
||||||
|
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
||||||
|
mInfo("rebalance counter old val:%d", old);
|
||||||
|
return old == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndRebCntInc() {
|
||||||
|
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
|
||||||
|
mInfo("rebalance cnt inc, value:%d", val);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mndRebCntDec() {
|
||||||
|
int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
|
||||||
|
mInfo("rebalance cnt sub, value:%d", val);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||||
int code = 0;
|
int code = 0;
|
||||||
if (!mndRebTryStart()) {
|
if (!mndRebTryStart()) {
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndConsumer.h"
|
#include "mndSubscribe.h"
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndPrivilege.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
|
|
Loading…
Reference in New Issue