fix(sync): fix elect timer memory leak

This commit is contained in:
Minghao Li 2022-11-16 14:05:34 +08:00
parent 3ada61c346
commit 7bcd13e82a
3 changed files with 29 additions and 33 deletions

View File

@ -41,7 +41,7 @@ typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncSnapshotSender SSyncSnapshotSender; typedef struct SSyncSnapshotSender SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver;
typedef struct SSyncTimer SSyncTimer; typedef struct SSyncTimer SSyncTimer;
typedef struct SSyncHbTimerData SSyncHbTimerData; typedef struct SSyncHbTimerParam SSyncHbTimerParam;
typedef struct SyncSnapshotSend SyncSnapshotSend; typedef struct SyncSnapshotSend SyncSnapshotSend;
typedef struct SyncSnapshotRsp SyncSnapshotRsp; typedef struct SyncSnapshotRsp SyncSnapshotRsp;
typedef struct SyncLocalCmd SyncLocalCmd; typedef struct SyncLocalCmd SyncLocalCmd;
@ -56,12 +56,13 @@ typedef struct SRaftId {
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
typedef struct SSyncHbTimerData { typedef struct SSyncHbTimerParam {
SSyncNode* pSyncNode; SSyncNode* pSyncNode;
SSyncTimer* pTimer; SSyncTimer* pTimer;
SRaftId destId; SRaftId destId;
uint64_t logicClock; uint64_t logicClock;
} SSyncHbTimerData; int64_t executeTime;
} SSyncHbTimerParam;
typedef struct SSyncTimer { typedef struct SSyncTimer {
void* pTimer; void* pTimer;
@ -73,11 +74,12 @@ typedef struct SSyncTimer {
void* pData; void* pData;
} SSyncTimer; } SSyncTimer;
typedef struct SElectTimer { typedef struct SElectTimerParam {
uint64_t logicClock; uint64_t logicClock;
SSyncNode* pSyncNode; SSyncNode* pSyncNode;
int64_t executeTime;
void* pData; void* pData;
} SElectTimer; } SElectTimerParam;
typedef struct SPeerState { typedef struct SPeerState {
SyncIndex lastSendIndex; SyncIndex lastSendIndex;
@ -153,6 +155,7 @@ typedef struct SSyncNode {
uint64_t electTimerLogicClock; uint64_t electTimerLogicClock;
TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp
uint64_t electTimerCounter; uint64_t electTimerCounter;
SElectTimerParam electTimerParam;
// heartbeat timer // heartbeat timer
tmr_h pHeartbeatTimer; tmr_h pHeartbeatTimer;
@ -161,6 +164,7 @@ typedef struct SSyncNode {
uint64_t heartbeatTimerLogicClockUser; uint64_t heartbeatTimerLogicClockUser;
TAOS_TMR_CALLBACK FpHeartbeatTimerCB; // Timer Fp TAOS_TMR_CALLBACK FpHeartbeatTimerCB; // Timer Fp
uint64_t heartbeatTimerCounter; uint64_t heartbeatTimerCounter;
SSyncHbTimerParam hbTimerParam;
// peer heartbeat timer // peer heartbeat timer
SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA]; SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA];

View File

@ -665,7 +665,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa
static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) {
int32_t ret = 0; int32_t ret = 0;
if (syncIsInit()) { if (syncIsInit()) {
SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); SSyncHbTimerParam* pData = taosMemoryMalloc(sizeof(SSyncHbTimerParam));
pData->pSyncNode = pSyncNode; pData->pSyncNode = pSyncNode;
pData->pTimer = pSyncTimer; pData->pTimer = pSyncTimer;
pData->destId = pSyncTimer->destId; pData->destId = pSyncTimer->destId;
@ -1106,12 +1106,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
if (syncIsInit()) { if (syncIsInit()) {
pSyncNode->electTimerMS = ms; pSyncNode->electTimerMS = ms;
SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer)); int64_t execTime = taosGetTimestampMs() + ms;
pElectTimer->logicClock = pSyncNode->electTimerLogicClock; atomic_store_64(&(pSyncNode->electTimerParam.executeTime), execTime);
pElectTimer->pSyncNode = pSyncNode; atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock);
pElectTimer->pData = NULL; pSyncNode->electTimerParam.pSyncNode = pSyncNode;
pSyncNode->electTimerParam.pData = NULL;
taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager, taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pElectTimer); &pSyncNode->pElectTimer);
} else { } else {
@ -1865,18 +1866,21 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
static void syncNodeEqElectTimer(void* param, void* tmrId) { static void syncNodeEqElectTimer(void* param, void* tmrId) {
if (!syncIsInit()) return; if (!syncIsInit()) return;
SElectTimer* pElectTimer = param; SSyncNode* pNode = (SSyncNode*)param;
SSyncNode* pNode = pElectTimer->pSyncNode;
if (pNode == NULL) return; if (pNode == NULL) return;
if (pNode->syncEqMsg == NULL) return; if (pNode->syncEqMsg == NULL) return;
int64_t tsNow = taosGetTimestampMs();
if (tsNow < pNode->electTimerParam.executeTime) return;
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); int32_t code =
syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode);
if (code != 0) { if (code != 0) {
sError("failed to build elect msg"); sError("failed to build elect msg");
taosMemoryFree(pElectTimer);
return; return;
} }
@ -1887,21 +1891,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
if (code != 0) { if (code != 0) {
sError("failed to sync enqueue elect msg since %s", terrstr()); sError("failed to sync enqueue elect msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
taosMemoryFree(pElectTimer);
return; return;
} }
taosMemoryFree(pElectTimer);
#if 0
// reset timer ms
if (syncIsInit() && pNode->electBaseLine > 0) {
pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine);
taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer);
} else {
sError("sync env is stop, syncNodeEqElectTimer");
}
#endif
} }
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
@ -1938,7 +1930,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
} }
static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
SSyncHbTimerData* pData = (SSyncHbTimerData*)param; SSyncHbTimerParam* pData = (SSyncHbTimerParam*)param;
SSyncNode* pSyncNode = pData->pSyncNode; SSyncNode* pSyncNode = pData->pSyncNode;
SSyncTimer* pSyncTimer = pData->pTimer; SSyncTimer* pSyncTimer = pData->pTimer;

View File

@ -193,7 +193,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
} }
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; if (pNode == NULL || pNode->pRaftCfg == NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->pRaftStore->currentTerm; int64_t currentTerm = pNode->pRaftStore->currentTerm;
// save error code, otherwise it will be overwritten // save error code, otherwise it will be overwritten