fix:add rows to pSub if rebalance

This commit is contained in:
wangmm0220 2023-06-28 16:37:57 +08:00
parent ce0b71c634
commit 865a3b2509
1 changed files with 24 additions and 25 deletions

View File

@ -472,43 +472,42 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
if (pSub) {
taosRLockLatch(&pSub->lock);
bool init = false;
if (pOutput->pSub->offsetRows == NULL) {
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
init = true;
}
pIter = NULL;
while (1) {
pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
if (init) {
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
mInfo("pSub->offsetRows is init");
} else {
SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
bool jump = false;
for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, j);
if(pVgEp->vgId == d1->vgId){
jump = true;
mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
break;
}
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
bool jump = false;
for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, j);
if(pVgEp->vgId == d1->vgId){
jump = true;
mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
break;
}
if(jump) continue;
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
if (d1->vgId == d2->vgId) {
d2->rows += d1->rows;
d2->offset = d1->offset;
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
}
}
if(jump) continue;
bool find = false;
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
if (d1->vgId == d2->vgId) {
d2->rows += d1->rows;
d2->offset = d1->offset;
find = true;
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
break;
}
}
if(!find){
taosArrayPush(pOutput->pSub->offsetRows, d1);
}
}
}
taosRUnLockLatch(&pSub->lock);