Merge pull request #24254 from taosdata/fix/TS-3491-3.0
fix:[TS-4391] rebalance cnt always 1 if msg lost
This commit is contained in:
commit
59642c674d
|
@ -2544,12 +2544,6 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// this message is sent from mnode to mnode(read thread to write thread),
|
||||
// so there is no need for serialization or deserialization
|
||||
typedef struct {
|
||||
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
|
||||
} SMqDoRebalanceMsg;
|
||||
|
||||
typedef struct {
|
||||
int64_t streamId;
|
||||
int64_t checkpointId;
|
||||
|
|
|
@ -46,10 +46,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw);
|
|||
int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||
int32_t mndSetConsumerDropLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer);
|
||||
|
||||
bool mndRebTryStart();
|
||||
void mndRebEnd();
|
||||
void mndRebCntInc();
|
||||
void mndRebCntDec();
|
||||
const char *mndConsumerStatusName(int status);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -32,14 +32,13 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
|||
|
||||
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
|
||||
|
||||
//static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) {
|
||||
// return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName);
|
||||
//}
|
||||
|
||||
//int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
|
||||
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
|
||||
|
||||
bool mndRebTryStart();
|
||||
void mndRebCntInc();
|
||||
void mndRebCntDec();
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -29,12 +29,6 @@
|
|||
#define MND_CONSUMER_RESERVE_SIZE 64
|
||||
|
||||
#define MND_MAX_GROUP_PER_TOPIC 100
|
||||
#define MND_CONSUMER_LOST_HB_CNT 6
|
||||
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
||||
|
||||
static int32_t mqRebInExecCnt = 0;
|
||||
|
||||
static const char *mndConsumerStatusName(int status);
|
||||
|
||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||
|
@ -45,7 +39,6 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
|||
static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessAskEpReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg);
|
||||
static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg);
|
||||
|
||||
|
@ -63,7 +56,7 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_SUBSCRIBE, mndProcessSubscribeReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_HB, mndProcessMqHbReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_ASK_EP, mndProcessAskEpReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
||||
// mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessMqTimerMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_CONSUMER_RECOVER, mndProcessConsumerRecoverMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, mndProcessConsumerClearMsg);
|
||||
|
||||
|
@ -95,36 +88,6 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId, SRpcHandleInfo*
|
|||
return;
|
||||
}
|
||||
|
||||
bool mndRebTryStart() {
|
||||
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
||||
mDebug("tq timer, rebalance counter old val:%d", old);
|
||||
return old == 0;
|
||||
}
|
||||
|
||||
void mndRebEnd() { mndRebCntDec(); }
|
||||
|
||||
void mndRebCntInc() {
|
||||
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
|
||||
mInfo("rebalance trans start, rebalance counter:%d", val);
|
||||
}
|
||||
|
||||
void mndRebCntDec() {
|
||||
while (1) {
|
||||
int32_t val = atomic_load_32(&mqRebInExecCnt);
|
||||
if (val <= 0) {
|
||||
mError("rebalance trans end, rebalance counter:%d should not be less equalled than 0, ignore counter desc", val);
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t newVal = val - 1;
|
||||
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
|
||||
if (oldVal == val) {
|
||||
mDebug("rebalance trans end, rebalance counter:%d", newVal);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser, bool enableReplay) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
int32_t code = 0;
|
||||
|
@ -257,162 +220,6 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
||||
if (pRebInfo == NULL) {
|
||||
pRebInfo = tNewSMqRebSubscribe(key);
|
||||
if (pRebInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
|
||||
taosMemoryFree(pRebInfo);
|
||||
pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
||||
}
|
||||
return pRebInfo;
|
||||
}
|
||||
|
||||
static void freeRebalanceItem(void *param) {
|
||||
SMqRebInfo *pInfo = param;
|
||||
taosArrayDestroy(pInfo->newConsumers);
|
||||
taosArrayDestroy(pInfo->removedConsumers);
|
||||
}
|
||||
|
||||
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMqConsumerObj *pConsumer;
|
||||
void *pIter = NULL;
|
||||
|
||||
mDebug("start to process mq timer");
|
||||
|
||||
// rebalance cannot be parallel
|
||||
if (!mndRebTryStart()) {
|
||||
mInfo("mq rebalance already in progress, do nothing");
|
||||
return 0;
|
||||
}
|
||||
|
||||
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
|
||||
if (pRebMsg == NULL) {
|
||||
mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t)sizeof(SMqDoRebalanceMsg));
|
||||
mndRebEnd();
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
|
||||
if (pRebMsg->rebSubHash == NULL) {
|
||||
mError("failed to create rebalance hashmap");
|
||||
rpcFreeCont(pRebMsg);
|
||||
mndRebEnd();
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
|
||||
|
||||
// iterate all consumers, find all modification
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
||||
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
|
||||
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
|
||||
hbStatus);
|
||||
|
||||
if (status == MQ_CONSUMER_STATUS_READY) {
|
||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
|
||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
|
||||
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
|
||||
for (int32_t i = 0; i < topicNum; i++) {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
|
||||
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
}else{
|
||||
int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
|
||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||
char * topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
||||
if (pSub == NULL) {
|
||||
continue;
|
||||
}
|
||||
taosRLockLatch(&pSub->lock);
|
||||
|
||||
// 2.2 iterate all vg assigned to the consumer of that topic
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
|
||||
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
||||
for (int32_t j = 0; j < vgNum; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
SVgObj * pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
|
||||
if (!pVgroup) {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
|
||||
mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||
mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
|
||||
}
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
}
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
}
|
||||
} else if (status == MQ_CONSUMER_STATUS_LOST) {
|
||||
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
|
||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
|
||||
}
|
||||
} else { // MQ_CONSUMER_STATUS_REBALANCE
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
|
||||
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
|
||||
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
|
||||
int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
|
||||
for (int32_t i = 0; i < removedTopicNum; i++) {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
|
||||
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
}
|
||||
|
||||
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
|
||||
mInfo("mq rebalance will be triggered");
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_TMQ_DO_REBALANCE,
|
||||
.pCont = pRebMsg,
|
||||
.contLen = sizeof(SMqDoRebalanceMsg),
|
||||
};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
} else {
|
||||
taosHashCleanup(pRebMsg->rebSubHash);
|
||||
rpcFreeCont(pRebMsg);
|
||||
mDebug("mq timer finished, no need to re-balance");
|
||||
mndRebEnd();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
|
@ -1251,7 +1058,7 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
static const char *mndConsumerStatusName(int status) {
|
||||
const char *mndConsumerStatusName(int status) {
|
||||
switch (status) {
|
||||
case MQ_CONSUMER_STATUS_READY:
|
||||
return "ready";
|
||||
|
|
|
@ -27,7 +27,10 @@
|
|||
#define MND_SUBSCRIBE_VER_NUMBER 2
|
||||
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
||||
|
||||
#define MND_SUBSCRIBE_REBALANCE_CNT 3
|
||||
#define MND_CONSUMER_LOST_HB_CNT 6
|
||||
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
|
||||
|
||||
static int32_t mqRebInExecCnt = 0;
|
||||
|
||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
|
||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
||||
|
@ -38,14 +41,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
|
|||
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
|
||||
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
|
||||
|
||||
static int32_t mndSetSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
||||
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* hash);
|
||||
|
||||
static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
|
||||
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
|
||||
|
@ -68,7 +64,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_TMQ_SUBSCRIBE_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_TMQ_DELETE_SUB_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DO_REBALANCE, mndProcessRebalanceReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_TIMER, mndProcessRebalanceReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP, mndProcessDropCgroupReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TMQ_DROP_CGROUP_RSP, mndTransProcessRsp);
|
||||
|
||||
|
@ -213,16 +209,18 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup,
|
|||
}
|
||||
|
||||
static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||
SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
|
||||
if (pRebSub == NULL) {
|
||||
pRebSub = tNewSMqRebSubscribe(key);
|
||||
if (pRebSub == NULL) {
|
||||
SMqRebInfo *pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
||||
if (pRebInfo == NULL) {
|
||||
pRebInfo = tNewSMqRebSubscribe(key);
|
||||
if (pRebInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
|
||||
taosHashPut(pHash, key, strlen(key) + 1, pRebInfo, sizeof(SMqRebInfo));
|
||||
taosMemoryFree(pRebInfo);
|
||||
pRebInfo = taosHashGet(pHash, key, strlen(key) + 1);
|
||||
}
|
||||
return pRebSub;
|
||||
return pRebInfo;
|
||||
}
|
||||
|
||||
static void doRemoveLostConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
|
||||
|
@ -727,17 +725,156 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SMqDoRebalanceMsg *pReq = pMsg->pCont;
|
||||
void *pIter = NULL;
|
||||
// bool rebalanceOnce = false; // to ensure only once.
|
||||
static void freeRebalanceItem(void *param) {
|
||||
SMqRebInfo *pInfo = param;
|
||||
taosArrayDestroy(pInfo->newConsumers);
|
||||
taosArrayDestroy(pInfo->removedConsumers);
|
||||
}
|
||||
|
||||
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
|
||||
static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMqConsumerObj *pConsumer;
|
||||
void *pIter = NULL;
|
||||
|
||||
mInfo("start to process mq timer");
|
||||
|
||||
// iterate all consumers, find all modification
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
||||
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
|
||||
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
|
||||
hbStatus);
|
||||
|
||||
if (status == MQ_CONSUMER_STATUS_READY) {
|
||||
if (taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
|
||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
|
||||
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
int32_t topicNum = taosArrayGetSize(pConsumer->currentTopics);
|
||||
for (int32_t i = 0; i < topicNum; i++) {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
|
||||
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key);
|
||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
}else{
|
||||
int32_t newTopicNum = taosArrayGetSize(pConsumer->currentTopics);
|
||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||
char * topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
||||
if (pSub == NULL) {
|
||||
continue;
|
||||
}
|
||||
taosRLockLatch(&pSub->lock);
|
||||
|
||||
// 2.2 iterate all vg assigned to the consumer of that topic
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &pConsumer->consumerId, sizeof(int64_t));
|
||||
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
||||
for (int32_t j = 0; j < vgNum; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
SVgObj * pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId);
|
||||
if (!pVgroup) {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, topic);
|
||||
mndGetOrCreateRebSub(rebSubHash, key);
|
||||
mInfo("vnode splitted, vgId:%d rebalance will be triggered", pVgEp->vgId);
|
||||
}
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
}
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
}
|
||||
} else if (status == MQ_CONSUMER_STATUS_LOST) {
|
||||
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
|
||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
|
||||
}
|
||||
} else {
|
||||
taosRLockLatch(&pConsumer->lock);
|
||||
|
||||
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
|
||||
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key);
|
||||
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
|
||||
int32_t removedTopicNum = taosArrayGetSize(pConsumer->rebRemovedTopics);
|
||||
for (int32_t i = 0; i < removedTopicNum; i++) {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
|
||||
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
|
||||
SMqRebInfo *pRebSub = mndGetOrCreateRebSub(rebSubHash, key);
|
||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||
}
|
||||
|
||||
if (newTopicNum == 0 && removedTopicNum == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0) { // unsubscribe or close
|
||||
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId, &pMsg->info);
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&pConsumer->lock);
|
||||
}
|
||||
|
||||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool mndRebTryStart() {
|
||||
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
||||
mInfo("rebalance counter old val:%d", old);
|
||||
return old == 0;
|
||||
}
|
||||
|
||||
void mndRebCntInc() {
|
||||
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
|
||||
mInfo("rebalance cnt inc, value:%d", val);
|
||||
}
|
||||
|
||||
void mndRebCntDec() {
|
||||
int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
|
||||
mInfo("rebalance cnt sub, value:%d", val);
|
||||
}
|
||||
|
||||
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
||||
int code = 0;
|
||||
if (!mndRebTryStart()) {
|
||||
mInfo("mq rebalance already in progress, do nothing");
|
||||
return code;
|
||||
}
|
||||
|
||||
SHashObj *rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
|
||||
if (rebSubHash == NULL) {
|
||||
mError("failed to create rebalance hashmap");
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
|
||||
|
||||
mndCheckConsumer(pMsg, rebSubHash);
|
||||
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
|
||||
|
||||
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
|
||||
void *pIter = NULL;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pReq->rebSubHash, pIter);
|
||||
pIter = taosHashIterate(rebSubHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -756,12 +893,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
taosArrayDestroy(rebOutput.modifyConsumers);
|
||||
taosArrayDestroy(rebOutput.rebVgs);
|
||||
|
||||
taosHashCancelIterate(pReq->rebSubHash, pIter);
|
||||
taosHashCancelIterate(rebSubHash, pIter);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mInfo("mq re-balance failed, due to out of memory");
|
||||
taosHashCleanup(pReq->rebSubHash);
|
||||
mndRebEnd();
|
||||
return -1;
|
||||
mError("mq re-balance failed, due to out of memory");
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
|
||||
|
@ -829,10 +965,12 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
|
||||
// reset flag
|
||||
mInfo("mq re-balance completed successfully");
|
||||
taosHashCleanup(pReq->rebSubHash);
|
||||
mndRebEnd();
|
||||
|
||||
return 0;
|
||||
END:
|
||||
taosHashCleanup(rebSubHash);
|
||||
mndRebCntDec();
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mndTrans.h"
|
||||
#include "mndConsumer.h"
|
||||
#include "mndSubscribe.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
|
|
Loading…
Reference in New Issue