fix(tmq): fix the invalid free

This commit is contained in:
Haojun Liao 2023-03-29 10:46:44 +08:00
parent 70d0d7a63f
commit 94c6af39da
1 changed files with 3 additions and 7 deletions

View File

@ -596,13 +596,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SMqDoRebalanceMsg *pReq = pMsg->pCont;
void *pIter = NULL;
bool rebalanceExec = false; // to ensure only once.
bool rebalanceOnce = false; // to ensure only once.
mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
while (1) {
if (rebalanceExec) {
if (rebalanceOnce) {
break;
}
@ -673,10 +673,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mError("mq re-balance persist output error, possibly vnode splitted or dropped");
}
taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->newConsumers);
taosArrayDestroy(pRebInfo->removedConsumers);
taosArrayDestroy(rebOutput.newConsumers);
taosArrayDestroy(rebOutput.touchedConsumers);
taosArrayDestroy(rebOutput.removedConsumers);
@ -684,7 +680,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
tDeleteSubscribeObj(rebOutput.pSub);
taosMemoryFree(rebOutput.pSub);
rebalanceExec = true;
rebalanceOnce = true;
}
// reset flag