Merge pull request #12895 from taosdata/fix/mnode
fix: avoid memory leak
This commit is contained in:
commit
2cf5519318
|
@ -146,6 +146,7 @@ int32_t syncGetVgId(int64_t rid);
|
||||||
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
|
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
|
||||||
bool syncEnvIsStart();
|
bool syncEnvIsStart();
|
||||||
const char* syncStr(ESyncState state);
|
const char* syncStr(ESyncState state);
|
||||||
|
bool syncIsRestoreFinish(int64_t rid);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,9 +111,16 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
if (mmAcquire(pMgmt) != 0) return -1;
|
int32_t code = -1;
|
||||||
int32_t code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
|
if (mmAcquire(pMgmt) == 0) {
|
||||||
mmRelease(pMgmt);
|
code = mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg);
|
||||||
|
mmRelease(pMgmt);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,13 +17,7 @@
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
|
|
||||||
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
||||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
|
||||||
if (code != 0) {
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||||
|
|
||||||
|
@ -33,7 +27,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
SSdbRaw *pRaw = pMsg->pCont;
|
SSdbRaw *pRaw = pMsg->pCont;
|
||||||
|
|
||||||
mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state));
|
mTrace("raw:%p, apply to sdb, ver:%" PRId64 " role:%s", pRaw, cbMeta.index, syncStr(cbMeta.state));
|
||||||
sdbWriteWithoutFree(pSdb, pRaw);
|
sdbWriteWithoutFree(pSdb, pRaw);
|
||||||
sdbSetApplyIndex(pSdb, cbMeta.index);
|
sdbSetApplyIndex(pSdb, cbMeta.index);
|
||||||
sdbSetApplyTerm(pSdb, cbMeta.term);
|
sdbSetApplyTerm(pSdb, cbMeta.term);
|
||||||
|
|
|
@ -157,6 +157,18 @@ ESyncState syncGetMyRole(int64_t rid) {
|
||||||
return state;
|
return state;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool syncIsRestoreFinish(int64_t rid) {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
|
if (pSyncNode == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
assert(rid == pSyncNode->rid);
|
||||||
|
bool b = pSyncNode->restoreFinish;
|
||||||
|
|
||||||
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
const char* syncGetMyRoleStr(int64_t rid) {
|
const char* syncGetMyRoleStr(int64_t rid) {
|
||||||
const char* s = syncUtilState2String(syncGetMyRole(rid));
|
const char* s = syncUtilState2String(syncGetMyRole(rid));
|
||||||
return s;
|
return s;
|
||||||
|
@ -308,8 +320,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
||||||
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
|
int32_t ret = TAOS_SYNC_PROPOSE_SUCCESS;
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
return TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(rid == pSyncNode->rid);
|
assert(rid == pSyncNode->rid);
|
||||||
|
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
|
Loading…
Reference in New Issue