From 51a66a684a5b6afdd8d562ac245cec4dab7658ea Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 21 Jun 2024 18:25:35 +0800 Subject: [PATCH] fix:[TS-5067] memory leak --- source/dnode/mnode/impl/src/mndCompact.c | 2 ++ source/dnode/mnode/impl/src/mndDb.c | 15 ++++++++++-- source/dnode/mnode/impl/src/mndSubscribe.c | 27 ++++++++++++++-------- tests/system-test/7-tmq/tmq_taosx.py | 3 +++ 4 files changed, 36 insertions(+), 11 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index abbcb52db4..308089a9c1 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -542,6 +542,8 @@ void mndCompactSendProgressReq(SMnode *pMnode, SCompactObj *pCompact) { int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req); if (contLen < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbCancelFetch(pMnode->pSdb, pDetail); + sdbRelease(pMnode->pSdb, pDetail); continue; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 78e3ceabce..88b38d5e38 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1125,11 +1125,22 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj * if (pNewDb->cfg.withArbitrator) { SArbGroup arbGroup = {0}; mndArbGroupInitFromVgObj(&newVgroup, &arbGroup); - if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1; + if (mndSetCreateArbGroupCommitLogs(pTrans, &arbGroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + taosArrayDestroy(pArray); + return -1; + } + } else { SArbGroup arbGroup = {0}; mndArbGroupInitFromVgObj(pVgroup, &arbGroup); - if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) return -1; + if (mndSetDropArbGroupCommitLogs(pTrans, &arbGroup) != 0) { + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + taosArrayDestroy(pArray); + return -1; + } } } } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index afeeeef924..5f019d07f5 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -811,6 +811,7 @@ static void checkConsumer(SMnode *pMnode, SMqSubscribeObj* pSub){ SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pConsumerEp->consumerId); if (pConsumer != NULL) { + mndReleaseConsumer(pMnode, pConsumer); continue; } mError("consumer:0x%" PRIx64 " not exists in sdb for exception", pConsumerEp->consumerId); @@ -933,6 +934,7 @@ END: static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){ void* pIter = NULL; SVgObj* pVgObj = NULL; + int32_t ret = 0; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj); if (pIter == NULL) { @@ -946,8 +948,8 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); if(pReq == NULL){ terrno = TSDB_CODE_OUT_OF_MEMORY; - sdbRelease(pMnode->pSdb, pVgObj); - return -1; + ret = -1; + goto END; } pReq->head.vgId = htonl(pVgObj->vgId); pReq->vgId = pVgObj->vgId; @@ -963,16 +965,20 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran sdbRelease(pMnode->pSdb, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; + ret = -1; + goto END; } } - return 0; + END: + sdbRelease(pMnode->pSdb, pVgObj); + sdbCancelFetch(pMnode->pSdb, pIter); + return ret; } static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgroup, char *topic){ void *pIter = NULL; SMqConsumerObj *pConsumer = NULL; + int ret = 0; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); if (pIter == NULL) { @@ -990,16 +996,19 @@ static int32_t mndDropConsumerByGroup(SMnode *pMnode, STrans *pTrans, char *cgro if (strcmp(topic, name) == 0) { int32_t code = mndSetConsumerDropLogs(pTrans, pConsumer); if (code != 0) { - sdbRelease(pMnode->pSdb, pConsumer); - sdbCancelFetch(pMnode->pSdb, pIter); - return code; + ret = code; + goto END; } } } sdbRelease(pMnode->pSdb, pConsumer); } - return 0; + +END: + sdbRelease(pMnode->pSdb, pConsumer); + sdbCancelFetch(pMnode->pSdb, pIter); + return ret; } static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 200708e4ec..49b62d8abb 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -555,6 +555,9 @@ class TDTestCase: tdSql.query(f'show consumers') tdSql.checkRows(1) consumer1.close() + tdSql.execute(f'drop topic t1') + tdSql.execute(f'drop topic t2') + tdSql.execute(f'drop database d1') def run(self): self.consume_TS_5067_Test()