fix(tmq): rebalance

This commit is contained in:
Liu Jicong 2022-08-16 19:46:42 +08:00
parent b2b0f2f8fa
commit 58ea5563a5
1 changed files with 22 additions and 10 deletions

View File

@ -356,7 +356,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
taosArrayPush(pOutput->rebVgs, pRebVg); taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId); pConsumerEp->consumerId);
} }
} }
@ -365,22 +365,34 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) { if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
// if has consumer, assign all left vg // if has consumer, assign all left vg
while (1) { while (1) {
SMqConsumerEp *pConsumerEp = NULL;
pRemovedIter = taosHashIterate(pHash, pRemovedIter); pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) break; if (pRemovedIter == NULL) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); if (pIter != NULL) {
ASSERT(pIter); taosHashCancelIterate(pOutput->pSub->consumerHash, pIter);
pIter = NULL;
}
break;
}
while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
ASSERT(pIter);
pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
if (taosArrayGetSize(pConsumerEp->vgs) == minVgCnt + 1) {
continue;
}
}
pRebVg = (SMqRebOutputVg *)pRemovedIter; pRebVg = (SMqRebOutputVg *)pRemovedIter;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
ASSERT(pConsumerEp->consumerId > 0);
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp); taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
pRebVg->newConsumerId = pConsumerEp->consumerId; pRebVg->newConsumerId = pConsumerEp->consumerId;
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) { if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 " (second scan)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId); pConsumerEp->consumerId);
continue; continue;
} }
taosArrayPush(pOutput->rebVgs, pRebVg); taosArrayPush(pOutput->rebVgs, pRebVg);
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
pConsumerEp->consumerId); pConsumerEp->consumerId);
} }
} else { } else {
@ -571,7 +583,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
/*ASSERT(pTopic);*/ /*ASSERT(pTopic);*/
if (pTopic == NULL) { if (pTopic == NULL) {
mError("rebalance %s failed since topic %s was dropped, abort", pRebInfo->key, topic); mError("mq rebalance %s failed since topic %s not exist, abort", pRebInfo->key, topic);
continue; continue;
} }
taosRLockLatch(&pTopic->lock); taosRLockLatch(&pTopic->lock);
@ -601,7 +613,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
// TODO replace assert with error check // TODO replace assert with error check
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("persist rebalance output error, possibly vnode splitted or dropped"); mError("mq rebalance persist rebalance output error, possibly vnode splitted or dropped");
} }
taosArrayDestroy(pRebInfo->lostConsumers); taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->newConsumers); taosArrayDestroy(pRebInfo->newConsumers);