opti:the logic of mndDoRebalance for clear
This commit is contained in:
parent
02ec67ad9e
commit
584cdf041c
|
@ -339,13 +339,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
|
||||
pInput->oldConsumerNum, numOfAdded, numOfRemoved);
|
||||
|
||||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
|
||||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store vg that need to be assigned
|
||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
|
||||
// 2. check and get actual removed consumers, put their vg into hash
|
||||
// 2. check and get actual removed consumers, put their vg into pHash
|
||||
doRemoveExistedConsumers(pOutput, pHash, pInput);
|
||||
|
||||
// 3. if previously no consumer, there are vgs not assigned
|
||||
// 3. if previously no consumer, there are vgs not assigned, put these vg into pHash
|
||||
addUnassignedVgroups(pOutput, pHash);
|
||||
|
||||
// 4. calc vg number of each consumer
|
||||
|
@ -364,19 +364,17 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
mInfo("sub:%s no consumer subscribe this topic", pSubKey);
|
||||
}
|
||||
|
||||
// 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is
|
||||
// minVgCnt, and then put them into the recycled hash list
|
||||
// 5. remove vgroups from consumers who have more vgroups than the threshold value(minVgCnt or minVgCnt + 1), and then another vg into pHash
|
||||
transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);
|
||||
|
||||
// 6. add new consumer into sub
|
||||
doAddNewConsumers(pOutput, pInput);
|
||||
|
||||
// 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them
|
||||
// All related vg should be put into rebVgs
|
||||
SMqRebOutputVg *pRebVg = NULL;
|
||||
void *pRemovedIter = NULL;
|
||||
void *pIter = NULL;
|
||||
|
||||
// 7. extract bgroups from pHash and assign to consumers that do not have enough vgroups
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
|
@ -390,68 +388,55 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
// iter hash and find one vg
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
mError("sub:%s removed iter is null", pSubKey);
|
||||
mError("sub:%s removed iter is null, never can reach hear", pSubKey);
|
||||
break;
|
||||
}
|
||||
|
||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||
// push
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
// 7. handle unassigned vg
|
||||
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
|
||||
// if has consumer, assign all left vg
|
||||
if(imbConsumerNum != 0) {
|
||||
while (1) {
|
||||
SMqConsumerEp *pConsumerEp = NULL;
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
if (pIter != NULL) {
|
||||
taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
|
||||
pIter = NULL;
|
||||
}
|
||||
break;
|
||||
}
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
|
||||
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
||||
mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
continue;
|
||||
}
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
} else {
|
||||
// if all consumer is removed, put all vg into unassigned
|
||||
pIter = NULL;
|
||||
SMqRebOutputVg *pRebOutput = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pHash, pIter);
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
|
||||
pRebOutput = (SMqRebOutputVg *)pIter;
|
||||
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
mError("sub:%s removed iter is null", pSubKey);
|
||||
break;
|
||||
}
|
||||
|
||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// All assigned vg should be put into pOutput->rebVgs
|
||||
if(pRemovedIter != NULL){
|
||||
mError("sub:%s pRemovedIter should be NULL", pSubKey);
|
||||
}
|
||||
while (1) {
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
if (pRemovedIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
|
||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned
|
||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -462,19 +447,18 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
|
||||
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
|
||||
}
|
||||
{
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
|
||||
pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -653,13 +637,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
|
|||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
rebInput.oldConsumerNum = 0;
|
||||
mInfo("topic:%s has no consumers sub yet", topic);
|
||||
mInfo("sub topic:%s has no consumers sub yet", pRebInfo->key);
|
||||
} else {
|
||||
taosRLockLatch(&pSub->lock);
|
||||
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
|
||||
rebOutput.pSub = tCloneSubscribeObj(pSub);
|
||||
taosRUnLockLatch(&pSub->lock);
|
||||
mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
|
||||
mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
|
||||
mndReleaseSubscribe(pMnode, pSub);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue