fix:[TS-5067] check if consumer belong to this cgroup:topic where drop cgroup
This commit is contained in:
parent
ddcf7c74f7
commit
a49dc93baf
|
@ -856,9 +856,9 @@ static int32_t buildRebOutput(SMnode *pMnode, SMqRebInputObj *rebInput, SMqRebOu
|
||||||
mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
|
mInfo("[rebalance] sub topic:%s has no consumers sub yet", key);
|
||||||
} else {
|
} else {
|
||||||
taosRLockLatch(&pSub->lock);
|
taosRLockLatch(&pSub->lock);
|
||||||
rebOutput.pSub = tCloneSubscribeObj(pSub);
|
rebOutput->pSub = tCloneSubscribeObj(pSub);
|
||||||
checkConsumer(pMnode, rebOutput.pSub);
|
checkConsumer(pMnode, rebOutput->pSub);
|
||||||
rebInput.oldConsumerNum = taosHashGetSize(rebOutput.pSub->consumerHash);
|
rebInput->oldConsumerNum = taosHashGetSize(rebOutput->pSub->consumerHash);
|
||||||
taosRUnLockLatch(&pSub->lock);
|
taosRUnLockLatch(&pSub->lock);
|
||||||
|
|
||||||
mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
|
mInfo("[rebalance] sub topic:%s has %d consumers sub till now", key, rebInput->oldConsumerNum);
|
||||||
|
@ -979,7 +979,15 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (strcmp(cgroup, pConsumer->cgroup) == 0 && taosArrayGetSize(pConsumer->currentTopics) == 0) {
|
// drop consumer in lost status, other consumers not in lost status already deleted by rebalance
|
||||||
|
if (pConsumer->status != MQ_CONSUMER_STATUS_LOST || strcmp(cgroup, pConsumer->cgroup) != 0) {
|
||||||
|
sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
|
||||||
|
if (strcmp(topic, name) == 0) {
|
||||||
int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
|
int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sdbRelease(pMnode->pSdb, pConsumer);
|
sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
|
@ -987,6 +995,8 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pConsumer);
|
sdbRelease(pMnode->pSdb, pConsumer);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue