From 3ada61c346f46257e9eb146cc12bd620c71f4e9d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Nov 2022 11:13:53 +0800 Subject: [PATCH 1/5] restore some invalid code modify --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncMain.c | 39 +++++++++++++++++++++++++++------ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e4d4b28b6b..0e0042d526 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -70,7 +70,7 @@ typedef struct SSyncTimer { uint64_t counter; int32_t timerMS; SRaftId destId; - SSyncHbTimerData hbData; + void* pData; } SSyncTimer; typedef struct SElectTimer { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 514160235f..036127bf9c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -665,12 +665,13 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; if (syncIsInit()) { - SSyncHbTimerData* pData = &pSyncTimer->hbData; + SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); pData->pSyncNode = pSyncNode; pData->pTimer = pSyncTimer; pData->destId = pSyncTimer->destId; pData->logicClock = pSyncTimer->logicClock; + pSyncTimer->pData = pData; taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); @@ -1104,8 +1105,15 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; if (syncIsInit()) { pSyncNode->electTimerMS = ms; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, + + SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer)); + pElectTimer->logicClock = pSyncNode->electTimerLogicClock; + pElectTimer->pSyncNode = pSyncNode; + pElectTimer->pData = NULL; + + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); + } else { sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); } @@ -1855,28 +1863,45 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } static void syncNodeEqElectTimer(void* param, void* tmrId) { - SSyncNode* pNode = param; - if (!syncIsInit()) return; + + SElectTimer* pElectTimer = param; + SSyncNode* pNode = pElectTimer->pSyncNode; + if (pNode == NULL) return; if (pNode->syncEqMsg == NULL) return; SRpcMsg rpcMsg = {0}; - int32_t code = - syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerLogicClock, pNode->electTimerMS, pNode); + int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); + if (code != 0) { sError("failed to build elect msg"); + taosMemoryFree(pElectTimer); return; } SyncTimeout* pTimeout = rpcMsg.pCont; - sTrace("enqueue elect msg lc:%" PRId64, pTimeout->logicClock); + sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { sError("failed to sync enqueue elect msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); + taosMemoryFree(pElectTimer); + return; } + + taosMemoryFree(pElectTimer); + +#if 0 + // reset timer ms + if (syncIsInit() && pNode->electBaseLine > 0) { + pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine); + taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer); + } else { + sError("sync env is stop, syncNodeEqElectTimer"); + } +#endif } static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { From 7bcd13e82ae20e39d4b2217d79b8d7e8f9f94a87 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 16 Nov 2022 14:05:34 +0800 Subject: [PATCH 2/5] fix(sync): fix elect timer memory leak --- source/libs/sync/inc/syncInt.h | 14 +++++++---- source/libs/sync/src/syncMain.c | 44 ++++++++++++++------------------- source/libs/sync/src/syncUtil.c | 4 +-- 3 files changed, 29 insertions(+), 33 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0e0042d526..0507bb3661 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -41,7 +41,7 @@ typedef struct SSyncRespMgr SSyncRespMgr; typedef struct SSyncSnapshotSender SSyncSnapshotSender; typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncTimer SSyncTimer; -typedef struct SSyncHbTimerData SSyncHbTimerData; +typedef struct SSyncHbTimerParam SSyncHbTimerParam; typedef struct SyncSnapshotSend SyncSnapshotSend; typedef struct SyncSnapshotRsp SyncSnapshotRsp; typedef struct SyncLocalCmd SyncLocalCmd; @@ -56,12 +56,13 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -typedef struct SSyncHbTimerData { +typedef struct SSyncHbTimerParam { SSyncNode* pSyncNode; SSyncTimer* pTimer; SRaftId destId; uint64_t logicClock; -} SSyncHbTimerData; + int64_t executeTime; +} SSyncHbTimerParam; typedef struct SSyncTimer { void* pTimer; @@ -73,11 +74,12 @@ typedef struct SSyncTimer { void* pData; } SSyncTimer; -typedef struct SElectTimer { +typedef struct SElectTimerParam { uint64_t logicClock; SSyncNode* pSyncNode; + int64_t executeTime; void* pData; -} SElectTimer; +} SElectTimerParam; typedef struct SPeerState { SyncIndex lastSendIndex; @@ -153,6 +155,7 @@ typedef struct SSyncNode { uint64_t electTimerLogicClock; TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp uint64_t electTimerCounter; + SElectTimerParam electTimerParam; // heartbeat timer tmr_h pHeartbeatTimer; @@ -161,6 +164,7 @@ typedef struct SSyncNode { uint64_t heartbeatTimerLogicClockUser; TAOS_TMR_CALLBACK FpHeartbeatTimerCB; // Timer Fp uint64_t heartbeatTimerCounter; + SSyncHbTimerParam hbTimerParam; // peer heartbeat timer SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA]; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 036127bf9c..7484e93abe 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -665,7 +665,7 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; if (syncIsInit()) { - SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); + SSyncHbTimerParam* pData = taosMemoryMalloc(sizeof(SSyncHbTimerParam)); pData->pSyncNode = pSyncNode; pData->pTimer = pSyncTimer; pData->destId = pSyncTimer->destId; @@ -1106,12 +1106,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { if (syncIsInit()) { pSyncNode->electTimerMS = ms; - SElectTimer* pElectTimer = taosMemoryMalloc(sizeof(SElectTimer)); - pElectTimer->logicClock = pSyncNode->electTimerLogicClock; - pElectTimer->pSyncNode = pSyncNode; - pElectTimer->pData = NULL; + int64_t execTime = taosGetTimestampMs() + ms; + atomic_store_64(&(pSyncNode->electTimerParam.executeTime), execTime); + atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock); + pSyncNode->electTimerParam.pSyncNode = pSyncNode; + pSyncNode->electTimerParam.pData = NULL; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pElectTimer, syncEnv()->pTimerManager, + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); } else { @@ -1865,18 +1866,21 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqElectTimer(void* param, void* tmrId) { if (!syncIsInit()) return; - SElectTimer* pElectTimer = param; - SSyncNode* pNode = pElectTimer->pSyncNode; + SSyncNode* pNode = (SSyncNode*)param; if (pNode == NULL) return; if (pNode->syncEqMsg == NULL) return; + int64_t tsNow = taosGetTimestampMs(); + if (tsNow < pNode->electTimerParam.executeTime) return; + SRpcMsg rpcMsg = {0}; - int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); + int32_t code = + syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode); if (code != 0) { sError("failed to build elect msg"); - taosMemoryFree(pElectTimer); + return; } @@ -1887,21 +1891,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { if (code != 0) { sError("failed to sync enqueue elect msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); - taosMemoryFree(pElectTimer); + return; } - - taosMemoryFree(pElectTimer); - -#if 0 - // reset timer ms - if (syncIsInit() && pNode->electBaseLine > 0) { - pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine); - taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer); - } else { - sError("sync env is stop, syncNodeEqElectTimer"); - } -#endif } static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { @@ -1938,9 +1930,9 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { - SSyncHbTimerData* pData = (SSyncHbTimerData*)param; - SSyncNode* pSyncNode = pData->pSyncNode; - SSyncTimer* pSyncTimer = pData->pTimer; + SSyncHbTimerParam* pData = (SSyncHbTimerParam*)param; + SSyncNode* pSyncNode = pData->pSyncNode; + SSyncTimer* pSyncTimer = pData->pTimer; if (pSyncNode == NULL) { return; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index fb1b07b0b6..1d0ad07c99 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -193,7 +193,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { } void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { - if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pRaftCfg == NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; int64_t currentTerm = pNode->pRaftStore->currentTerm; // save error code, otherwise it will be overwritten @@ -554,4 +554,4 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, pMsg->voteGranted, s); -} \ No newline at end of file +} From 81592dfb09bc18b5ecc49e7efb21bf4c03fc32bb Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 16 Nov 2022 14:25:21 +0800 Subject: [PATCH 3/5] fix(sync): fix hb-timer memory leak --- source/libs/sync/inc/syncInt.h | 8 +++----- source/libs/sync/src/syncMain.c | 9 ++++----- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 0507bb3661..b7ca735ade 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -56,13 +56,12 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -typedef struct SSyncHbTimerParam { +typedef struct SSyncHbTimerData { SSyncNode* pSyncNode; SSyncTimer* pTimer; SRaftId destId; uint64_t logicClock; - int64_t executeTime; -} SSyncHbTimerParam; +} SSyncHbTimerData; typedef struct SSyncTimer { void* pTimer; @@ -71,7 +70,7 @@ typedef struct SSyncTimer { uint64_t counter; int32_t timerMS; SRaftId destId; - void* pData; + SSyncHbTimerData hbData; } SSyncTimer; typedef struct SElectTimerParam { @@ -164,7 +163,6 @@ typedef struct SSyncNode { uint64_t heartbeatTimerLogicClockUser; TAOS_TMR_CALLBACK FpHeartbeatTimerCB; // Timer Fp uint64_t heartbeatTimerCounter; - SSyncHbTimerParam hbTimerParam; // peer heartbeat timer SSyncTimer peerHeartbeatTimerArr[TSDB_MAX_REPLICA]; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7484e93abe..d47bf4f401 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -665,13 +665,12 @@ static int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRa static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; if (syncIsInit()) { - SSyncHbTimerParam* pData = taosMemoryMalloc(sizeof(SSyncHbTimerParam)); + SSyncHbTimerData* pData = &pSyncTimer->hbData; pData->pSyncNode = pSyncNode; pData->pTimer = pSyncTimer; pData->destId = pSyncTimer->destId; pData->logicClock = pSyncTimer->logicClock; - pSyncTimer->pData = pData; taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); @@ -1930,9 +1929,9 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { } static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { - SSyncHbTimerParam* pData = (SSyncHbTimerParam*)param; - SSyncNode* pSyncNode = pData->pSyncNode; - SSyncTimer* pSyncTimer = pData->pTimer; + SSyncHbTimerData* pData = (SSyncHbTimerData*)param; + SSyncNode* pSyncNode = pData->pSyncNode; + SSyncTimer* pSyncTimer = pData->pTimer; if (pSyncNode == NULL) { return; From 4c912aa95537baf46a7cdf4eb336e62652eff2ef Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 16 Nov 2022 14:28:20 +0800 Subject: [PATCH 4/5] fix(sync): fix hb-timer memory leak --- source/libs/sync/inc/syncInt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b7ca735ade..362618fece 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -41,7 +41,7 @@ typedef struct SSyncRespMgr SSyncRespMgr; typedef struct SSyncSnapshotSender SSyncSnapshotSender; typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncTimer SSyncTimer; -typedef struct SSyncHbTimerParam SSyncHbTimerParam; +typedef struct SSyncHbTimerData SSyncHbTimerData; typedef struct SyncSnapshotSend SyncSnapshotSend; typedef struct SyncSnapshotRsp SyncSnapshotRsp; typedef struct SyncLocalCmd SyncLocalCmd; From 1114bdefb4db0fbc44deeae53a283fd49dbfae77 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Nov 2022 14:31:17 +0800 Subject: [PATCH 5/5] fix: invalid log print --- source/libs/sync/src/syncUtil.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 1d0ad07c99..b50336cd63 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -193,7 +193,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { } void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { - if (pNode == NULL || pNode->pRaftCfg == NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; int64_t currentTerm = pNode->pRaftStore->currentTerm; // save error code, otherwise it will be overwritten @@ -252,7 +252,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, const char* format, ...) { SSyncNode* pNode = pSender->pSyncNode; - if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { @@ -304,7 +304,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, const char* format, ...) { SSyncNode* pNode = pReceiver->pSyncNode; - if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {