fix:[TS-4391] rebalance cnt always 1 if msg lost

This commit is contained in:
wangmm0220 2023-12-29 13:57:13 +08:00
parent 10e86fc733
commit b58a23df49
4 changed files with 151 additions and 214 deletions

View File

@ -2544,12 +2544,6 @@ _err:
return NULL; return NULL;
} }
// this message is sent from mnode to mnode(read thread to write thread),
// so there is no need for serialization or deserialization
typedef struct {
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
} SMqDoRebalanceMsg;
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int64_t checkpointId; int64_t checkpointId;

View File

@ -46,9 +46,10 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer); int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
bool mndRebTryStart(); const char *mndConsumerStatusName(int status);
bool mndRebCanStart(); bool mndRebCanStart();
void mndRebEnd(); bool mndRebTryStart();
void mndRebCntInc(); void mndRebCntInc();
void mndRebCntDec(); void mndRebCntDec();

View File

@ -29,13 +29,9 @@
#define MND_CONSUMER_RESERVE_SIZE 64 #define MND_CONSUMER_RESERVE_SIZE 64
#define MND_MAX_GROUP_PER_TOPIC 100 #define MND_MAX_GROUP_PER_TOPIC 100
#define MND_CONSUMER_LOST_HB_CNT 6
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
static int32_t mqRebInExecCnt = 0; static int32_t mqRebInExecCnt = 0;
static const char *mndConsumerStatusName(int status);
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer);
@ -45,7 +41,6 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg); static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg); static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg); static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg); static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
@ -63,7 +58,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg); // mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
@ -97,42 +92,18 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo*
bool mndRebTryStart() { bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mDebug("tq timer, rebalance counter old val:%d", old); mInfo("rebalance counter old val:%d", old);
return old == 0; return old == 0;
} }
bool mndRebCanStart() {
int32_t val = atomic_load_32(&mqRebInExecCnt);
if (val < 0) {
mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
return false;
}
mInfo("tq timer, rebalance counter val:%d", val);
return val == 0;
}
void mndRebEnd() { mndRebCntDec(); }
void mndRebCntInc() { void mndRebCntInc() {
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1); int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
mInfo("rebalance trans start, rebalance counter:%d", val); mInfo("rebalance cnt inc, value:%d", val);
} }
void mndRebCntDec() { void mndRebCntDec() {
while (1) { int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
int32_t val = atomic_load_32(&mqRebInExecCnt); mInfo("rebalance cnt sub, value:%d", val);
if (val <= 0) {
mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
break;
}
int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) {
mInfo("rebalance trans end, rebalance counter:%d", newVal);
break;
}
}
} }
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) { static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
@ -282,149 +253,6 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebInfo; return pRebInfo;
} }
static void freeRebalanceItem(void *param) {
SMqRebInfo *pInfo = param;
taosArrayDestroy(pInfo->newConsumers);
taosArrayDestroy(pInfo->removedConsumers);
}
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SSdb *pSdb = pMnode->pSdb;
SMqConsumerObj *pConsumer;
void *pIter = NULL;
mInfo("start to process mq timer");
// rebalance cannot be parallel
if (!mndRebCanStart()) {
mInfo("mq rebalance already in progress, do nothing");
return 0;
}
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
if (pRebMsg == NULL) {
mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
return TSDB_CODE_OUT_OF_MEMORY;
}
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
if (pRebMsg->rebSubHash == NULL) {
mError("failed to create rebalance hashmap");
rpcFreeCont(pRebMsg);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
// iterate all consumers, find all modification
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
hbStatus);
if (status == MQ_CONSUMER_STATUS_READY) {
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
taosRLockLatch(&pConsumer->lock);
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < topicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
taosRUnLockLatch(&pConsumer->lock);
}else{
int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < newTopicNum; i++) {
char * topic = taosArrayGetP(pConsumer->currentTopics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
if (pSub == NULL) {
continue;
}
taosRLockLatch(&pSub->lock);
// 2.2 iterate all vg assigned to the consumer of that topic
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < vgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
SVgObj * pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
if (!pVgroup) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
}
mndReleaseVgroup(pMnode, pVgroup);
}
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
}
} else if (status == MQ_CONSUMER_STATUS_LOST) {
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
} else { // MQ_CONSUMER_STATUS_REBALANCE
taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < newTopicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
}
int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < removedTopicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
if (newTopicNum == 0 && removedTopicNum == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
taosRUnLockLatch(&pConsumer->lock);
}
mndReleaseConsumer(pMnode, pConsumer);
}
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
mInfo("mq send msg to rebalance");
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_DO_REBALANCE,
.pCont = pRebMsg,
.contLen = sizeof(SMqDoRebalanceMsg),
};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} else {
taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg);
mInfo("mq timer finished, no need to re-balance");
}
return 0;
}
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
@ -1263,7 +1091,7 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
static const char *mndConsumerStatusName(int status) { const char *mndConsumerStatusName(int status) {
switch (status) { switch (status) {
case MQ_CONSUMER_STATUS_READY: case MQ_CONSUMER_STATUS_READY:
return "ready"; return "ready";

View File

@ -27,7 +27,8 @@
#define MND_SUBSCRIBE_VER_NUMBER 2 #define MND_SUBSCRIBE_VER_NUMBER 2
#define MND_SUBSCRIBE_RESERVE_SIZE 64 #define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_SUBSCRIBE_REBALANCE_CNT 3 #define MND_CONSUMER_LOST_HB_CNT 6
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
@ -38,14 +39,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg); static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter); static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* hash);
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
@ -68,7 +62,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);
@ -727,21 +721,140 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
return 0; return 0;
} }
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { static void freeRebalanceItem(void *param) {
SMnode *pMnode = pMsg->info.node; SMqRebInfo *pInfo = param;
SMqDoRebalanceMsg *pReq = pMsg->pCont; taosArrayDestroy(pInfo->newConsumers);
void *pIter = NULL; taosArrayDestroy(pInfo->removedConsumers);
// bool rebalanceOnce = false; // to ensure only once. }
if (!mndRebTryStart()) {
mInfo("mq rebalance already in progress, do nothing"); static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
return 0; SMnode *pMnode = pMsg->info.node;
SSdb *pSdb = pMnode->pSdb;
SMqConsumerObj *pConsumer;
void *pIter = NULL;
mInfo("start to process mq timer");
// iterate all consumers, find all modification
while (1) {
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) {
break;
}
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
int32_t status = atomic_load_32(&pConsumer->status);
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
hbStatus);
if (status == MQ_CONSUMER_STATUS_READY) {
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
taosRLockLatch(&pConsumer->lock);
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < topicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
taosRUnLockLatch(&pConsumer->lock);
}else{
int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < newTopicNum; i++) {
char * topic = taosArrayGetP(pConsumer->currentTopics, i);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
if (pSub == NULL) {
continue;
}
taosRLockLatch(&pSub->lock);
// 2.2 iterate all vg assigned to the consumer of that topic
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < vgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
SVgObj * pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
if (!pVgroup) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
mndGetOrCreateRebSub(rebSubHash, key);
mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
}
mndReleaseVgroup(pMnode, pVgroup);
}
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
}
} else if (status == MQ_CONSUMER_STATUS_LOST) {
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
} else {
taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < newTopicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key);
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
}
int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < removedTopicNum; i++) {
char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
if (newTopicNum == 0 && removedTopicNum == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
}
taosRUnLockLatch(&pConsumer->lock);
}
mndReleaseConsumer(pMnode, pConsumer);
} }
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash)); return 0;
}
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
int code = 0;
if (!mndRebTryStart()) {
mInfo("mq rebalance already in progress, do nothing");
return code;
}
SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
if (rebSubHash == NULL) {
mError("failed to create rebalance hashmap");
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto END;
}
taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
mndCheckConsumer(pMsg, rebSubHash);
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
void *pIter = NULL;
SMnode *pMnode = pMsg->info.node;
while (1) { while (1) {
pIter = taosHashIterate(pReq->rebSubHash, pIter); pIter = taosHashIterate(rebSubHash, pIter);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
} }
@ -760,12 +873,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosArrayDestroy(rebOutput.modifyConsumers); taosArrayDestroy(rebOutput.modifyConsumers);
taosArrayDestroy(rebOutput.rebVgs); taosArrayDestroy(rebOutput.rebVgs);
taosHashCancelIterate(pReq->rebSubHash, pIter); taosHashCancelIterate(rebSubHash, pIter);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mInfo("mq re-balance failed, due to out of memory"); mError("mq re-balance failed, due to out of memory");
taosHashCleanup(pReq->rebSubHash); code = -1;
mndRebEnd(); goto END;
return -1;
} }
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
@ -833,10 +945,12 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
// reset flag // reset flag
mInfo("mq re-balance completed successfully"); mInfo("mq re-balance completed successfully");
taosHashCleanup(pReq->rebSubHash);
mndRebEnd();
return 0; END:
taosHashCleanup(rebSubHash);
mndRebCntDec();
return code;
} }
static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){ static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){