From c7691590be6580bc21dd5bcf5216377883bea6d4 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 23:53:15 +0800 Subject: [PATCH 1/3] enh(sync) add syncIsRestoreFinish --- include/libs/sync/sync.h | 1 + source/libs/sync/src/syncMain.c | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 1eed353f33..2bf678fa48 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -146,6 +146,7 @@ int32_t syncGetVgId(int64_t rid); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); bool syncEnvIsStart(); const char* syncStr(ESyncState state); +bool syncIsRestoreFinish(int64_t rid); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 821ed364e8..b5315e33c8 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -157,6 +157,18 @@ ESyncState syncGetMyRole(int64_t rid) { return state; } +bool syncIsRestoreFinish(int64_t rid) { + SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); + if (pSyncNode == NULL) { + return false; + } + assert(rid == pSyncNode->rid); + bool b = pSyncNode->restoreFinish; + + taosReleaseRef(tsNodeRefId, pSyncNode->rid); + return b; +} + const char* syncGetMyRoleStr(int64_t rid) { const char* s = syncUtilState2String(syncGetMyRole(rid)); return s; From 0e8e04805ac0f93a20673f56b0d68896cb4a4275 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 24 May 2022 09:46:54 +0800 Subject: [PATCH 2/3] fix: avoid memory leak --- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 41 ++++++++++++++++++--- source/dnode/mnode/impl/src/mndSync.c | 10 +---- source/libs/sync/src/syncMain.c | 2 + 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 45dec1153f..40b1bd5cfb 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -99,21 +99,50 @@ static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) } int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pMsg); + int32_t code = -1; + if (mmAcquire(pMgmt) == 0) { + mmPutRpcMsgToWorker(&pMgmt->queryWorker, pMsg); + mmRelease(pMgmt); + } else { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; } int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pMsg); + int32_t code = -1; + if (mmAcquire(pMgmt) == 0) { + code = mmPutRpcMsgToWorker(&pMgmt->writeWorker, pMsg); + mmRelease(pMgmt); + } else { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; } int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); + int32_t code = -1; + if (mmAcquire(pMgmt) == 0) { + code = mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); + mmRelease(pMgmt); + } else { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } + return code; } int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - if (mmAcquire(pMgmt) != 0) return -1; - int32_t code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); - mmRelease(pMgmt); + int32_t code = -1; + if (mmAcquire(pMgmt) == 0) { + code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); + mmRelease(pMgmt); + } else { + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; + } return code; } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index e3f7b40526..b77a2c20a6 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,13 +17,7 @@ #include "mndSync.h" #include "mndTrans.h" -int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { - int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); - if (code != 0) { - rpcFreeCont(pMsg->pCont); - } - return code; -} +int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } @@ -33,7 +27,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; - mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state)); + mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state)); sdbWriteWithoutFree(pSdb, pRaw); sdbSetApplyIndex(pSdb, cbMeta.index); sdbSetApplyTerm(pSdb, cbMeta.term); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b5315e33c8..a233603adf 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -320,8 +320,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS; SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { + rpcFreeCont(pMsg->pCont); return TAOS_SYNC_PROPOSE_OTHER_ERROR; } + assert(rid == pSyncNode->rid); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { From 5a8a24b463c322e322456bf3a328c92875c64f14 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 24 May 2022 10:19:14 +0800 Subject: [PATCH 3/3] fix: avoid memory leak --- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 34 ++++----------------- 1 file changed, 6 insertions(+), 28 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 40b1bd5cfb..7249edc706 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -99,39 +99,15 @@ static inline int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) } int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - int32_t code = -1; - if (mmAcquire(pMgmt) == 0) { - mmPutRpcMsgToWorker(&pMgmt->queryWorker, pMsg); - mmRelease(pMgmt); - } else { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - return code; + return mmPutRpcMsgToWorker(&pMgmt->queryWorker, pMsg); } int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - int32_t code = -1; - if (mmAcquire(pMgmt) == 0) { - code = mmPutRpcMsgToWorker(&pMgmt->writeWorker, pMsg); - mmRelease(pMgmt); - } else { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - return code; + return mmPutRpcMsgToWorker(&pMgmt->writeWorker, pMsg); } int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - int32_t code = -1; - if (mmAcquire(pMgmt) == 0) { - code = mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); - mmRelease(pMgmt); - } else { - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - return code; + return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); } int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { @@ -139,7 +115,9 @@ int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { if (mmAcquire(pMgmt) == 0) { code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); mmRelease(pMgmt); - } else { + } + + if (code != 0) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; }