diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e4d4b28b6b..362618fece 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -73,11 +73,12 @@ typedef struct SSyncTimer { SSyncHbTimerData hbData; } SSyncTimer; -typedef struct SElectTimer { +typedef struct SElectTimerParam { uint64_t logicClock; SSyncNode* pSyncNode; + int64_t executeTime; void* pData; -} SElectTimer; +} SElectTimerParam; typedef struct SPeerState { SyncIndex lastSendIndex; @@ -153,6 +154,7 @@ typedef struct SSyncNode { uint64_t electTimerLogicClock; TAOS_TMR_CALLBACK FpElectTimerCB; // Timer Fp uint64_t electTimerCounter; + SElectTimerParam electTimerParam; // heartbeat timer tmr_h pHeartbeatTimer; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 514160235f..d47bf4f401 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1104,8 +1104,16 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; if (syncIsInit()) { pSyncNode->electTimerMS = ms; + + int64_t execTime = taosGetTimestampMs() + ms; + atomic_store_64(&(pSyncNode->electTimerParam.executeTime), execTime); + atomic_store_64(&(pSyncNode->electTimerParam.logicClock), pSyncNode->electTimerLogicClock); + pSyncNode->electTimerParam.pSyncNode = pSyncNode; + pSyncNode->electTimerParam.pData = NULL; + taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager, &pSyncNode->pElectTimer); + } else { sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); } @@ -1855,27 +1863,35 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { } static void syncNodeEqElectTimer(void* param, void* tmrId) { - SSyncNode* pNode = param; - if (!syncIsInit()) return; + + SSyncNode* pNode = (SSyncNode*)param; + if (pNode == NULL) return; if (pNode->syncEqMsg == NULL) return; + int64_t tsNow = taosGetTimestampMs(); + if (tsNow < pNode->electTimerParam.executeTime) return; + SRpcMsg rpcMsg = {0}; int32_t code = - syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerLogicClock, pNode->electTimerMS, pNode); + syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pNode->electTimerParam.logicClock, pNode->electTimerMS, pNode); + if (code != 0) { sError("failed to build elect msg"); + return; } SyncTimeout* pTimeout = rpcMsg.pCont; - sTrace("enqueue elect msg lc:%" PRId64, pTimeout->logicClock); + sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { sError("failed to sync enqueue elect msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); + + return; } } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index fb1b07b0b6..b50336cd63 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -193,7 +193,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { } void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { - if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; int64_t currentTerm = pNode->pRaftStore->currentTerm; // save error code, otherwise it will be overwritten @@ -252,7 +252,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender, const char* format, ...) { SSyncNode* pNode = pSender->pSyncNode; - if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { @@ -304,7 +304,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver, const char* format, ...) { SSyncNode* pNode = pReceiver->pSyncNode; - if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + if (pNode == NULL || pNode->pRaftCfg == NULL || pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { @@ -554,4 +554,4 @@ void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, "send sync-request-vote-reply to %s:%d {term:%" PRId64 ", grant:%d}, %s", host, port, pMsg->term, pMsg->voteGranted, s); -} \ No newline at end of file +}