diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a74200472a..b2a866979c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -362,13 +362,13 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t j = 0; while (j < taosArrayGetSize(pConsumerEp->vgs)) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - MND_TMQ_NULL_CHECK(pVgEp); + SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j); + MND_TMQ_NULL_CHECK(pVgEpTmp); bool find = false; for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) { SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k); MND_TMQ_NULL_CHECK(pnewVgEp); - if (pVgEp->vgId == pnewVgEp->vgId) { + if (pVgEpTmp->vgId == pnewVgEp->vgId) { tDeleteSMqVgEp(pnewVgEp); taosArrayRemove(newVgs, k); find = true; @@ -376,8 +376,8 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { } } if (!find) { - mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEp->vgId); - tDeleteSMqVgEp(pVgEp); + mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId); + tDeleteSMqVgEp(pVgEpTmp); taosArrayRemove(pConsumerEp->vgs, j); continue; } @@ -385,7 +385,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) { } } - if (taosArrayGetSize(newVgs) != 0) { + if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) { MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs)); mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs)); taosArrayDestroy(newVgs);