feat:[TS-4592]remove lost status for consumer
This commit is contained in:
parent
44027f7978
commit
9c2dae3613
|
@ -362,13 +362,13 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
|||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
int32_t j = 0;
|
||||
while (j < taosArrayGetSize(pConsumerEp->vgs)) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
MND_TMQ_NULL_CHECK(pVgEp);
|
||||
SMqVgEp *pVgEpTmp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
MND_TMQ_NULL_CHECK(pVgEpTmp);
|
||||
bool find = false;
|
||||
for (int32_t k = 0; k < taosArrayGetSize(newVgs); k++) {
|
||||
SMqVgEp *pnewVgEp = taosArrayGetP(newVgs, k);
|
||||
MND_TMQ_NULL_CHECK(pnewVgEp);
|
||||
if (pVgEp->vgId == pnewVgEp->vgId) {
|
||||
if (pVgEpTmp->vgId == pnewVgEp->vgId) {
|
||||
tDeleteSMqVgEp(pnewVgEp);
|
||||
taosArrayRemove(newVgs, k);
|
||||
find = true;
|
||||
|
@ -376,8 +376,8 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
|||
}
|
||||
}
|
||||
if (!find) {
|
||||
mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEp->vgId);
|
||||
tDeleteSMqVgEp(pVgEp);
|
||||
mInfo("[rebalance] processRemoveAddVgs old vgId:%d", pVgEpTmp->vgId);
|
||||
tDeleteSMqVgEp(pVgEpTmp);
|
||||
taosArrayRemove(pConsumerEp->vgs, j);
|
||||
continue;
|
||||
}
|
||||
|
@ -385,7 +385,7 @@ static int32_t processRemoveAddVgs(SMnode *pMnode, SMqRebOutputObj *pOutput) {
|
|||
}
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(newVgs) != 0) {
|
||||
if (taosArrayGetSize(pOutput->pSub->unassignedVgs) == 0 && taosArrayGetSize(newVgs) != 0) {
|
||||
MND_TMQ_NULL_CHECK(taosArrayAddAll(pOutput->pSub->unassignedVgs, newVgs));
|
||||
mInfo("[rebalance] processRemoveAddVgs add new vg num:%d", (int)taosArrayGetSize(newVgs));
|
||||
taosArrayDestroy(newVgs);
|
||||
|
|
Loading…
Reference in New Issue