fix: [TS-4391] rebalance counter always stays at 1 if the rebalance message is lost

This commit is contained in:
wangmm0220 2023-12-27 19:25:08 +08:00
parent 2ada7d455e
commit 10e86fc733
3 changed files with 25 additions and 8 deletions

View File

@ -47,6 +47,7 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
bool mndRebTryStart(); bool mndRebTryStart();
bool mndRebCanStart();
void mndRebEnd(); void mndRebEnd();
void mndRebCntInc(); void mndRebCntInc();
void mndRebCntDec(); void mndRebCntDec();

View File

@ -101,6 +101,16 @@ bool mndRebTryStart() {
return old == 0; return old == 0;
} }
/**
 * Check whether the mq timer is allowed to trigger a new rebalance.
 *
 * Loads the current value of the rebalance in-execution counter
 * (mqRebInExecCnt) and inspects it; the counter itself is not written here.
 *
 * @return true only when the counter is exactly 0. A positive counter
 *         returns false; a negative counter is logged as an error and
 *         also returns false.
 */
bool mndRebCanStart() {
  int32_t val = atomic_load_32(&mqRebInExecCnt);
  if (val < 0) {
    // A negative counter is unexpected; report it and refuse to start.
    mError("tq timer, rebalance counter:%d should not be less than 0, rebalance can not start", val);
    return false;
  }

  mInfo("tq timer, rebalance counter val:%d", val);
  return val == 0;
}
void mndRebEnd() { mndRebCntDec(); } void mndRebEnd() { mndRebCntDec(); }
void mndRebCntInc() { void mndRebCntInc() {
@ -119,7 +129,7 @@ void mndRebCntDec() {
int32_t newVal = val - 1; int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) { if (oldVal == val) {
mDebug("rebalance trans end, rebalance counter:%d", newVal); mInfo("rebalance trans end, rebalance counter:%d", newVal);
break; break;
} }
} }
@ -284,10 +294,10 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMqConsumerObj *pConsumer; SMqConsumerObj *pConsumer;
void *pIter = NULL; void *pIter = NULL;
mDebug("start to process mq timer"); mInfo("start to process mq timer");
// rebalance cannot be parallel // rebalance cannot be parallel
if (!mndRebTryStart()) { if (!mndRebCanStart()) {
mInfo("mq rebalance already in progress, do nothing"); mInfo("mq rebalance already in progress, do nothing");
return 0; return 0;
} }
@ -295,7 +305,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
if (pRebMsg == NULL) { if (pRebMsg == NULL) {
mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg)); mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
mndRebEnd();
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -303,7 +312,6 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
if (pRebMsg->rebSubHash == NULL) { if (pRebMsg->rebSubHash == NULL) {
mError("failed to create rebalance hashmap"); mError("failed to create rebalance hashmap");
rpcFreeCont(pRebMsg); rpcFreeCont(pRebMsg);
mndRebEnd();
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -390,6 +398,11 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
if (newTopicNum == 0 && removedTopicNum == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
} }
@ -397,7 +410,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} }
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
mInfo("mq rebalance will be triggered"); mInfo("mq send msg to rebalance");
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_DO_REBALANCE, .msgType = TDMT_MND_TMQ_DO_REBALANCE,
.pCont = pRebMsg, .pCont = pRebMsg,
@ -407,8 +420,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} else { } else {
taosHashCleanup(pRebMsg->rebSubHash); taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg); rpcFreeCont(pRebMsg);
mDebug("mq timer finished, no need to re-balance"); mInfo("mq timer finished, no need to re-balance");
mndRebEnd();
} }
return 0; return 0;
} }

View File

@ -732,6 +732,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMqDoRebalanceMsg *pReq = pMsg->pCont; SMqDoRebalanceMsg *pReq = pMsg->pCont;
void *pIter = NULL; void *pIter = NULL;
// bool rebalanceOnce = false; // to ensure only once. // bool rebalanceOnce = false; // to ensure only once.
if (!mndRebTryStart()) {
mInfo("mq rebalance already in progress, do nothing");
return 0;
}
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash)); mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));