opti:the logic of mndDoRebalance for clear
This commit is contained in:
parent
584cdf041c
commit
f65cd36e80
|
@ -269,7 +269,7 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
|
||||||
};
|
};
|
||||||
|
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
|
||||||
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId);
|
mInfo("sub:%s mq re-balance addUnassignedVgroups vgId:%d from unassigned", pSubKey, pVgEp->vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -399,33 +399,30 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(imbConsumerNum != 0) {
|
while (1) {
|
||||||
while (1) {
|
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
if (pIter == NULL) {
|
||||||
if (pIter == NULL) {
|
break;
|
||||||
|
}
|
||||||
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
||||||
|
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||||
|
if (pRemovedIter == NULL) {
|
||||||
|
mError("sub:%s removed iter is null", pSubKey);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt) {
|
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||||
if (pRemovedIter == NULL) {
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||||
mError("sub:%s removed iter is null", pSubKey);
|
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
|
||||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
|
||||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
|
||||||
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " for average + 1", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// All assigned vg should be put into pOutput->rebVgs
|
// All assigned vg should be put into pOutput->rebVgs
|
||||||
if(pRemovedIter != NULL){
|
if(pRemovedIter != NULL){
|
||||||
mError("sub:%s pRemovedIter should be NULL", pSubKey);
|
mError("sub:%s error pRemovedIter should be NULL", pSubKey);
|
||||||
}
|
}
|
||||||
while (1) {
|
while (1) {
|
||||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||||
|
|
Loading…
Reference in New Issue