fix: memory leak of sync timer
This commit is contained in:
parent
665e2400ef
commit
a57efc49b4
|
@ -349,6 +349,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(req.pCont);
|
rpcFreeCont(req.pCont);
|
||||||
|
req.pCont = NULL;
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code);
|
mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -70,7 +70,7 @@ typedef struct SSyncTimer {
|
||||||
uint64_t counter;
|
uint64_t counter;
|
||||||
int32_t timerMS;
|
int32_t timerMS;
|
||||||
SRaftId destId;
|
SRaftId destId;
|
||||||
void* pData;
|
SSyncHbTimerData hbData;
|
||||||
} SSyncTimer;
|
} SSyncTimer;
|
||||||
|
|
||||||
typedef struct SElectTimer {
|
typedef struct SElectTimer {
|
||||||
|
|
|
@ -442,7 +442,9 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
|
||||||
pMsg->newLeaderId.vgId = pSyncNode->vgId;
|
pMsg->newLeaderId.vgId = pSyncNode->vgId;
|
||||||
pMsg->newNodeInfo = newLeader;
|
pMsg->newNodeInfo = newLeader;
|
||||||
|
|
||||||
return syncNodePropose(pSyncNode, &rpcMsg, false);
|
int32_t ret = syncNodePropose(pSyncNode, &rpcMsg, false);
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncState syncGetState(int64_t rid) {
|
SSyncState syncGetState(int64_t rid) {
|
||||||
|
@ -645,13 +647,12 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa
|
||||||
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
if (syncIsInit()) {
|
if (syncIsInit()) {
|
||||||
SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData));
|
SSyncHbTimerData* pData = &pSyncTimer->hbData;
|
||||||
pData->pSyncNode = pSyncNode;
|
pData->pSyncNode = pSyncNode;
|
||||||
pData->pTimer = pSyncTimer;
|
pData->pTimer = pSyncTimer;
|
||||||
pData->destId = pSyncTimer->destId;
|
pData->destId = pSyncTimer->destId;
|
||||||
pData->logicClock = pSyncTimer->logicClock;
|
pData->logicClock = pSyncTimer->logicClock;
|
||||||
|
|
||||||
pSyncTimer->pData = pData;
|
|
||||||
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer);
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
|
sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId);
|
||||||
|
@ -664,7 +665,6 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
|
||||||
atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
|
atomic_add_fetch_64(&pSyncTimer->logicClock, 1);
|
||||||
taosTmrStop(pSyncTimer->pTimer);
|
taosTmrStop(pSyncTimer->pTimer);
|
||||||
pSyncTimer->pTimer = NULL;
|
pSyncTimer->pTimer = NULL;
|
||||||
// taosMemoryFree(pSyncTimer->pData);
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1086,15 +1086,8 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
if (syncIsInit()) {
|
if (syncIsInit()) {
|
||||||
pSyncNode->electTimerMS = ms;
|
pSyncNode->electTimerMS = ms;
|
||||||
|
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
|
||||||
SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer));
|
|
||||||
pElectTimer->logicClock = pSyncNode->electTimerLogicClock;
|
|
||||||
pElectTimer->pSyncNode = pSyncNode;
|
|
||||||
pElectTimer->pData = NULL;
|
|
||||||
|
|
||||||
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager,
|
|
||||||
&pSyncNode->pElectTimer);
|
&pSyncNode->pElectTimer);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
|
sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId);
|
||||||
}
|
}
|
||||||
|
@ -1827,20 +1820,17 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
|
SSyncNode* pNode = param;
|
||||||
|
|
||||||
if (!syncIsInit()) return;
|
if (!syncIsInit()) return;
|
||||||
|
|
||||||
SElectTimer* pElectTimer = param;
|
|
||||||
SSyncNode* pNode = pElectTimer->pSyncNode;
|
|
||||||
|
|
||||||
if (pNode == NULL) return;
|
if (pNode == NULL) return;
|
||||||
if (pNode->syncEqMsg == NULL) return;
|
if (pNode->syncEqMsg == NULL) return;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
|
int32_t code =
|
||||||
|
syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerLogicClock, pNode->electTimerMS, pNode);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("failed to build elect msg");
|
sError("failed to build elect msg");
|
||||||
taosMemoryFree(pElectTimer);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1851,12 +1841,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("failed to sync enqueue elect msg since %s", terrstr());
|
sError("failed to sync enqueue elect msg since %s", terrstr());
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
taosMemoryFree(pElectTimer);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pElectTimer);
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
// reset timer ms
|
// reset timer ms
|
||||||
if (syncIsInit() && pNode->electBaseLine > 0) {
|
if (syncIsInit() && pNode->electBaseLine > 0) {
|
||||||
|
|
Loading…
Reference in New Issue