fix:[TS-5067] memory leak

This commit is contained in:
wangmm0220 2024-06-21 18:25:35 +08:00
parent a49dc93baf
commit 51a66a684a
4 changed files with 36 additions and 11 deletions

View File

@ -542,6 +542,8 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) {
int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req); int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req);
if (contLen < 0) { if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbCancelFetch(pMnode->pSdb, pDetail);
sdbRelease(pMnode->pSdb, pDetail);
continue; continue;
} }

View File

@ -1125,11 +1125,22 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
if (pNewDb->cfg.withArbitrator) { if (pNewDb->cfg.withArbitrator) {
SArbGroup arbGroup = {0}; SArbGroup arbGroup = {0};
mndArbGroupInitFromVgObj(&newVgroup, &arbGroup); mndArbGroupInitFromVgObj(&newVgroup, &arbGroup);
if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1; if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
taosArrayDestroy(pArray);
return -1;
}
} else { } else {
SArbGroup arbGroup = {0}; SArbGroup arbGroup = {0};
mndArbGroupInitFromVgObj(pVgroup, &arbGroup); mndArbGroupInitFromVgObj(pVgroup, &arbGroup);
if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1; if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pVgroup);
taosArrayDestroy(pArray);
return -1;
}
} }
} }
} }

View File

@ -811,6 +811,7 @@ static void checkConsumer(SMnode *pMnode, SMqSubscribeObj* pSub){
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pConsumerEp->consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pConsumerEp->consumerId);
if (pConsumer != NULL) { if (pConsumer != NULL) {
mndReleaseConsumer(pMnode, pConsumer);
continue; continue;
} }
mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId); mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId);
@ -933,6 +934,7 @@ END:
static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){
void* pIter = NULL; void* pIter = NULL;
SVgObj* pVgObj = NULL; SVgObj* pVgObj = NULL;
int32_t ret = 0;
while (1) { while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj); pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj);
if (pIter == NULL) { if (pIter == NULL) {
@ -946,8 +948,8 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
if(pReq == NULL){ if(pReq == NULL){
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pMnode->pSdb, pVgObj); ret = -1;
return -1; goto END;
} }
pReq->head.vgId = htonl(pVgObj->vgId); pReq->head.vgId = htonl(pVgObj->vgId);
pReq->vgId = pVgObj->vgId; pReq->vgId = pVgObj->vgId;
@ -963,16 +965,20 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
sdbRelease(pMnode->pSdb, pVgObj); sdbRelease(pMnode->pSdb, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); ret = -1;
return -1; goto END;
} }
} }
return 0; END:
sdbRelease(pMnode->pSdb, pVgObj);
sdbCancelFetch(pMnode->pSdb, pIter);
return ret;
} }
static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic){ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic){
void *pIter = NULL; void *pIter = NULL;
SMqConsumerObj *pConsumer = NULL; SMqConsumerObj *pConsumer = NULL;
int ret = 0;
while (1) { while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
if (pIter == NULL) { if (pIter == NULL) {
@ -990,16 +996,19 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro
if (strcmp(topic, name) == 0) { if (strcmp(topic, name) == 0) {
int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer);
if (code != 0) { if (code != 0) {
sdbRelease(pMnode->pSdb, pConsumer); ret = code;
sdbCancelFetch(pMnode->pSdb, pIter); goto END;
return code;
} }
} }
} }
sdbRelease(pMnode->pSdb, pConsumer); sdbRelease(pMnode->pSdb, pConsumer);
} }
return 0;
END:
sdbRelease(pMnode->pSdb, pConsumer);
sdbCancelFetch(pMnode->pSdb, pIter);
return ret;
} }
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {

View File

@ -555,6 +555,9 @@ class TDTestCase:
tdSql.query(f'show consumers') tdSql.query(f'show consumers')
tdSql.checkRows(1) tdSql.checkRows(1)
consumer1.close() consumer1.close()
tdSql.execute(f'drop topic t1')
tdSql.execute(f'drop topic t2')
tdSql.execute(f'drop database d1')
def run(self): def run(self):
self.consume_TS_5067_Test() self.consume_TS_5067_Test()