fix:add rows to pSub if rebalance
This commit is contained in:
parent
4a17f4b9f5
commit
ce0b71c634
|
@ -484,16 +484,28 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
if (init) {
|
||||
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
|
||||
// mDebug("pSub->offsetRows is init");
|
||||
mInfo("pSub->offsetRows is init");
|
||||
} else {
|
||||
SMqConsumerEp *pConsumerEpNew = taosHashGet(pOutput->pSub->consumerHash, &pConsumerEp->consumerId, sizeof(int64_t));
|
||||
|
||||
for (int j = 0; j < taosArrayGetSize(pConsumerEp->offsetRows); j++) {
|
||||
OffsetRows *d1 = taosArrayGet(pConsumerEp->offsetRows, j);
|
||||
bool jump = false;
|
||||
for (int i = 0; pConsumerEpNew && i < taosArrayGetSize(pConsumerEpNew->vgs); i++){
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEpNew->vgs, j);
|
||||
if(pVgEp->vgId == d1->vgId){
|
||||
jump = true;
|
||||
mInfo("pSub->offsetRows jump, because consumer id:%"PRIx64 " and vgId:%d not change", pConsumerEp->consumerId, pVgEp->vgId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if(jump) continue;
|
||||
for (int i = 0; i < taosArrayGetSize(pOutput->pSub->offsetRows); i++) {
|
||||
OffsetRows *d2 = taosArrayGet(pOutput->pSub->offsetRows, i);
|
||||
if (d1->vgId == d2->vgId) {
|
||||
d2->rows += d1->rows;
|
||||
d2->offset = d1->offset;
|
||||
// mDebug("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
|
||||
mInfo("pSub->offsetRows add vgId:%d, after:%"PRId64", before:%"PRId64, d2->vgId, d2->rows, d1->rows);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue