opti:tmq logic

This commit is contained in:
wangmm0220 2024-03-13 19:36:00 +08:00
parent 4dfc531e6c
commit d0bdb0ce2b
3 changed files with 62 additions and 140 deletions

View File

@ -641,12 +641,6 @@ static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscri
terrno = code; terrno = code;
goto _over; goto _over;
} }
// no topics need to be rebalanced
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
goto _over;
}
} }
return pConsumerNew; return pConsumerNew;
@ -821,48 +815,17 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
} }
} }
// remove from new topic // remove from topic list
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { static void removeFromTopicList(SArray *topicList, const char *pTopic, int64_t consumerId, char *type) {
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); int32_t size = taosArrayGetSize(topicList);
for (int32_t i = 0; i < size; i++) { for (int32_t i = 0; i < size; i++) {
char *p = taosArrayGetP(pConsumer->rebNewTopics, i); char *p = taosArrayGetP(topicList, i);
if (strcmp(pTopic, p) == 0) { if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebNewTopics, i); taosArrayRemove(topicList, i);
taosMemoryFree(p); taosMemoryFree(p);
mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId, mInfo("[rebalance] consumer:0x%" PRIx64 " remove topic:%s in the %s topic list, remain newTopics:%d",
pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics)); consumerId, pTopic, type, (int)taosArrayGetSize(topicList));
break;
}
}
}
// remove from removed topic
static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < size; i++) {
char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebRemovedTopics, i);
taosMemoryFree(p);
mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
break;
}
}
}
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
if (strcmp(pTopic, topic) == 0) {
taosArrayRemove(pConsumer->currentTopics, i);
taosMemoryFree(topic);
mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
break; break;
} }
} }
@ -896,46 +859,38 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->subscribeTime = taosGetTimestampMs();
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer", pOldConsumer->consumerId); mInfo("consumer:0x%" PRIx64 " subscribe update, modify existed consumer", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
taosArrayPush(pOldConsumer->rebNewTopics, &topic); taosArrayPush(pOldConsumer->rebNewTopics, &topic);
} }
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " timer update, timer recover", pOldConsumer->consumerId); mInfo("consumer:0x%" PRIx64 " recover update", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId); mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update, only rebalance time", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_ADD_REB) { } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
removeFromTopicList(pOldConsumer->rebNewTopics, pNewTopic, pOldConsumer->consumerId, "new");
// check if exist in current topic
removeFromNewTopicList(pOldConsumer, pNewTopic);
// add to current topic
bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
if (existing) { if (existing) {
mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic); mError("[rebalance] consumer:0x%" PRIx64 " add new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic);
taosMemoryFree(pNewTopic); taosMemoryFree(pNewTopic);
} else { // added into current topic list } else {
taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
} }
// set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
updateConsumerStatus(pOldConsumer); updateConsumerStatus(pOldConsumer);
// the re-balance is triggered when the new consumer is launched.
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
mInfo("[rebalance] consumer:0x%" PRIx64 " rebalance update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d", ", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
@ -943,22 +898,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) { } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); char *topic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
removeFromTopicList(pOldConsumer->rebRemovedTopics, topic, pOldConsumer->consumerId, "remove");
removeFromTopicList(pOldConsumer->currentTopics, topic, pOldConsumer->consumerId, "current");
// remove from removed topic
removeFromRemoveTopicList(pOldConsumer, removedTopic);
// remove from current topic
removeFromCurrentTopicList(pOldConsumer, removedTopic);
// set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
updateConsumerStatus(pOldConsumer); updateConsumerStatus(pOldConsumer);
pOldConsumer->rebalanceTime = taosGetTimestampMs(); pOldConsumer->rebalanceTime = taosGetTimestampMs();
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 mInfo("[rebalance]consumer:0x%" PRIx64 " rebalance update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d", ", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,

View File

@ -302,7 +302,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char *cgroup, int8_t upda
pConsumer->assignedTopics = subscribe->topicNames; pConsumer->assignedTopics = subscribe->topicNames;
subscribe->topicNames = NULL; subscribe->topicNames = NULL;
}else if (updateType == CONSUMER_UPDATE_SUB){ }else if (updateType == CONSUMER_UPDATE_SUB){
pConsumer->assignedTopics = subscribe->topicNames;; pConsumer->assignedTopics = subscribe->topicNames;
subscribe->topicNames = NULL; subscribe->topicNames = NULL;
} }

View File

@ -223,6 +223,14 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebInfo; return pRebInfo;
} }
static void pushVgDataToHash(SArray *vgs, SHashObj *pHash, int64_t consumerId, char* key){
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(vgs);
SMqRebOutputVg outputVg = {consumerId, -1, pVgEp};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64,
key, pVgEp->vgId, consumerId);
}
static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) { static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t actualRemoved = 0; int32_t actualRemoved = 0;
@ -235,10 +243,7 @@ static void processRemovedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, c
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < consumerVgNum; j++) { for (int32_t j = 0; j < consumerVgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); pushVgDataToHash(pConsumerEp->vgs, pHash, consumerId, pOutput->pSub->key);
SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("[rebalance] sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId, consumerId);
} }
taosArrayDestroy(pConsumerEp->vgs); taosArrayDestroy(pConsumerEp->vgs);
@ -273,15 +278,7 @@ static void processNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *
static void processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { static void processUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs); int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
for (int32_t i = 0; i < numOfVgroups; i++) { for (int32_t i = 0; i < numOfVgroups; i++) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs); pushVgDataToHash(pOutput->pSub->unassignedVgs, pHash, -1, pOutput->pSub->key);
SMqRebOutputVg rebOutput = {
.oldConsumerId = -1,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
mInfo("[rebalance] sub:%s mq re-balance processUnassignedVgroups vgId:%d from unassigned", pOutput->pSub->key, pVgEp->vgId);
} }
} }
@ -314,31 +311,13 @@ static void processModifiedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash,
taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId); taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
if (consumerVgNum > minVgCnt) { if (consumerVgNum > minVgCnt) {
if (cnt < remainderVgCnt) { if (cnt < remainderVgCnt) {
// pop until equal minVg + 1 while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { // pop until equal minVg + 1
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key);
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
SMqRebOutputVg outputVg = {
.oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64,
pOutput->pSub->key, pVgEp->vgId, pConsumerEp->consumerId);
} }
cnt++; cnt++;
} else { } else {
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); pushVgDataToHash(pConsumerEp->vgs, pHash, pConsumerEp->consumerId, pOutput->pSub->key);
SMqRebOutputVg outputVg = {
.oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("[rebalance] sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64,
pOutput->pSub->key, pVgEp->vgId, pConsumerEp->consumerId);
} }
} }
} }
@ -500,7 +479,7 @@ static void calcVgroupsCnt(const SMqRebInputObj *pInput, int32_t totalVgNum, con
static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt){ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt){
SMqRebOutputVg *pRebVg = NULL; SMqRebOutputVg *pRebVg = NULL;
void *pRemovedIter = NULL; void *pAssignIter = NULL;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
@ -510,13 +489,13 @@ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t min
} }
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) { while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
pRemovedIter = taosHashIterate(pHash, pRemovedIter); pAssignIter = taosHashIterate(pHash, pAssignIter);
if (pRemovedIter == NULL) { if (pAssignIter == NULL) {
mError("[rebalance] sub:%s removed iter is null, never can reach hear", pOutput->pSub->key); mError("[rebalance] sub:%s assign iter is NULL, never should reach here", pOutput->pSub->key);
break; break;
} }
pRebVg = (SMqRebOutputVg *)pRemovedIter; pRebVg = (SMqRebOutputVg *)pAssignIter;
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average",
@ -531,13 +510,13 @@ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t min
} }
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) { if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
pRemovedIter = taosHashIterate(pHash, pRemovedIter); pAssignIter = taosHashIterate(pHash, pAssignIter);
if (pRemovedIter == NULL) { if (pAssignIter == NULL) {
mInfo("[rebalance] sub:%s removed iter is null", pOutput->pSub->key); mInfo("[rebalance] sub:%s assign iter is used up", pOutput->pSub->key);
break; break;
} }
pRebVg = (SMqRebOutputVg *)pRemovedIter; pRebVg = (SMqRebOutputVg *)pAssignIter;
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", mInfo("[rebalance] mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1",
@ -545,16 +524,17 @@ static void assignVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t min
} }
} }
if(pRemovedIter != NULL){ taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
mError("[rebalance]sub:%s error pRemovedIter should be NULL", pOutput->pSub->key); if(pAssignIter != NULL){
mError("[rebalance]sub:%s assign iter is not NULL, never should reach here", pOutput->pSub->key);
} }
while (1) { while (1) {
pRemovedIter = taosHashIterate(pHash, pRemovedIter); pAssignIter = taosHashIterate(pHash, pAssignIter);
if (pRemovedIter == NULL) { if (pAssignIter == NULL) {
break; break;
} }
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter; SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pAssignIter;
taosArrayPush(pOutput->rebVgs, pRebOutput); taosArrayPush(pOutput->rebVgs, pRebOutput);
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned
@ -678,10 +658,6 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 5. execution // 5. execution
code = mndTransPrepare(pMnode, pTrans); code = mndTransPrepare(pMnode, pTrans);
if (code != 0) {
mError("failed to prepare trans rebalance since %s", terrstr());
goto END;
}
END: END:
nodesDestroyNode((SNode*)pPlan); nodesDestroyNode((SNode*)pPlan);
@ -757,24 +733,23 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
int32_t status = atomic_load_32(&pConsumer->status); int32_t status = atomic_load_32(&pConsumer->status);
mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", mDebug("[rebalance] check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus);
hbStatus);
if ((taosArrayGetSize(pConsumer->assignedTopics) == 0) ||
(status == MQ_CONSUMER_STATUS_LOST &&
hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD)){ // unsubscribe or close
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
}
if (status == MQ_CONSUMER_STATUS_READY) { if (status == MQ_CONSUMER_STATUS_READY) {
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { if (taosArrayGetSize(pConsumer->assignedTopics) == 0) {// unsubscribe or close
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
} else if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId); buildRebInfo(rebSubHash, pConsumer->currentTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
}else{ }else{
checkForVgroupSplit(pMnode, pConsumer, rebSubHash); checkForVgroupSplit(pMnode, pConsumer, rebSubHash);
} }
} else if (status != MQ_CONSUMER_STATUS_LOST) { } else if (status == MQ_CONSUMER_STATUS_LOST) {
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { // clear consumer if lost a day
mndSendConsumerMsg(pMnode, pConsumer->consumerId, TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, &pMsg->info);
}
} else {
taosRLockLatch(&pConsumer->lock); taosRLockLatch(&pConsumer->lock);
buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId); buildRebInfo(rebSubHash, pConsumer->rebNewTopics, 1, pConsumer->cgroup, pConsumer->consumerId);
buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId); buildRebInfo(rebSubHash, pConsumer->rebRemovedTopics, 0, pConsumer->cgroup, pConsumer->consumerId);
@ -842,6 +817,7 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
taosRLockLatch(&pTopic->lock); taosRLockLatch(&pTopic->lock);
rebInput->oldConsumerNum = 0;
rebOutput->pSub = mndCreateSubscription(pMnode, pTopic, key); rebOutput->pSub = mndCreateSubscription(pMnode, pTopic, key);
if (rebOutput->pSub == NULL) { if (rebOutput->pSub == NULL) {
@ -855,7 +831,6 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
taosRUnLockLatch(&pTopic->lock); taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
rebInput->oldConsumerNum = 0;
mInfo("[rebalance] sub topic:%s has no consumers sub yet", key); mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
} else { } else {
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
@ -871,6 +846,8 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
int code = 0; int code = 0;
void *pIter = NULL;
SMnode *pMnode = pMsg->info.node;
mInfo("[rebalance] start to process mq timer"); mInfo("[rebalance] start to process mq timer");
if (!mndRebTryStart()) { if (!mndRebTryStart()) {
@ -883,14 +860,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto END; goto END;
} }
taosHashSetFreeFp(rebSubHash, freeRebalanceItem); taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
mndCheckConsumer(pMsg, rebSubHash); mndCheckConsumer(pMsg, rebSubHash);
mInfo("[rebalance] mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)); mInfo("[rebalance] mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash));
void *pIter = NULL;
SMnode *pMnode = pMsg->info.node;
while (1) { while (1) {
pIter = taosHashIterate(rebSubHash, pIter); pIter = taosHashIterate(rebSubHash, pIter);
if (pIter == NULL) { if (pIter == NULL) {
@ -904,8 +878,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
goto END; goto END;
} }
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; rebInput.pRebInfo = (SMqRebInfo*)pIter;
rebInput.pRebInfo = pRebInfo;
if (buildRebOutput(pMnode, &rebInput, &rebOutput) != 0){ if (buildRebOutput(pMnode, &rebInput, &rebOutput) != 0){
continue; continue;