From 924b334b00169fa4faaf339cb5191f1df1829370 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 24 May 2022 00:07:09 +0800 Subject: [PATCH] fix: avoid memory leak whilel stop sync --- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 31 ++++++++++++--------- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 21 ++++++++++++++ source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 10 ++++++- source/dnode/mnode/impl/src/mndSync.c | 7 +++-- source/dnode/mnode/impl/src/mnode.c | 2 -- 5 files changed, 52 insertions(+), 19 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 75e83d6547..030d4b309e 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -24,19 +24,22 @@ extern "C" { #endif typedef struct SMnodeMgmt { - SDnodeData *pData; - SMnode *pMnode; - SMsgCb msgCb; - const char *path; - const char *name; - SSingleWorker queryWorker; - SSingleWorker readWorker; - SSingleWorker writeWorker; - SSingleWorker syncWorker; - SSingleWorker monitorWorker; - SReplica replicas[TSDB_MAX_REPLICA]; - int8_t replica; - int8_t selfIndex; + SDnodeData *pData; + SMnode *pMnode; + SMsgCb msgCb; + const char *path; + const char *name; + SSingleWorker queryWorker; + SSingleWorker readWorker; + SSingleWorker writeWorker; + SSingleWorker syncWorker; + SSingleWorker monitorWorker; + SReplica replicas[TSDB_MAX_REPLICA]; + int8_t replica; + int8_t selfIndex; + bool stopped; + int32_t refCount; + TdThreadRwlock lock; } SMnodeMgmt; // mmFile.c @@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); // mmInt.c int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg); +int32_t mmAcquire(SMnodeMgmt *pMgmt); +void mmRelease(SMnodeMgmt *pMgmt); // mmHandle.c SArray *mmGetMsgHandles(); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 68b2889278..43113d05af 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) { if (pMgmt->pMnode != NULL) { mmStopWorker(pMgmt); mndClose(pMgmt->pMnode); + taosThreadRwlockDestroy(&pMgmt->lock); pMgmt->pMnode = NULL; } @@ -142,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue; pMgmt->msgCb.mgmt = pMgmt; + taosThreadRwlockInit(&pMgmt->lock, NULL); bool deployed = false; if (mmReadFile(pMgmt, &deployed) != 0) { @@ -211,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() { return mgmtFunc; } + +int32_t mmAcquire(SMnodeMgmt *pMgmt) { + int32_t code = 0; + + taosThreadRwlockRdlock(&pMgmt->lock); + if (pMgmt->stopped) { + code = -1; + } else { + atomic_add_fetch_32(&pMgmt->refCount, 1); + } + taosThreadRwlockUnlock(&pMgmt->lock); + return code; +} + +void mmRelease(SMnodeMgmt *pMgmt) { + taosThreadRwlockRdlock(&pMgmt->lock); + atomic_sub_fetch_32(&pMgmt->refCount, 1); + taosThreadRwlockUnlock(&pMgmt->lock); +} \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index f408992aa6..45dec1153f 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -111,7 +111,10 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); + if (mmAcquire(pMgmt) != 0) return -1; + int32_t code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); + mmRelease(pMgmt); + return code; } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { @@ -180,6 +183,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { } void mmStopWorker(SMnodeMgmt *pMgmt) { + taosThreadRwlockWrlock(&pMgmt->lock); + pMgmt->stopped = 1; + taosThreadRwlockUnlock(&pMgmt->lock); + while (pMgmt->refCount > 0) taosMsleep(10); + tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->readWorker); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 84da778300..e3f7b40526 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -152,9 +152,10 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { } void mndSyncStart(SMnode *pMnode) { - syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); - syncStart(pMnode->syncMgmt.sync); - mDebug("sync:%" PRId64 " is started", pMnode->syncMgmt.sync); + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + syncSetMsgCb(pMgmt->sync, &pMnode->msgCb); + syncStart(pMgmt->sync); + mDebug("sync:%" PRId64 " is started", pMgmt->sync); } void mndSyncStop(SMnode *pMnode) {} diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 535d04e565..5b8c1a3101 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -435,8 +435,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { } return ret; - - return 0; } int32_t mndProcessMsg(SRpcMsg *pMsg) {